rdkafka-streams

An alternative streams implementation for KafkaConsumer and Producer from https://github.com/Blizzard/node-rdkafka


Install
npm install rdkafka-streams@1.2.0

Documentation

rdkafka-streams

Usage

Consumer

const { Consumer } = require('rdkafka-streams')

const cs = new Consumer({
  host: 'localhost:9092',
  topic: 'test',
  groupId: 'test'
})

const ws = new Writable({
  objectMode: true,
  write: (obj, enc, cb) => {
    if (++count === total) {
      return cs.destroy()
    }
    setTimeout(cb, 1)
  }

cs.pipe(ws)

Producer

const { Producer } = require('rdkafka-streams')

const ps = new Producer({
  host: 'localhost:9092',
  topic: 'test'
})

ps.write({ some: 'data' })

Duplex

const { getDuplex } = require('rdkafka-streams')

ts = new Transform({
  objectMode: true,
  transform: (obj, enc, cb) => {
    if (obj.value.source !== "origin") {
      return cb()
    }

    if (++count === 1000) {
      eb.destroy()
    }

    setTimeout(() => {
      cb(null, _.extend({}, obj.value, { source: "transform" })
    }, 5)
  }
})

ds = getDuplex({
  host: 'localhost:9092',
  topic: 'test',
  groupId: 'test'
})

ds.pipe(ts).pipe(ds)