clickhouse-cargo

Accumulates insert queries and commits them to ClickHouse in batch jobs, retrying on failure. This module turns high-frequency writes to a ClickHouse database into low-frequency batch inserts.


Keywords
clickhouse, performance, database, db, nodejs
License
ISC
Install
npm install clickhouse-cargo@3.1.1

Documentation

clickhouse-cargo

clickhouse-cargo accumulates insert queries and commits them to ClickHouse in batch jobs, retrying on failure.

ClickHouse is designed for batch inserts separated by significant intervals. When a distributed cluster of nodes inserts data into ClickHouse, some sort of centralized queue is usually required to keep ClickHouse from hitting its performance limits. That adds complexity to the service chain and makes maintenance harder.

clickhouse-cargo provides an easy way to batch inserts to ClickHouse. It accumulates insert queries in local file caches and commits them to ClickHouse in batches. clickhouse-cargo also automatically restores uncommitted local caches, which prevents data loss when ClickHouse fails unexpectedly.

clickhouse-cargo is intended for distributed NodeJS services that insert data into ClickHouse frequently. It turns high-frequency writes to ClickHouse into low-frequency batch inserts.

How it works

  1. A cargo instance accepts insert requests submitted through the push method and keeps them in memory.
  2. The cargo instance periodically flushes in-memory inserts to a file cache, then rotates this file and commits the rotations to the ClickHouse server.
  3. If a commit to ClickHouse fails, the cargo retries the submission in the next round of its routine until it succeeds.
  4. If the NodeJS process crashes, in-memory inserts are immediately flushed synchronously into the file cache.
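
The four steps above can be sketched in a few dozen lines. This is a minimal illustration under stated assumptions, not the library's implementation: the class name SketchCargo, the file names, the newline-delimited JSON format and the commitFn callback (a stand-in for the ClickHouse insert) are all hypothetical.

```javascript
const fs = require("node:fs");
const path = require("node:path");

// Hypothetical stand-in for a cargo; not the library's actual code.
class SketchCargo {
  constructor(dir, maxRows, commitFn) {
    this.dir = dir;
    this.maxRows = maxRows;   // flush threshold, similar in spirit to maxRows
    this.commitFn = commitFn; // assumed async function that sends one batch
    this.rows = [];           // step 1: in-memory buffer
    this.cachePath = path.join(dir, "cache.ndjson");
    this.rotationCount = 0;
  }

  // Step 1: accept an insert and keep it in memory.
  push(...columns) {
    this.rows.push(columns);
    if (this.rows.length >= this.maxRows) this.flushSync();
  }

  // Step 2 (and step 4 on crash): append buffered rows to the file cache.
  flushSync() {
    if (this.rows.length === 0) return;
    const lines = this.rows.map((r) => JSON.stringify(r)).join("\n") + "\n";
    fs.appendFileSync(this.cachePath, lines);
    this.rows = [];
  }

  // Step 2: rename the cache so later flushes start a fresh file.
  rotate() {
    if (!fs.existsSync(this.cachePath)) return;
    const name = `rotation.${this.rotationCount++}.ndjson`;
    fs.renameSync(this.cachePath, path.join(this.dir, name));
  }

  // Steps 2 and 3: commit every rotation; a failed file stays on disk
  // and is picked up again by the next call.
  async commit() {
    const files = fs.readdirSync(this.dir).filter((f) => f.startsWith("rotation."));
    for (const f of files) {
      const file = path.join(this.dir, f);
      try {
        await this.commitFn(fs.readFileSync(file, "utf8"));
        fs.unlinkSync(file);
      } catch (err) {
        // Keep the file so the next round retries it.
      }
    }
  }
}
```

In this sketch a crash handler would only need to call flushSync(), because appendFileSync completes before the process exits.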

Cluster mode support

When running in cluster mode (such as a PM2 cluster deployment), all cargo workers hold an election over UDP at 127.0.0.1:17888 to choose a leader worker. From then on, only the leader worker performs file rotations and commits.
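
The effect of the election can be illustrated without any networking. The election protocol itself is not described here, so the rule below (the lowest worker id wins) and the names electLeader and runRoutine are assumptions made only for illustration. The sketch shows the consequence that matters: every worker keeps flushing its own inserts, but only the leader rotates and commits.

```javascript
// Hypothetical election rule: the lowest worker id wins.
// The real UDP protocol used by clickhouse-cargo may work differently.
function electLeader(workerIds) {
  return workerIds.length === 0 ? null : Math.min(...workerIds);
}

// One round of a worker's routine. `actions` is an assumed object with
// flush, rotate and commit functions supplied by the caller.
function runRoutine(myId, otherIds, actions) {
  actions.flush(); // every worker persists its own in-memory rows
  if (electLeader([myId, ...otherIds]) !== myId) return "follower";
  actions.rotate(); // only the leader touches rotations...
  actions.commit(); // ...and talks to ClickHouse
  return "leader";
}
```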

NodeJS version requirements

NodeJS Version: 18.x and above

Install

$ npm install clickhouse-cargo

Usage

/*
Sample table schema:

CREATE TABLE IF NOT EXISTS cargo_test.table_test
(
  `time` DateTime,
  `step` UInt32,
  `pos_id` String DEFAULT ''
)
ENGINE = Memory();
*/

// A hyphen is not valid in a JavaScript identifier, so use camelCase.
const clickhouseCargo = require("clickhouse-cargo");
const TABLE_NAME = `default.table_test`;
const NUM_OF_INSERTIONS = 27891; // NOTE: by default the cargo flushes every 100 rows

// init clickhouse-cargo
clickhouseCargo.init({
  "url":"https://play-api.clickhouse.tech:8443",
  "user":"playground",
  "password":"clickhouse"
});

// insert data: one push per row, columns in table order
const theCargo = clickhouseCargo.createCargo(TABLE_NAME);
for (let i = 0; i < NUM_OF_INSERTIONS; i++) {
  theCargo.push(new Date(), i, "string");
}

Usage examples

This cargo module is designed for inserting a large number of records in a few batches, so it helps to have some sort of row generator or validator in front of it.

I used Joi for a while and found that it consumed too much CPU. Here is an example of how we currently use the cargo module
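
The author's own example is not reproduced in this text. As a stand-in, here is a minimal sketch of a hand-rolled validator that avoids a schema library. The COLUMNS spec and the toRow helper are hypothetical names, and the checks assume the sample table's columns (time, step, pos_id).

```javascript
// Hypothetical column spec for the sample table, in table order.
const COLUMNS = [
  { name: "time", check: (v) => v instanceof Date && !Number.isNaN(v.getTime()) },
  { name: "step", check: (v) => Number.isInteger(v) && v >= 0 && v <= 0xffffffff },
  { name: "pos_id", check: (v) => typeof v === "string", fallback: "" },
];

// Convert a record object into positional column values.
// Throws a TypeError when a value is missing or has the wrong type.
function toRow(record) {
  return COLUMNS.map(({ name, check, fallback }) => {
    const value = record[name] === undefined ? fallback : record[name];
    if (value === undefined || !check(value)) {
      throw new TypeError(`invalid value for column "${name}"`);
    }
    return value;
  });
}
```

With a cargo created as in the usage section, a row could then be submitted as theCargo.push(...toRow({ time: new Date(), step: 1, pos_id: "a" })). Plain predicate functions like these do far less work per row than a general schema engine, which matters at high insert rates.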

API

Initialization

Init by code

clickhouseCargo.init(options: Options)

Options

| Option | Required | Default | Description |
|---|---|---|---|
| @clickhouse/client configuration | | | See https://clickhouse.com/docs/en/integrations/javascript#configuration |
| cargoPath | | ${cwd()}/cargo_files | Path to the local cargo cache. |
| maxTime | | 1000 | How long, in milliseconds, a cargo keeps its in-memory insert buffer before flushing it to file. |
| maxRows | | 100 | How many rows a cargo keeps in memory. |
| commitInterval | | 5000 | Interval, in milliseconds, at which the cargo commits to ClickHouse. |
| maxInsetParts | | 100 | How many parts are inserted into ClickHouse in a single routine run. Keep the value below 300 to avoid the "Too many parts" error on the ClickHouse server side. |
| saveWhenCrash | | true | When false, cargos will not flushSync in-memory data when the node process crashes. |
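
As an illustration of how these options combine, the following sketch builds an options object that overrides some defaults. The values and the localhost URL are assumptions chosen for the example, not recommendations. The init call is left as a comment so the snippet runs without the package or a server.

```javascript
const path = require("node:path");

const options = {
  url: "http://localhost:8123", // assumed local ClickHouse HTTP endpoint
  cargoPath: path.join(process.cwd(), "cargo_files"), // same as the default
  maxTime: 2000,         // flush the in-memory buffer to file after 2 s
  maxRows: 500,          // rows a cargo keeps in memory
  commitInterval: 10000, // commit to ClickHouse every 10 s
  maxInsetParts: 100,    // option name spelled as in the table above
  saveWhenCrash: true,   // keep the synchronous flush on crash
};

// With the package installed, the options would be passed like this:
// require("clickhouse-cargo").init(options);
```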

Init by the environment variable

Initializing through the environment variable is recommended for production use. clickhouse-cargo reads process.env.CLICKHOUSE_CARGO_PROFILE and looks for the JSON config file at ~/.clickhouse-cargo/${process.env.CLICKHOUSE_CARGO_PROFILE}
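
To make the lookup concrete, this sketch computes the path the paragraph above describes. The helper name profilePath is hypothetical, and the code only mirrors the documented path pattern. It does not show how the library itself resolves or parses the file.

```javascript
const os = require("node:os");
const path = require("node:path");

// Hypothetical helper: returns the config path implied by the
// CLICKHOUSE_CARGO_PROFILE variable, or null when it is not set.
function profilePath(env = process.env) {
  const profile = env.CLICKHOUSE_CARGO_PROFILE;
  if (!profile) return null;
  return path.join(os.homedir(), ".clickhouse-cargo", profile);
}
```

For example, starting a service with CLICKHOUSE_CARGO_PROFILE=production would point at a file named production inside the .clickhouse-cargo directory of the user's home directory.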

Create a Cargo instance

/*
* @param tableName String, the name of the ClickHouse table into which data is inserted
*/
const cargo = clickhouseCargo.createCargo(tableName);

Insert a row

/*
Instead of inserting into ClickHouse directly, push the row to the cargo. The cargo then commits the accumulated insertions to ClickHouse in batches.
*/
cargo.push(column0, column1, column2, column3...)