@micheleangioni/node-messagebrokers

High level library which abstracts low level Message Brokers clients


Keywords
TypeScript, Message Brokers, Kafka, ddd-architecture, domain-driven-design, message-broker, nodejs, sns
License
MIT
Install
npm install @micheleangioni/node-messagebrokers@0.5.0

Documentation

Node MessageBrokers


High-level library that abstracts low-level Message Broker clients and enforces the CloudEvents standard for the event payload.

The library is still in beta and under active development.

Supported Clients

Node MessageBrokers provides adapters for Apache Kafka and AWS SNS.

The currently supported clients are KafkaJs (default) for Apache Kafka and the official AWS Node client for AWS SNS.

Requirements

  • Node.js v14+

Installation

To install Node MessageBrokers, run npm install --save @micheleangioni/node-messagebrokers.

Configuration

Node MessageBrokers is configured through the following environment variables:

  • UNDERLYING_CLIENT: low-level client to use, supported values:

    • kafkajs (default): modern Apache Kafka client
    • awssns: default AWS SNS client
  • KAFKA_URI: comma-separated list of Kafka brokers, default localhost:9092

  • KAFKA_CLIENT_ID: client id for the Kafka connection. If not set, a randomly generated one is used

  • KAFKA_LOG_LEVEL: set the logging level of the KafkaJs client:

    • 0 = nothing
    • 1 = error
    • 2 = warning
    • 4 = info (default)
    • 5 = debug
  • SNS_ENDPOINT: optional AWS SNS endpoint, default undefined

  • AWS_REGION: AWS region, default eu-west-1

  • SSL_CERT: SSL certificate

  • SSL_KEY: SSL key

  • SSL_CA: SSL certificate authority

  • REVERSE_DNS: Reverse DNS to customise the type field of the event payload
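As a rough illustration, the variables above could be gathered into a typed object as in the sketch below. The names readBrokerEnv and BrokerEnv are hypothetical and are not part of the library; only the variable names and defaults are taken from the list above. The environment is passed in as a plain object, so process.env can be supplied in an application and a literal in a test.

```typescript
// Hypothetical helper for illustration only: NOT part of node-messagebrokers.
type Env = { [name: string]: string | undefined };

type BrokerEnv = {
  underlyingClient: 'kafkajs' | 'awssns';
  kafkaBrokers: string[];
  kafkaLogLevel: number;
  awsRegion: string;
  snsEndpoint?: string;
};

// Log levels listed in the configuration section (note that 3 is not a valid value).
const KAFKA_LOG_LEVELS = [0, 1, 2, 4, 5];

function readBrokerEnv(env: Env): BrokerEnv {
  const client = env.UNDERLYING_CLIENT ?? 'kafkajs';
  if (client !== 'kafkajs' && client !== 'awssns') {
    throw new Error(`Unsupported UNDERLYING_CLIENT: ${client}`);
  }

  const logLevel = Number(env.KAFKA_LOG_LEVEL ?? '4');
  if (KAFKA_LOG_LEVELS.indexOf(logLevel) === -1) {
    throw new Error(`Unsupported KAFKA_LOG_LEVEL: ${env.KAFKA_LOG_LEVEL}`);
  }

  return {
    underlyingClient: client,
    // KAFKA_URI is a comma-separated list of brokers.
    kafkaBrokers: (env.KAFKA_URI ?? 'localhost:9092').split(',').map((uri) => uri.trim()),
    kafkaLogLevel: logLevel,
    awsRegion: env.AWS_REGION ?? 'eu-west-1',
    snsEndpoint: env.SNS_ENDPOINT,
  };
}
```

In an application this would be called as readBrokerEnv(process.env); failing fast on an unknown client or log level is a design choice of the sketch, not documented library behaviour.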

Event payload format

In order to send an event, the payload must follow the CloudEvents format.

Node MessageBrokers exposes an event factory for v1.0 of the standard. In order to use it, import the factory

import { CloudEventFactory } from '@micheleangioni/node-messagebrokers';

and use the createV1 factory method

CloudEventFactory.createV1(
  aggregate: string,
  eventType: string,
  source: string,
  data: any,
  options: CreateEventV1Options = {},
)

where CreateEventV1Options is as follows

export type CreateEventV1Options = {
  datacontenttype?: string, // default 'application/json'
  dataschema?: string,
  subject?: string,
};
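For orientation, the sketch below builds by hand an object carrying the attribute names defined by CloudEvents v1.0. It is not the library's factory: buildEnvelope is a hypothetical name, and the way the id, time and type values are derived here (including the reverseDns prefix standing in for REVERSE_DNS) is an assumption made for illustration.

```typescript
// Illustrative only: a hand-rolled CloudEvents v1.0 envelope, not CloudEventFactory.
type CloudEventV1 = {
  specversion: '1.0';
  id: string;
  source: string;
  type: string;
  time: string;
  datacontenttype: string;
  dataschema?: string;
  subject?: string;
  data: unknown;
};

type EnvelopeOptions = {
  datacontenttype?: string;
  dataschema?: string;
  subject?: string;
};

function buildEnvelope(
  aggregate: string,
  eventType: string,
  source: string,
  data: unknown,
  options: EnvelopeOptions = {},
  reverseDns: string = 'com.example', // assumption: stands in for REVERSE_DNS
): CloudEventV1 {
  const envelope: CloudEventV1 = {
    specversion: '1.0',
    // Assumption: any unique string works as an id; the library may generate it differently.
    id: `${Date.now()}-${Math.random().toString(16).slice(2)}`,
    source,
    // Assumption: the real format of the type field is defined by the library.
    type: `${reverseDns}.${aggregate}.${eventType}`,
    time: new Date().toISOString(),
    datacontenttype: options.datacontenttype ?? 'application/json',
    data,
  };
  // Optional attributes are only set when provided.
  if (options.dataschema !== undefined) envelope.dataschema = options.dataschema;
  if (options.subject !== undefined) envelope.subject = options.subject;
  return envelope;
}

const userCreated = buildEnvelope('user', 'UserCreated', '/users', { username: 'Voodoo' });
console.log(JSON.stringify(userCreated, null, 2));
```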

Usage

Instantiating the Client

Instantiation is similar for all clients. In order to create an instance, it's enough to provide the topic list when using the client factory:

import brokerFactory from '@micheleangioni/node-messagebrokers';

const topics = {
  user: {
    topic: 'myCompany.events.identity.user',
    numPartitions: 2, // optional
    replicationFactor: 1, // optional
  }
};

const broker = brokerFactory(topics);

The topics parameter must conform to the following type definition:

type KafkaTopic = {
  topic: string
  numPartitions?: number // Only for Apache Kafka clients
  replicationFactor?: number // Only for Apache Kafka clients
  replicaAssignment?: object[] // Only for KafkaJs client
  configEntries?: object[],  // Only for KafkaJs client
};

type KafkaTopics = {
  [aggregate: string]: KafkaTopic,
};

This structure enforces a separate topic entry for each aggregate.
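Keying the map by aggregate gives every aggregate its own entry, but the type alone does not stop two aggregates from pointing at the same topic name. A small check along the following lines could catch that before the broker is created. The helper findSharedTopics is hypothetical and not part of the library, and the types are trimmed copies of the ones above.

```typescript
type KafkaTopic = { topic: string; numPartitions?: number; replicationFactor?: number };
type KafkaTopics = { [aggregate: string]: KafkaTopic };

// Hypothetical helper: returns the topic names that are used by more than one aggregate.
function findSharedTopics(topics: KafkaTopics): string[] {
  const aggregatesByTopic: { [topic: string]: string[] } = {};
  for (const aggregate of Object.keys(topics)) {
    const topicName = topics[aggregate].topic;
    (aggregatesByTopic[topicName] = aggregatesByTopic[topicName] || []).push(aggregate);
  }
  return Object.keys(aggregatesByTopic).filter(
    (topicName) => aggregatesByTopic[topicName].length > 1,
  );
}

const shared = findSharedTopics({
  user: { topic: 'myCompany.events.identity.user' },
  account: { topic: 'myCompany.events.identity.user' }, // same topic by mistake
});
console.log(shared);
```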

Every client has its own instantiation options as well.

Furthermore, most clients must be initialized through the async init() method before use. Because each client has its own particularities, the brokers' init() methods take different options.

KafkaJs

Instantiation

import brokerFactory from '@micheleangioni/node-messagebrokers';

const topics = {
  user: {
    topic: 'myCompany.events.identity.user',
    numPartitions: 16,
    replicationFactor: 3,
  }
};

const broker = brokerFactory(topics);

If the topics don't exist yet, also pass createTopics: true during initialization.

Initialization

await broker.init(kafkaJsClientConfiguration);

where kafkaJsClientConfiguration has the following signature:

export interface RetryOptions {
  maxRetryTime?: number
  initialRetryTime?: number
  factor?: number
  multiplier?: number
  retries?: number
}

interface ConsumerConfig {
  groupId: string
  metadataMaxAge?: number
  sessionTimeout?: number
  rebalanceTimeout?: number
  heartbeatInterval?: number
  maxBytesPerPartition?: number
  minBytes?: number
  maxBytes?: number
  maxWaitTimeInMs?: number
  retry?: RetryOptions
  allowAutoTopicCreation?: boolean
  maxInFlightRequests?: number
  readUncommitted?: boolean
}

type KafkaJsClientConfiguration = ConsumerConfig & {
  createTopics?: boolean;
};
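To give a feel for how the retry options interact, the sketch below computes the nominal wait before each retry under plain exponential backoff. It is a simplification and not KafkaJs's exact algorithm: the randomization introduced by factor is deliberately left out, and the default values are assumed from the KafkaJs documentation and should be checked against the installed version. The function name nominalRetryDelays is hypothetical.

```typescript
type RetryOptions = {
  maxRetryTime?: number;
  initialRetryTime?: number;
  multiplier?: number;
  retries?: number;
};

// Nominal wait before each retry, in milliseconds. Jitter (the factor option) is ignored.
function nominalRetryDelays(options: RetryOptions = {}): number[] {
  // Assumed defaults, taken from the KafkaJs documentation.
  const maxRetryTime = options.maxRetryTime ?? 30000;
  const initialRetryTime = options.initialRetryTime ?? 300;
  const multiplier = options.multiplier ?? 2;
  const retries = options.retries ?? 5;

  const delays: number[] = [];
  for (let attempt = 0; attempt < retries; attempt++) {
    // Grow geometrically, but never wait longer than maxRetryTime.
    delays.push(Math.min(initialRetryTime * Math.pow(multiplier, attempt), maxRetryTime));
  }
  return delays;
}

console.log(nominalRetryDelays()); // 300, 600, 1200, 2400, 4800
```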

Simple example

await broker.init({ groupId: 'my-consumer-group-id' });

Creating the topics during initialization as well:

await broker.init({ createTopics: true, groupId: 'my-consumer-group-id' });

Creating a Consumer

const consumer = await broker.addConsumer([], consumerConfig); // the first argument is ignored

where consumerConfig has the following type signature:

export interface IHeaders {
  [key: string]: Buffer | string
}

export type KafkaMessage = {
  key: Buffer
  value: Buffer // Message payload
  timestamp: string
  size: number
  attributes: number
  offset: string
  headers?: IHeaders
}

export type AggregateConsumerConf = {
  handler: (message: KafkaMessage) => Promise<void>;
  fromBeginning?: boolean; // Fetch messages from the beginning of the topic, default false
  topic: string;
};

type KafkaJsConsumerConfig = {
  aggregates: {
    [aggregate: string]: AggregateConsumerConf;
  };
  consumerRunConfig?: {
    autoCommit?: boolean;
    autoCommitInterval?: number | null;
    autoCommitThreshold?: number | null;
    eachBatchAutoResolve?: boolean;
    partitionsConsumedConcurrently?: number; // Number of running concurrent partition handlers, default 1
  };
  useBatches?: boolean; // true: use batches, false (default): use single messages
};

Simple example:

const consumer = await broker.addConsumer([], {
  aggregates: {
    user: {
      // eslint-disable-next-line @typescript-eslint/require-await
      handler: async (message: KafkaMessage) => {
        // The message value is a Buffer holding the JSON-encoded event payload
        const eventPayload = JSON.parse(message.value.toString());
        console.log(eventPayload.data);
      },
      topic: 'myCompany.events.identity.user',
    },
  },
  consumerRunConfig: {
    partitionsConsumedConcurrently: 3,
  },
  useBatches: false,
});
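A handler usually has to cope with payloads that are not valid JSON or that lack the CloudEvents attributes it relies on. The sketch below shows one defensive way to decode a message before acting on it. The helper decodeCloudEvent is hypothetical, and MessageLike is a structural stand-in for KafkaMessage so that the example does not depend on Node.js typings (a Buffer satisfies it).

```typescript
// Structural stand-in for KafkaMessage: a Node.js Buffer satisfies `{ toString(): string }`.
type MessageLike = { value: { toString(): string } };

type DecodedEvent = {
  id: string;
  source: string;
  type: string;
  specversion: string;
  data?: unknown;
};

// Hypothetical helper: returns the decoded event, or null when the payload is unusable.
function decodeCloudEvent(message: MessageLike): DecodedEvent | null {
  let parsed: unknown;
  try {
    parsed = JSON.parse(message.value.toString());
  } catch (error) {
    return null; // payload is not JSON
  }
  if (typeof parsed !== 'object' || parsed === null) return null;

  const candidate = parsed as { [key: string]: unknown };
  // Attributes that CloudEvents v1.0 marks as required.
  const required = ['id', 'source', 'type', 'specversion'];
  for (const attribute of required) {
    if (typeof candidate[attribute] !== 'string') return null;
  }
  return candidate as unknown as DecodedEvent;
}
```

Inside a handler, a null result can be logged and skipped so that one malformed message does not make the handler throw.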

Sending messages

const cloudEvent = CloudEventFactory.createV1(
  aggregate,
  eventType,
  source,
  data,
);

await broker.sendMessage(
  aggregate,
  [cloudEvent],
  { partitionKey },
)

Simple example:

const aggregate = 'user';

const cloudEvent = CloudEventFactory.createV1(
  aggregate,
  'UserCreated',
  '/users',
  {
    email: 'voodoo@gmail.com',
    username: 'Voodoo',
  },
);

await broker.sendMessage(
  aggregate,
  [cloudEvent],
)

AWS SNS

Instantiation

import brokerFactory from '@micheleangioni/node-messagebrokers';

const broker = brokerFactory(topics);

If the topics don't exist yet, also provide the createTopics: true key during initialization (see Initialization below). This requires the sns:CreateTopic permission.

When connecting to AWS, the client needs to fetch the ARNs of the input topics. By default, it performs a listTopics call to SNS, so the sns:ListTopics permission is needed.

There are two ways to improve performance and avoid a lookup over all existing topics:

  1. If the account has the sns:CreateTopic permission, provide the createTopics: true key during initialization even if the topics have already been created

  2. Pass the AWS Account Id so that the topic ARNs can be rebuilt without querying AWS

const broker = brokerFactory(topics, { awsAccountId: '1234567890' });
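The second option works because an SNS topic ARN is fully determined by the region, the account id and the topic name. The sketch below shows that composition. The helper buildTopicArn is hypothetical and not the library's code, it assumes the standard aws partition, and the account id and topic name are placeholders.

```typescript
// Hypothetical helper: rebuilds an SNS topic ARN locally, without calling AWS.
// Assumption: the standard "aws" partition (not aws-cn or aws-us-gov).
function buildTopicArn(region: string, awsAccountId: string, topicName: string): string {
  return `arn:aws:sns:${region}:${awsAccountId}:${topicName}`;
}

// Placeholder values for illustration.
const userTopicArn = buildTopicArn('eu-west-1', '123456789012', 'user-events');
console.log(userTopicArn); // arn:aws:sns:eu-west-1:123456789012:user-events
```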

Initialization

await broker.init(initConfigurations);

where initConfigurations is optional and has the same structure as the options constructor parameter of the official SDK, plus the optional createTopics: boolean key.

Simple example:

await broker.init();

Creating the topics during initialization as well:

await broker.init({ createTopics: true });

Adding a Consumer

const subscriptionResponse = await broker.addConsumer(aggregate, consumerConfig);

where aggregate is a string and consumerConfig has the following signature

type SnsConsumerOptions = {
  attributes?: SubscriptionAttributesMap,
  endpoint: string,
  protocol: SnsProtocol,
};

where endpoint is the endpoint at which the consumer can be reached and SnsProtocol is one of the supported SNS protocols

enum SnsProtocol {
  EMAIL = 'email',
  EMAIL_JSON = 'email-json',
  HTTP = 'http',
  HTTPS = 'https',
  SMS = 'sms',
  SQS = 'sqs',
  APPLICATION = 'application',
  LAMBDA = 'lambda',
}

and SubscriptionAttributesMap is the subscription attributes type of the official SDK.

Simple example for an HTTPS consumer:

await broker.addConsumer('user', { endpoint: 'https://myconsumerhost.com/api/example', protocol: SnsProtocol.HTTPS });
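An HTTP or HTTPS endpoint subscribed to SNS receives more than event notifications: SNS first posts a SubscriptionConfirmation message whose SubscribeURL must be visited before any notification is delivered. The sketch below classifies an incoming request body accordingly. The function classifySnsDelivery is hypothetical, and it assumes that the Message field carries the event serialized as JSON and that raw message delivery is not enabled.

```typescript
type SnsBody = { Type?: unknown; SubscribeURL?: unknown; Message?: unknown };

type SnsDelivery =
  | { kind: 'confirm'; subscribeUrl: string }
  | { kind: 'event'; payload: unknown }
  | { kind: 'ignore' };

// Hypothetical helper: decides what to do with the raw body of a request sent by SNS.
function classifySnsDelivery(rawBody: string): SnsDelivery {
  let parsed: unknown;
  try {
    parsed = JSON.parse(rawBody);
  } catch (error) {
    return { kind: 'ignore' };
  }
  if (typeof parsed !== 'object' || parsed === null) return { kind: 'ignore' };

  const body = parsed as SnsBody;
  const subscribeUrl = body.SubscribeURL;
  const message = body.Message;

  if (body.Type === 'SubscriptionConfirmation' && typeof subscribeUrl === 'string') {
    // The endpoint must request this URL to activate the subscription.
    return { kind: 'confirm', subscribeUrl };
  }
  if (body.Type === 'Notification' && typeof message === 'string') {
    try {
      // Assumption: the published event is JSON-encoded in the Message field.
      return { kind: 'event', payload: JSON.parse(message) };
    } catch (error) {
      return { kind: 'ignore' };
    }
  }
  return { kind: 'ignore' };
}
```

A production endpoint would additionally verify the message signature before trusting the body, which is left out of this sketch.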

Sending messages

const cloudEvent = CloudEventFactory.createV1(
  aggregate,
  eventType,
  source,
  data,
);

await broker.sendMessage(
  aggregate,
  [cloudEvent],
)

Simple example:

const aggregate = 'user';

const cloudEvent = CloudEventFactory.createV1(
  aggregate,
  'UserCreated',
  '/users',
  {
    email: 'voodoo@gmail.com',
    username: 'Voodoo',
  },
);

await broker.sendMessage(
  aggregate,
  [cloudEvent],
)

Tests

To run the tests, follow these steps:

  • Start Docker Compose via docker-compose up -d (TMPDIR=/private$TMPDIR docker-compose up on macOS)
  • Run the tests via npm test

Contribution Guidelines

Pull requests are welcome. Help is needed to add other clients.

License

Node MessageBrokers is free software distributed under the terms of the MIT license.