Raduz.KafkaClient.Consumer

Package Description


Keywords
License
MIT
Install
Install-Package Raduz.KafkaClient.Consumer -Version 4.0.3

Documentation

logo

Raduz.KafkaClient

Easy to use kafka .net6 client.

  • publishing and consuming AVRO schema based topics easilly with mediator request and handlers
  • some logs of operations and their state

How to use

You can download packages from here Or just install from nuget.org:

  • NuGet\Install-Package Raduz.KafkaClient.Client -Version 1.0.0
    • Universall with all of bellow
  • NuGet\Install-Package Raduz.KafkaClient.Common -Version 1.0.0
    • Some extensions
  • NuGet\Install-Package Raduz.KafkaClient.Consumer -Version 1.0.0
    • Can be used as solo consumer
  • NuGet\Install-Package Raduz.KafkaClient.Contracts -Version 1.0.0
    • Can be used as microsoft "Abstractions" packages in projects,
      where is no need to include whole package,
      but you need contracts for building requests or some pipelines etc.
  • NuGet\Install-Package Raduz.KafkaClient.Publisher -Version 1.0.0
    • Can be used as solo publisher

AVRO object creation

How to create AVROModel you can read here

  • !Dont forget to add its avsc file to your schema registry!

Registration

To use kafka client you have to register it in your DI container and provide configuration

appsettings.json

In app settings you can set consumer, producer and schema registry (at this point there is no common registration).
All registration settings are made by Confluent.Kafka. You can read more in their documentation

"ConsumerConfig": {
    "GroupId": "{CONSUMER-GROUP-ID}",
    "BootstrapServers": "{YOUR-BOOTSTRAP-KAFKA-SERVER}",
    "AutoOffsetReset" : 1,
    //OPTIONAL f.e.
    "SecurityProtocol": 3,
    "SaslMechanism": {SASL-MECHANISM},
    "SaslUsername": "{SASL-USER-NAME}",
    "SaslPassword": "{SASL-USER-PASSWORD}"
  },
  "SchemaRegistryConfig": {
    "Url": "{SCHEMA-REGISTRY-URL}",
    //OPTIONAL f.e.
    "BasicAuthUserInfo": "{KEY}:{TOKEN}"
  },
  "ProducerConfig": {
    "BootstrapServers": "{YOUR-BOOTSTRAP-KAFKA-SERVER}",
    //OPTIONAL f.e.
    "SecurityProtocol": 3,
    "SaslMechanism": {SASL-MECHANISM},
    "SaslUsername": "{SASL-USER-NAME}",
    "SaslPassword": "{SASL-USER-PASSWORD}"
  }

Configure services

Add this code snipet to your service registration.

   services.ConfigureKafkaPublisher(configuration)
      .ConfigureKafkaConsumer(configuration, opts =>
      {
        //Here you can register consumer requests for specific topic and AVRO model
        opts.AddConsumer<{AVRO-SCHEME}, {MEDIATOR-REQUEST}>("{TOPIC-NAME}");
        ...
      })

Consumer

Here we need to create request and handler implementing kafka client interfaces and abstract class

Request creation

Creating request is realy easy, cause only thing that you need is just to create class implementing abstract class KafkaClientRequest and provide some generated object from your AVRO scheme

public class YourRequest : KafkaClientRequest<{YOUR-AVRO-OBJECT}>
{
  public YourRequest({YOUR-AVRO-OBJECT} specificRecord) : base(specificRecord)
  {
  }
}

Handler creation

Handler should implement IKafkaClientRequestHandler where the generic argument is your created request

public class YourHandler : IKafkaClientRequestHandler<YourRequest>
{
  public YourHandler()
  {
    //YOUR DI
  }

  public async Task<bool> Handle(BulkCreateSubscriptionRequest request, CancellationToken cancellationToken)
  {
    //Handle request
    if ({HANDLED})
    {
      return true;
    }

    return false;
  }
}

Publisher

To publish record to Kafka there is nothing easier than injecting IKafkaPusher to your class and call like:

var publisher = app.Services.GetService<IKafkaPublisher>()!; // or some other way to obtain
await publisher.PublishAsync("{TOPIC-NAME}", "{SOME-KEY}", {YOUR-AVRO-OBJECT}, cancellationToken);

Used nuggets

Project 'Raduz.KafkaClient.Contracts' has the following package references

Confluent.SchemaRegistry.Serdes.Avro 1.9.3 MediatR 11.0.0

Project 'Raduz.KafkaClient.Consumer' has the following package references

MediatR.Extensions.Microsoft.DependencyInjection 11.0.0 Microsoft.Extensions.Configuration.Abstractions 6.0.0 Microsoft.Extensions.Hosting.Abstractions 6.0.0
Microsoft.Extensions.Options.ConfigurationExtensions 6.0.0
YCherkes.SchemaRegistry.Serdes.Avro 1.0.4

Project 'Raduz.KafkaClient.Publisher' has the following package references

Microsoft.Extensions.Configuration.Abstractions 6.0.0
Microsoft.Extensions.DependencyInjection.Abstractions 6.0.0
Microsoft.Extensions.Logging.Abstractions 6.0.2
Microsoft.Extensions.Options.ConfigurationExtensions 6.0.0

Project 'Raduz.KafkaClient.Common' has the following package references

Confluent.Kafka 1.9.3 Microsoft.Extensions.Logging.Abstractions 6.0.2