A tool to import data from mysql into elasticsearch


Keywords
data2esb, data2esd, mysql, elasticsearch, import, data
License
MIT
Install
pip install data2es==0.0.14

Documentation

Data2es数据导入工具

本项目是一个用来把数据从mysq导入到es中的小工具。程序使用pip 安装,程序作为后台程序执行,可以脱离当前命令行环境。启动时需要指定配置文件。程序的日志文件默认为 /tmp/data2es.log 此文件也可以在启动时指定。

命令行执行方法如下

usage: data2esd {start|stop|restart|kill} [-c]

positional arguments:
  {start|stop|restart|kill}
                        This control command can be used with start or stop or
                        restart or kill. Used to control the running of
                        data2esd daemons

optional arguments:
  -h, --help            show this help message and exit
  -c CONFIGFILE_PATH, --config CONFIGFILE_PATH
                        Specify configuration file path.
  -l LOG_FILE_PATH, --logfile LOG_FILE_PATH
                        Specify log file path.
  -v, --version         show program's version number and exit

安装

pip install data2es

启动

data2esd start -c 'config.conf'

其中config是必须指定的

停止

data2esd stop

重启

data2esd restart

配置文件范例

以下配置文件所有配置项都要有,值可以为空。

input:{
    mysql:{
        provider = "mysql" #数据源类型        
        db_host = "127.0.0.1"
        db_port=3110
        db_user="root"
        db_password="root"
        db_name="db_test"
        #是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
        record_last_run = true
        #追踪的字段
        tracking_column = "id"        
        #上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
        last_run_metadata_path = "/tmp/base_parameter.txt"
        #是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
        #clean_run = false
        #是否将 column 名称转小写
        lowercase_column_names = false
        # sql 语句文件,分页和增量跟新都写在sql语句中
        statement = "select *from table where `id` > {last_run_id} order by `id` limit 5000"
        trigger = "schedule" #定时器触发
        #trigger = "webhook" #webhook触发
        # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
        schedule = "* * * * * *"
        #设置webhook的访问令牌,为空时采用默认的令牌 ”amituofo@xfjl“
        webhook_token=""
        #设置webhook的访问令牌,为0时采用默认的端口号9899
        webhook_port=0        
        #指定对应的输出
        output_name="elasticsearch"
    }   
}

output:{ #elasticsearch服务器和index的配置
      elasticsearch:{
        hosts="127.0.0.1:9200"        
        index="test-index1"
        document_id = "id"
        username="elastic"
        password="elastic"
      }       
    }
webhook:{#有两种事件,一个是开始执行一次任务,一个是任务完成,如果指定了url那么在事件发生时会通知到这两个地址
  start=""
  finished=""
  error=""
}

执行触发

提供两种触发方式,一种是定时任务,一种是webhook触发。定时任务触发采用类似corn的方式设置。webhook的方式下,系统会运行一个http服务器,监听指定的端口,调用地址为 http://<ipaddress>:port/hook?token=''

webhook

如果在配置文件中配置了webhook地址,那个在事件发生时会通知设置的地址。通知内容如下(兼容钉钉和企业微信群机器人):

start

{
   "msgtype": "text",
   "text": {
        "content": "0000-00-00 00:00:00 从第 xxxxx 开始同步数据。"
    }
}

finished

{
   "msgtype": "text",
   "text": {
        "content": "0000-00-00 00:00:00 完成数据从 xxxx 到 xxxx 的同步。"
    }
}