ksqlDb.RestApi.Client.ProtoBuf

ksqlDB.RestApi.Client.ProtoBuf adds support for the Protobuf content type. ksqlDB.RestApi.Client is a C# LINQ-enabled client API for issuing and consuming ksqlDB push queries. It targets .NET 6, .NET 7, and .NET 8. Documentation for the library can be found at https://github.com/tomasfabian/ksqlDB.RestApi.Client-DotNet/blob/main/README.md.


Keywords
.NET, LINQ, ProtoBuf, csharp, ksql, ksqlDB, push, query, dotnet, kafka, reactive-streams
License
MIT
Install
Install-Package ksqlDb.RestApi.Client.ProtoBuf -Version 4.0.0

Documentation

This package enables seamless integration of KSQL push and pull queries with LINQ queries in your .NET C# applications. It allows you to perform server-side operations such as filtering, projection, limiting, and more directly on push notifications using ksqlDB push queries. This facilitates continuous processing of computations over unbounded, potentially never-ending, streams of data.

In addition, the package supports executing SQL statements via the REST API. You can use it to insert records into streams, create tables and types, and perform administrative tasks such as listing available streams.

ksqlDB.RestApi.Client is a contribution to Confluent ksqldb-clients

Install with NuGet package manager:

Install-Package ksqlDb.RestApi.Client

or with the .NET CLI:

dotnet add package ksqlDb.RestApi.Client

This adds a <PackageReference> to your csproj file, similar to the following:

<PackageReference Include="ksqlDb.RestApi.Client" Version="6.0.0" />

Alternatively, use the Protobuf content type:

dotnet add package ksqlDb.RestApi.Client.ProtoBuf

Feel free to experiment with the following example in a .NET interactive Notebook:

using ksqlDB.RestApi.Client.KSql.Linq;
using ksqlDB.RestApi.Client.KSql.Query;
using ksqlDB.RestApi.Client.KSql.Query.Context;
using ksqlDB.RestApi.Client.KSql.Query.Options;

var ksqlDbUrl = @"http://localhost:8088";

var contextOptions = new KSqlDBContextOptions(ksqlDbUrl)
{
  ShouldPluralizeFromItemName = true
};

await using var context = new KSqlDBContext(contextOptions);

using var subscription = context.CreatePushQuery<Tweet>()
  .WithOffsetResetPolicy(AutoOffsetReset.Latest)
  .Where(p => p.Message != "Hello world" || p.Id == 1)
  .Select(l => new { l.Message, l.Id })
  .Take(2)
  .Subscribe(tweetMessage =>
  {
    Console.WriteLine($"{nameof(Tweet)}: {tweetMessage.Id} - {tweetMessage.Message}");
  }, error => { Console.WriteLine($"Exception: {error.Message}"); }, () => Console.WriteLine("Completed"));

Console.WriteLine("Press any key to stop the subscription");

Console.ReadKey();

public class Tweet : Record
{
  public int Id { get; set; }

  public string Message { get; set; }
}

An entity class in ksqlDB.RestApi.Client represents the structure of a table or stream. An instance of the class represents a record in that stream or table, and its properties are mapped to the corresponding columns.

LINQ code written in C# from the sample is equivalent to this KSQL query:

SELECT Message, Id
  FROM Tweets
 WHERE Message != 'Hello world' OR Id = 1
  EMIT CHANGES
 LIMIT 2;

In the provided C# code snippet, most of the code executes on the server side except for the IQbservable<TEntity>.Subscribe extension method. This method is responsible for subscribing to your ksqlDB stream, which is created using the following approach:

using ksqlDB.RestApi.Client.KSql.RestApi.Http;
using ksqlDB.RestApi.Client.KSql.RestApi.Statements;
using ksqlDB.RestApi.Client.KSql.RestApi;
using ksqlDB.Api.Client.Samples.Models;

EntityCreationMetadata metadata = new(kafkaTopic: nameof(Tweet))
{
  Partitions = 3,
  Replicas = 3
};

var httpClient = new HttpClient()
{
  BaseAddress = new Uri(@"http://localhost:8088")
};

var httpClientFactory = new HttpClientFactory(httpClient);
var restApiClient = new KSqlDbRestApiClient(httpClientFactory);

var httpResponseMessage = await restApiClient.CreateOrReplaceStreamAsync<Tweet>(metadata);

CreateOrReplaceStreamAsync executes the following statement:

CREATE OR REPLACE STREAM Tweets (
  Id INT,
  Message VARCHAR
) WITH ( KAFKA_TOPIC='Tweet', VALUE_FORMAT='Json', PARTITIONS='3', REPLICAS='3' );

Execute the following insert statements to publish messages using your ksqldb-cli:

docker exec -it $(docker ps -q -f name=ksqldb-cli) ksql http://ksqldb-server:8088
INSERT INTO tweets (id, message) VALUES (1, 'Hello world');
INSERT INTO tweets (id, message) VALUES (2, 'ksqlDB rulez!');

or insert a record from C#:

var responseMessage = await new KSqlDbRestApiClient(httpClientFactory)
  .InsertIntoAsync(new Tweet { Id = 2, Message = "ksqlDB rulez!" });

or with KSqlDbContext:

await using var context = new KSqlDBContext(ksqlDbUrl);

context.Add(new Tweet { Id = 1, Message = "Hello world" });
context.Add(new Tweet { Id = 2, Message = "ksqlDB rulez!" });

var saveChangesResponse = await context.SaveChangesAsync();

Sample projects can be found under the Samples solution folder in ksqlDB.RestApi.Client.sln.

External dependencies:

Clone the repository

git clone https://github.com/tomasfabian/ksqlDB.RestApi.Client-DotNet.git

Change directory to Samples:

CD Samples\ksqlDB.RestApi.Client.Sample\

Run in the command line:

docker compose up -d

AspNet Blazor server side sample:

In Blazor, the application logic and UI rendering occur on the server. The client's web browser receives updates and UI changes through a SignalR connection. This ensures smooth integration with the ksqlDB.RestApi.Client library, allowing the Apache Kafka broker and ksqlDB to remain hidden from direct exposure to clients. The server-side Blazor application communicates with ksqlDB using the ksqlDB.RestApi.Client. Whenever an event in ksqlDB occurs, the server-side Blazor app responds and signals the UI in the client's browser to update. This setup allows a smooth and continuous update flow, creating a real-time, reactive user interface.

  • Set docker-compose.csproj as the startup project in InsideOut.sln for embedded Kafka Connect integration and stream processing examples.

IQbservable<T> extension methods

IObservable<T> is the dual of IEnumerable<T>, and IQbservable<T> is the dual of IQueryable<T>. In all four cases, LINQ providers use deferred execution. IEnumerable<T> and IObservable<T> queries are executed locally, while IQueryable<T> and IQbservable<T> queries are executed on the server side. Server-side execution is possible thanks to traversing ASTs (Abstract Syntax Trees) with visitors. The KSqlDbProvider creates the KSQL syntax for you from expression trees and passes it along to ksqlDB.
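The idea of turning an expression tree into query text can be illustrated with a minimal sketch that relies only on System.Linq.Expressions. WhereClauseVisitor and its output format are hypothetical and heavily simplified. They are not the library's KSqlDbProvider, which covers many more node types and KSQL features.

```csharp
using System;
using System.Linq.Expressions;
using System.Text;

// A lambda assigned to Expression<Func<...>> is kept as data (an AST) instead of compiled code.
Expression<Func<Tweet, bool>> predicate = t => t.Message != "Hello world" || t.Id == 1;

var visitor = new WhereClauseVisitor();
Console.WriteLine(visitor.Translate(predicate));
// prints: WHERE ((Message != 'Hello world') OR (Id = 1))

public class Tweet
{
  public int Id { get; set; }
  public string Message { get; set; } = "";
}

// Hypothetical visitor for illustration only; it supports just a few node types.
public sealed class WhereClauseVisitor : ExpressionVisitor
{
  private readonly StringBuilder sql = new();

  public string Translate(LambdaExpression lambda)
  {
    sql.Clear();
    sql.Append("WHERE ");
    Visit(lambda.Body);
    return sql.ToString();
  }

  // Binary nodes (==, !=, ||, &&) become infix operators wrapped in parentheses.
  protected override Expression VisitBinary(BinaryExpression node)
  {
    sql.Append('(');
    Visit(node.Left);
    sql.Append(node.NodeType switch
    {
      ExpressionType.Equal => " = ",
      ExpressionType.NotEqual => " != ",
      ExpressionType.OrElse => " OR ",
      ExpressionType.AndAlso => " AND ",
      _ => throw new NotSupportedException(node.NodeType.ToString())
    });
    Visit(node.Right);
    sql.Append(')');
    return node;
  }

  // Property access such as t.Message is emitted as a column name.
  protected override Expression VisitMember(MemberExpression node)
  {
    sql.Append(node.Member.Name);
    return node;
  }

  // String constants are quoted; other constants are emitted as-is.
  protected override Expression VisitConstant(ConstantExpression node)
  {
    sql.Append(node.Value is string s ? $"'{s}'" : $"{node.Value}");
    return node;
  }
}
```

Nothing is evaluated while the predicate is being built. The tree is only walked when Translate is called, which is the same deferred-execution principle that lets a provider ship the query to a server instead of running it locally.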

Both IObservable<T> and IQbservable<T> represent push-based sequences of asynchronous and potentially infinite events, while IEnumerable<T> and IQueryable<T> represent collections or pull-based sequences of items that can be iterated or queried, respectively.
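The difference between pull-based and push-based sequences can be sketched with the base class library interfaces alone. SimpleSource and RecordingObserver are hypothetical helper types written for this illustration. They are not part of ksqlDB.RestApi.Client, and a real application would more likely rely on an existing reactive library.

```csharp
using System;
using System.Collections.Generic;

// Pull: the consumer drives the iteration and asks for each next item.
IEnumerable<int> numbers = new[] { 1, 2, 3 };
foreach (var number in numbers)
{
  Console.WriteLine($"pulled {number}");
}

// Push: the producer drives and notifies the observer whenever a value is available.
var source = new SimpleSource<int>();
var observer = new RecordingObserver<int>();

using (source.Subscribe(observer))
{
  source.Publish(1);
  source.Publish(2);
}

source.Publish(3); // the subscription was disposed, so this value is not delivered

Console.WriteLine(string.Join(", ", observer.Received)); // prints: 1, 2

// Hypothetical observer that stores every value pushed to it.
public sealed class RecordingObserver<T> : IObserver<T>
{
  public List<T> Received { get; } = new();

  public void OnNext(T value) => Received.Add(value);
  public void OnError(Exception error) => Console.WriteLine($"error: {error.Message}");
  public void OnCompleted() => Console.WriteLine("completed");
}

// Hypothetical minimal push source that forwards published values to its subscribers.
public sealed class SimpleSource<T> : IObservable<T>
{
  private readonly List<IObserver<T>> observers = new();

  public IDisposable Subscribe(IObserver<T> observer)
  {
    observers.Add(observer);
    // Disposing the returned token removes the observer again.
    return new Unsubscriber(() => observers.Remove(observer));
  }

  public void Publish(T value)
  {
    // Iterate over a copy so that observers may unsubscribe while being notified.
    foreach (var observer in observers.ToArray())
    {
      observer.OnNext(value);
    }
  }

  private sealed class Unsubscriber : IDisposable
  {
    private readonly Action dispose;

    public Unsubscriber(Action dispose) => this.dispose = dispose;

    public void Dispose() => dispose();
  }
}
```

A ksqlDB push query follows the second pattern: the subscriber does not request rows, it is notified as new records arrive until the subscription is disposed.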

List of supported push query extension methods:

Register the KSqlDbContext

IKSqlDBContext and IKSqlDbRestApiClient can be provided through dependency injection. These services are registered during app startup, and components that require them receive them via constructor parameters.

To register KSqlDbContext as a service, open Program.cs and add the lines shown below to the ConfigureServices method, or see the workshop for more details:

using ksqlDB.RestApi.Client.Sensors;
using ksqlDB.RestApi.Client.KSql.Query.Options;
using ksqlDb.RestApi.Client.DependencyInjection;
using ksqlDB.RestApi.Client.Sensors.KSqlDb;

IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
      var ksqlDbUrl = @"http://localhost:8088";

      services.AddDbContext<ISensorsKSqlDbContext, SensorsKSqlDbContext>(
        options =>
        {
          var setupParameters = options.UseKSqlDb(ksqlDbUrl);

          setupParameters.Options.ShouldPluralizeFromItemName = false;
          setupParameters.SetAutoOffsetReset(AutoOffsetReset.Earliest);

        }, ServiceLifetime.Transient, restApiLifetime: ServiceLifetime.Transient);
    })
    .Build();

await host.RunAsync();

KSqlDbContextOptions builder

To modify parameters or introduce new ones, utilize the following approach:

var contextOptions = new KSqlDbContextOptionsBuilder()
  .UseKSqlDb("http://localhost:8088")
  .SetBasicAuthCredentials("fred", "flinstone")
  .SetJsonSerializerOptions(jsonOptions =>
  {
    jsonOptions.IgnoreReadOnlyFields = true;
  })
  .SetAutoOffsetReset(AutoOffsetReset.Latest)
  .SetProcessingGuarantee(ProcessingGuarantee.ExactlyOnce)
  .SetIdentifierEscaping(IdentifierEscaping.Keywords)
  .SetupPushQuery(options =>
  {
    options.Properties["ksql.query.push.v2.enabled"] = "true";
  })
  .Options;

This code initializes a KSqlDbContextOptionsBuilder to configure settings for a ksqlDB context. Here's a breakdown of the configurations:

  • UseKSqlDb("http://localhost:8088"): Specifies the URL of the ksqlDB server.
  • SetBasicAuthCredentials("fred", "flinstone"): Sets the basic authentication credentials (username and password) for accessing the ksqlDB server.
  • SetJsonSerializerOptions(jsonOptions => { ... }): Configures JSON serialization options, such as ignoring read-only fields.
  • SetAutoOffsetReset(AutoOffsetReset.Latest): Sets the offset reset behavior to start consuming messages from the latest available when no committed offset is found. By default, 'auto.offset.reset' is configured to 'earliest'.
  • SetProcessingGuarantee(ProcessingGuarantee.ExactlyOnce): Specifies the processing guarantee as exactly-once semantics.
  • SetIdentifierEscaping(IdentifierEscaping.Keywords): Escapes identifiers such as table and column names that are SQL keywords.
  • SetupPushQuery(options => { ... }): Configures push query options, specifically enabling KSQL query push version 2.

Finally, .Options returns the configured options for the ksqlDB context.

Overriding stream names

Stream names are generated based on the generic record types. They are pluralized with the Pluralize.NET package.

By default, generated from-item names, such as stream and table names, are pluralized. This behavior can be switched off with the ShouldPluralizeFromItemName configuration.

context.CreatePushQuery<Person>();

FROM People

This can be disabled:

var contextOptions = new KSqlDBContextOptions(@"http://localhost:8088")
{
  ShouldPluralizeFromItemName = false
};

new KSqlDBContext(contextOptions).CreatePushQuery<Person>();

FROM Person

Setting an arbitrary stream name (from_item name):

context.CreatePushQuery<Tweet>("custom_topic_name");

FROM custom_topic_name

KSqlDbRestApiClient

The KSqlDbRestApiClient supports various operations such as executing KSQL statements, inserting data into streams asynchronously, creating, listing or dropping entities, and managing KSQL connectors.

using ksqlDB.RestApi.Client.KSql.RestApi;
using ksqlDB.RestApi.Client.KSql.RestApi.Enums;
using ksqlDB.RestApi.Client.KSql.RestApi.Extensions;
using ksqlDB.RestApi.Client.KSql.RestApi.Http;
using ksqlDB.RestApi.Client.KSql.RestApi.Serialization;
using ksqlDB.RestApi.Client.KSql.RestApi.Statements;
using ksqlDB.RestApi.Client.KSql.RestApi.Statements.Properties;
using ksqlDB.RestApi.Client.Samples.Models.Movies;

public static async Task ExecuteAsync(CancellationToken cancellationToken = default)
{
  var httpClient = new HttpClient()
  {
    BaseAddress = new Uri("http://localhost:8088")
  };
  var httpClientFactory = new HttpClientFactory(httpClient);
  var kSqlDbRestApiClient = new KSqlDbRestApiClient(httpClientFactory);

  EntityCreationMetadata entityCreationMetadata = new(kafkaTopic: "companyname.movies")
  {
    Partitions = 3,
    Replicas = 3,
    ValueFormat = SerializationFormats.Json,
    IdentifierEscaping = IdentifierEscaping.Keywords
  };

  var httpResponseMessage = await kSqlDbRestApiClient.CreateOrReplaceTableAsync<Movie>(entityCreationMetadata, cancellationToken);
  var responses = await httpResponseMessage.ToStatementResponsesAsync();
  Console.WriteLine($"Create or replace table response: {responses[0].CommandStatus!.Message}");

  Console.WriteLine($"{Environment.NewLine}Available tables:");
  var tablesResponses = await kSqlDbRestApiClient.GetTablesAsync(cancellationToken);
  Console.WriteLine(string.Join(", ", tablesResponses[0].Tables!.Select(c => c.Name)));

  var dropProperties = new DropFromItemProperties
  {
    UseIfExistsClause = true,
    DeleteTopic = true,
    IdentifierEscaping = IdentifierEscaping.Keywords
  };
  httpResponseMessage = await kSqlDbRestApiClient.DropTableAsync<Movie>(dropProperties, cancellationToken: cancellationToken);
  tablesResponses = await kSqlDbRestApiClient.GetTablesAsync(cancellationToken);
}

using ksqlDB.RestApi.Client.KSql.Query;
using ksqlDB.RestApi.Client.KSql.RestApi.Statements.Annotations;

public class Movie : Record
{
  [Key]
  public int Id { get; set; }
  public string Title { get; set; } = null!;
}

Model builder

By leveraging the ksqlDb.RestApi.Client fluent API model builder, you can streamline the configuration process, improve code readability, and mitigate issues related to code regeneration by keeping configuration logic separate from generated POCOs.

using ksqlDb.RestApi.Client.FluentAPI.Builders;
using ksqlDb.RestApi.Client.FluentAPI.Builders.Configuration;

ModelBuilder modelBuilder = new();

var decimalTypeConvention = new DecimalTypeConvention(14, 14);

modelBuilder.AddConvention(decimalTypeConvention);

modelBuilder.Entity<Payment>()
  .Property(b => b.Amount)
  .Decimal(precision: 10, scale: 2);

modelBuilder.Entity<Payment>()
  .Property(b => b.Description)
  .HasColumnName("Desc");

modelBuilder.Entity<Account>()
  .HasKey(c => c.Id);

modelBuilder.Entity<Account>()
  .Property(b => b.Secret)
  .Ignore();

C# entity definitions:

record Payment
{
  public string Id { get; set; } = null!;
  public decimal Amount { get; set; }
  public string Description { get; set; } = null!;
}

record Account
{
  public string Id { get; set; } = null!;
  public decimal Balance { get; set; }
  public string Secret { get; set; }
}

Usage with ksqlDB REST API Client:

var kSqlDbRestApiClient = new KSqlDbRestApiClient(httpClientFactory, modelBuilder);
await kSqlDbRestApiClient.CreateTypeAsync<Payment>(cancellationToken);

var entityCreationMetadata = new EntityCreationMetadata(kafkaTopic: nameof(Account), partitions: 3)
{
  Replicas = 3
};
var responseMessage = await kSqlDbRestApiClient.CreateTableAsync<Account>(entityCreationMetadata, true, cancellationToken);

Generated KSQL:

CREATE TYPE Payment AS STRUCT<Id VARCHAR, Amount DECIMAL(10,2), Desc VARCHAR>;

CREATE TABLE IF NOT EXISTS Accounts (
	Id VARCHAR PRIMARY KEY,
	Balance DECIMAL(14,14)
) WITH ( KAFKA_TOPIC='Account', VALUE_FORMAT='Json', PARTITIONS='3', REPLICAS='3' );

The Description property within the Payment type has been customized to override the resulting column name as "Desc". Additionally, the Id property within the Account table has been designated as the primary key, while the Secret property is disregarded during code generation.

Aggregation functions

List of supported ksqldb aggregation functions:

Rest API reference

List of supported data types:

List of supported Joins:

List of supported pull query extension methods:

List of supported ksqlDB SQL statements:

KSqlDbContext

Config

Operators

Data definitions

Miscellaneous

Functions

LinqPad samples

Push Query

Pull Query

Nuget

https://www.nuget.org/packages/ksqlDB.RestApi.Client/

ksqldb links

Scalar functions

Aggregation functions

Push query

Acknowledgements: