.NET toolkit for Kafka


Keywords
kafka, dotnet, protobuf, streaming, event-driven, asp-net-core, middleware
License
MIT
Install
Install-Package Krimson.State.MongoDB -Version 1.1.0-alpha.0.85

Documentation

Krimson

Just you wait...

A helper library for .NET that not only greatly simplifies the usage of Kafka but also covers common streaming scenarios including re-balancing.

Usage

A step-by-step guide with code samples is not available yet; it is still in the works.

Take a look in the examples directory and at the integration tests for further examples.

Krimson automatically configures a schema registry instance.

Processor Examples

1. Consuming a message using a processor module

Krimson will automatically scan and register all modules.

using Krimson;
using Krimson.Examples.Messages.Telemetry;
using Krimson.Processors;

// Host builder created from the command-line arguments.
var builder = WebApplication.CreateBuilder(args);

// Add Krimson with Protobuf and one processor: client id "telemetry-processor",
// input topic "telemetry". No handler is configured inline here.
builder.Services
    .AddKrimson()
    .AddProtobuf()
    .AddProcessor(processor => processor.ClientId("telemetry-processor").InputTopic("telemetry"));

var app = builder.Build();
app.Run();

/// <summary>
/// Processor module whose constructor passes <c>On&lt;DeviceTelemetry&gt;</c> a handler
/// that logs each message together with the id of its record.
/// </summary>
class TelemetryModule : KrimsonProcessorModule {
    public TelemetryModule() {
        On<DeviceTelemetry>(
            (telemetry, context) => context.Logger.Information(
                "Processed {@Telemetry} with id: {RecordId}",
                telemetry,
                context.Record.Id
            )
        );
    }
}

2. Consuming a message using an inline processor

using Krimson;
using Krimson.Examples.Messages.Telemetry;
using Krimson.Processors;

// Host builder created from the command-line arguments.
var builder = WebApplication.CreateBuilder(args);

// Add Krimson with Protobuf and one processor: client id "telemetry-processor",
// input topic "telemetry", and an inline handler for DeviceTelemetry messages.
builder.Services
    .AddKrimson()
    .AddProtobuf()
    .AddProcessor(processor => processor
        .ClientId("telemetry-processor")
        .InputTopic("telemetry")
        // Inline handler: logs each message together with its record id.
        .Process<DeviceTelemetry>(
            (telemetry, context) => context.Logger.Information(
                "Processed {@Telemetry} with id: {RecordId}",
                telemetry,
                context.Record.Id
            )
        )
    );

var app = builder.Build();
app.Run();

3. Processing a message and outputting other messages

using Krimson;
using Krimson.Examples.Messages.Telemetry;
using Krimson.Processors;

// Host builder created from the command-line arguments.
var builder = WebApplication.CreateBuilder(args);

// Add Krimson with Protobuf and one processor: client id "telemetry-processor",
// input topic "telemetry", output topic "telemetry.power-consumption".
builder.Services
    .AddKrimson()
    .AddProtobuf()
    .AddProcessor(processor => processor
        .ClientId("telemetry-processor")
        .InputTopic("telemetry")
        .OutputTopic("telemetry.power-consumption")
    );

var app = builder.Build();
app.Run();

/// <summary>
/// Processor module that turns power-consumption telemetry into
/// <c>PowerConsumption</c> messages and sends every other telemetry message
/// to the "telemetry.unknown" topic.
/// </summary>
class TelemetryModule : KrimsonProcessorModule {
    public TelemetryModule() {
        On<DeviceTelemetry>(
            (telemetry, ctx) => {
                // Anything that is not power-consumption data is forwarded unchanged
                // to the explicit "telemetry.unknown" topic.
                if (telemetry.DataType != nameof(PowerConsumption)) {
                    ctx.Output(
                        message: telemetry,
                        key:     telemetry.DeviceId,
                        topic:   "telemetry.unknown"
                    );

                    return;
                }

                var fields = telemetry.Data.Fields;

                // No topic is passed here, unlike the branch above.
                ctx.Output(
                    message: new PowerConsumption {
                        DeviceId  = telemetry.DeviceId,
                        Unit      = fields["unit"].StringValue,
                        Value     = fields["value"].NumberValue,
                        Timestamp = telemetry.Timestamp
                    },
                    key: telemetry.DeviceId
                );
            }
        );
    }
}

Producer Examples

1. Producing a message and waiting for ack from broker (using the async method)

using Krimson;
using Krimson.Examples.Messages.Telemetry;
using Krimson.Producers;
using Microsoft.AspNetCore.Mvc;

var builder  = WebApplication.CreateBuilder(args);
var services = builder.Services;

// MVC controllers plus the API explorer and Swagger generator.
services.AddControllers();
services.AddEndpointsApiExplorer();
services.AddSwaggerGen();

// Krimson with Protobuf and a producer: client id "telemetry-gateway", topic "telemetry".
services
    .AddKrimson()
    .AddProtobuf()
    .AddProducer(producer => producer.ClientId("telemetry-gateway").Topic("telemetry"));

var app = builder.Build();

// Swagger and its UI are only enabled in the Development environment.
if (app.Environment.IsDevelopment()) {
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();

app.Run();

/// <summary>
/// API controller that hands posted telemetry to the injected <c>KrimsonProducer</c>.
/// </summary>
[ApiController]
[Route("[controller]")]
public class IngressController : ControllerBase {
    readonly KrimsonProducer _producer;

    public IngressController(KrimsonProducer producer) {
        _producer = producer;
    }

    /// <summary>
    /// Produces the telemetry message keyed by its device id and returns
    /// the task obtained from <c>Produce</c>.
    /// </summary>
    [HttpPost(Name = "DispatchTelemetry")]
    public Task Dispatch(DeviceTelemetry telemetry) {
        return _producer.Produce(message: telemetry, key: telemetry.DeviceId);
    }
}

Connector Examples

1. Consuming a message using a processor module

Krimson will automatically scan and register all modules.

using Krimson;
using Krimson.Examples.Messages.Telemetry;
using Krimson.Processors;

// Host builder created from the command-line arguments.
var builder = WebApplication.CreateBuilder(args);

// Add Krimson with Protobuf and one processor: client id "telemetry-processor",
// input topic "telemetry". No handler is configured inline here.
builder.Services
    .AddKrimson()
    .AddProtobuf()
    .AddProcessor(processor => processor.ClientId("telemetry-processor").InputTopic("telemetry"));

var app = builder.Build();
app.Run();

/// <summary>
/// Processor module whose constructor passes <c>On&lt;DeviceTelemetry&gt;</c> a handler
/// that logs each message together with the id of its record.
/// </summary>
class TelemetryModule : KrimsonProcessorModule {
    public TelemetryModule() {
        On<DeviceTelemetry>(
            (telemetry, context) => context.Logger.Information(
                "Processed {@Telemetry} with id: {RecordId}",
                telemetry,
                context.Record.Id
            )
        );
    }
}

Built With

Contributing

Please read CONTRIBUTING.md for details on our code of conduct, and the process for submitting pull requests to us.

Authors

License

This project is licensed under the MIT License - see the LICENSE file for details.