Data2es数据导入工具
本项目是一个用来把数据从mysq导入到es中的小工具。程序使用pip 安装,程序作为后台程序执行,可以脱离当前命令行环境。启动时需要指定配置文件。程序的日志文件默认为 /tmp/data2es.log
此文件也可以在启动时指定。
命令行执行方法如下
usage: data2esd {start|stop|restart|kill} [-c]
positional arguments:
{start|stop|restart|kill}
This control command can be used with start or stop or
restart or kill. Used to control the running of
data2esd daemons
optional arguments:
-h, --help show this help message and exit
-c CONFIGFILE_PATH, --config CONFIGFILE_PATH
Specify configuration file path.
-l LOG_FILE_PATH, --logfile LOG_FILE_PATH
Specify log file path.
-v, --version show program's version number and exit
安装
pip install data2es
启动
data2esd start -c 'config.conf'
其中config是必须指定的
停止
data2esd stop
重启
data2esd restart
配置文件范例
以下配置文件所有配置项都要有,值可以为空。
input:{
mysql:{
provider = "mysql" #数据源类型
db_host = "127.0.0.1"
db_port=3110
db_user="root"
db_password="root"
db_name="db_test"
#是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
record_last_run = true
#追踪的字段
tracking_column = "id"
#上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
last_run_metadata_path = "/tmp/base_parameter.txt"
#是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
#clean_run = false
#是否将 column 名称转小写
lowercase_column_names = false
# sql 语句文件,分页和增量跟新都写在sql语句中
statement = "select *from table where `id` > {last_run_id} order by `id` limit 5000"
trigger = "schedule" #定时器触发
#trigger = "webhook" #webhook触发
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule = "* * * * * *"
#设置webhook的访问令牌,为空时采用默认的令牌 ”amituofo@xfjl“
webhook_token=""
#设置webhook的访问令牌,为0时采用默认的端口号9899
webhook_port=0
#指定对应的输出
output_name="elasticsearch"
}
}
output:{ #elasticsearch服务器和index的配置
elasticsearch:{
hosts="127.0.0.1:9200"
index="test-index1"
document_id = "id"
username="elastic"
password="elastic"
}
}
webhook:{#有两种事件,一个是开始执行一次任务,一个是任务完成,如果指定了url那么在事件发生时会通知到这两个地址
start=""
finished=""
error=""
}
执行触发
提供两种触发方式,一种是定时任务,一种是webhook触发。定时任务触发采用类似corn的方式设置。webhook的方式下,系统会运行一个http服务器,监听指定的端口,调用地址为 http://<ipaddress>:port/hook?token=''
。
webhook
如果在配置文件中配置了webhook地址,那个在事件发生时会通知设置的地址。通知内容如下(兼容钉钉和企业微信群机器人):
start
{
"msgtype": "text",
"text": {
"content": "0000-00-00 00:00:00 从第 xxxxx 开始同步数据。"
}
}
finished
{
"msgtype": "text",
"text": {
"content": "0000-00-00 00:00:00 完成数据从 xxxx 到 xxxx 的同步。"
}
}