NServiceBus.SqlServer.Deduplication

Message deduplication for the SQL Server transport.


Keywords
NServiceBus, Native, Raw
License
MIT
Install
Install-Package NServiceBus.SqlServer.Deduplication -Version 14.0.0

Documentation

NServiceBus.SqlServer.Native

SQL Server Transport Native is a shim providing low-level access to the NServiceBus SQL Server Transport with no NServiceBus or SQL Server Transport reference required.

See Milestones for release notes.

Community backed

It is expected that all developers become a Patron to use NServiceBusExtensions. See the licensing FAQ.

Sponsors

Support this project by becoming a Sponsor. The company avatar will show up here with a website link. The avatar will also be added to all GitHub repositories under the NServiceBusExtensions organization.

Patrons

Thanks to all the backing developers. Support this project by becoming a patron.

Usage scenarios

  • Error or Audit queue handling: Allows consuming messages from error and audit queues, for example to move them to a long-term archive. NServiceBus expects to have a queue per message type, so NServiceBus endpoints are not suitable for processing error or audit queues. SQL Native allows manipulation or consumption of queues containing multiple types of messages.
  • Corrupted or malformed messages: Allows processing poison messages that can't be deserialized by NServiceBus. In SQL Native, message headers and body are treated as a raw string and byte array, so corrupted or malformed messages can be read and manipulated in code to correct any problems.
  • Deployment or decommission: Allows performing common operational activities, similar to operations scripts. Running installers requires starting a full endpoint, which is not always ideal during a deployment or decommission. SQL Native allows creating or deleting queues with no running endpoint, and with significantly less code. This also makes it a better candidate for use in deployment scripting languages like PowerShell.
  • Bulk operations: SQL Native supports sending and receiving multiple messages within a single SqlConnection and SqlTransaction.
  • Explicit connection and transaction management: NServiceBus abstracts the SqlConnection and SqlTransaction creation and management. SQL Native allows any consuming code to manage the scope and settings of both the SqlConnection and SqlTransaction.
  • Message pass through: SQL Native reduces the amount of boilerplate code and simplifies development.

Main Queue

Queue management

Queue management for the main transport queue.

See also SQL Server Transport - SQL statements.

Create

The queue can be created using the following:

var manager = new QueueManager("endpointTable", sqlConnection);
await manager.Create();

Delete

The queue can be deleted using the following:

var manager = new QueueManager("endpointTable", sqlConnection);
await manager.Drop();

Sending messages

Sending to the main transport queue.

Single

Sending a single message.

var manager = new QueueManager("endpointTable", sqlConnection);
var message = new OutgoingMessage(
    id: Guid.NewGuid(),
    headers: headers,
    bodyBytes: body);
await manager.Send(message);

Batch

Sending a batch of messages.

var manager = new QueueManager("endpointTable", sqlConnection);
var messages = new List<OutgoingMessage>
{
    new(
        id: Guid.NewGuid(),
        headers: headers1,
        bodyBytes: body1),
    new(
        id: Guid.NewGuid(),
        headers: headers2,
        bodyBytes: body2),
};
await manager.Send(messages);

Reading messages

"Reading" a message returns the data from the database without deleting it.

Single

Reading a single message.

var manager = new QueueManager("endpointTable", sqlConnection);
var message = await manager.Read(rowVersion: 10);

if (message != null)
{
    Console.WriteLine(message.Headers);
    if (message.Body != null)
    {
        using var reader = new StreamReader(message.Body);
        var bodyText = await reader.ReadToEndAsync();
        Console.WriteLine(bodyText);
    }
}

Batch

Reading a batch of messages.

var manager = new QueueManager("endpointTable", sqlConnection);
var result = await manager.Read(
    size: 5,
    startRowVersion: 10,
    func: async (message, cancel) =>
    {
        Console.WriteLine(message.Headers);
        if (message.Body == null)
        {
            return;
        }

        using var reader = new StreamReader(message.Body);
        var bodyText = await reader.ReadToEndAsync(cancel);
        Console.WriteLine(bodyText);
    });

Console.WriteLine(result.Count);
Console.WriteLine(result.LastRowVersion);

RowVersion tracking

Many scenarios need to keep track of the RowVersion of the last message that was read. RowVersionTracker provides a lightweight implementation of this functionality. It stores the current RowVersion in a table containing a single column and row.

var versionTracker = new RowVersionTracker();

// create table
await versionTracker.CreateTable(sqlConnection);

// save row version
await versionTracker.Save(sqlConnection, newRowVersion);

// get row version
var startingRow = await versionTracker.Get(sqlConnection);

Note that this is only one possible implementation of storing the current RowVersion.
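
As an illustration of another possible implementation, the following is a minimal sketch that keeps the RowVersion in a local file instead of a SQL table. FileRowVersionStore is a hypothetical class and is not part of this library. Returning 1 when nothing has been saved yet is an assumption made for this sketch.

```csharp
using System.Globalization;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical alternative to RowVersionTracker; not part of the library.
public class FileRowVersionStore
{
    readonly string path;

    public FileRowVersionStore(string path) =>
        this.path = path;

    // Overwrites the file with the latest row version.
    public Task Save(long rowVersion, CancellationToken cancel = default) =>
        File.WriteAllTextAsync(
            path,
            rowVersion.ToString(CultureInfo.InvariantCulture),
            cancel);

    // Returns the stored row version.
    // Assumption for this sketch: start from 1 when no file exists yet.
    public async Task<long> Get(CancellationToken cancel = default)
    {
        if (!File.Exists(path))
        {
            return 1;
        }

        var text = await File.ReadAllTextAsync(path, cancel);
        return long.Parse(text, CultureInfo.InvariantCulture);
    }
}
```

Unlike a table in the same database as the queue, a file cannot take part in the SqlTransaction used to read messages, so a crash between processing a message and saving the RowVersion may cause that message to be read again.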

Processing loop

For scenarios where continual processing (reading and executing some code with the result) of incoming messages is required, MessageProcessingLoop can be used.

An example use case is monitoring an error queue. Some action should be taken when a message appears in the error queue, but it should remain in that queue in case it needs to be retried.

The snippet below uses the RowVersionTracker described above to track the current RowVersion.

var rowVersionTracker = new RowVersionTracker();

var startingRow = await rowVersionTracker.Get(sqlConnection);

static async Task Callback(
    SqlTransaction transaction,
    IncomingMessage message,
    Cancel cancel)
{
    if (message.Body == null)
    {
        return;
    }

    using var reader = new StreamReader(message.Body);
    var bodyText = await reader.ReadToEndAsync(cancel);
    Console.WriteLine($"Message received in error queue:\r\n{bodyText}");
}

static void ErrorCallback(Exception exception) =>
    Environment.FailFast("Message processing loop failed", exception);

Task<SqlTransaction> BuildTransaction(Cancel cancel) =>
    ConnectionHelpers.BeginTransaction(connectionString, cancel);

Task PersistRowVersion(
    SqlTransaction transaction,
    long rowVersion,
    Cancel cancel) =>
    rowVersionTracker.Save(sqlConnection, rowVersion, cancel);

var processingLoop = new MessageProcessingLoop(
    table: "error",
    delay: TimeSpan.FromSeconds(1),
    transactionBuilder: BuildTransaction,
    callback: Callback,
    errorCallback: ErrorCallback,
    startingRow: startingRow,
    persistRowVersion: PersistRowVersion);
processingLoop.Start();

Console.ReadKey();

await processingLoop.Stop();

Consuming messages

"Consuming" a message returns the data from the database and also deletes that message.

Single

Consume a single message.

var manager = new QueueManager("endpointTable", sqlConnection);
var message = await manager.Consume();

if (message != null)
{
    Console.WriteLine(message.Headers);
    if (message.Body != null)
    {
        using var reader = new StreamReader(message.Body);
        var bodyText = await reader.ReadToEndAsync();
        Console.WriteLine(bodyText);
    }
}

Batch

Consuming a batch of messages.

var manager = new QueueManager("endpointTable", sqlConnection);
var result = await manager.Consume(
    size: 5,
    func: async (message, cancel) =>
    {
        Console.WriteLine(message.Headers);
        if (message.Body == null)
        {
            return;
        }

        using var reader = new StreamReader(message.Body);
        var bodyText = await reader.ReadToEndAsync(cancel);
        Console.WriteLine(bodyText);
    });

Console.WriteLine(result.Count);
Console.WriteLine(result.LastRowVersion);

Consuming loop

For scenarios where continual consumption (consuming and executing some code with the result) of incoming messages is required, MessageConsumingLoop can be used.

An example use case is monitoring an audit queue. Some action should be taken when a message appears in the audit queue, and it should be purged from the queue to free up the storage space.

static async Task Callback(
    SqlTransaction transaction,
    IncomingMessage message,
    Cancel cancel)
{
    if (message.Body != null)
    {
        using var reader = new StreamReader(message.Body);
        var bodyText = await reader.ReadToEndAsync(cancel);
        Console.WriteLine($"Reply received:\r\n{bodyText}");
    }
}

Task<SqlTransaction> BuildTransaction(Cancel cancel) =>
    ConnectionHelpers.BeginTransaction(connectionString, cancel);

static void ErrorCallback(Exception exception) =>
    Environment.FailFast("Message consuming loop failed", exception);

// start consuming
var consumingLoop = new MessageConsumingLoop(
    table: "endpointTable",
    delay: TimeSpan.FromSeconds(1),
    transactionBuilder: BuildTransaction,
    callback: Callback,
    errorCallback: ErrorCallback);
consumingLoop.Start();

// stop consuming
await consumingLoop.Stop();

Delayed Queue

Queue management

Queue management for the native delayed delivery functionality.

See also SQL Server Transport - SQL statements.

Create

The queue can be created using the following:

var manager = new DelayedQueueManager("endpointTable.Delayed", sqlConnection);
await manager.Create();

Delete

The queue can be deleted using the following:

var manager = new DelayedQueueManager("endpointTable.Delayed", sqlConnection);
await manager.Drop();

Sending messages

Single

Sending a single message.

var manager = new DelayedQueueManager("endpointTable.Delayed", sqlConnection);
var message = new OutgoingDelayedMessage(
    due: DateTime.UtcNow.AddDays(1),
    headers: headers,
    bodyBytes: body);
await manager.Send(message);

Batch

Sending a batch of messages.

var manager = new DelayedQueueManager("endpointTable.Delayed", sqlConnection);
var messages = new List<OutgoingDelayedMessage>
{
    new(
        due: DateTime.UtcNow.AddDays(1),
        headers: headers1,
        bodyBytes: body1),
    new(
        due: DateTime.UtcNow.AddDays(1),
        headers: headers2,
        bodyBytes: body2),
};
await manager.Send(messages);

Reading messages

"Reading" a message returns the data from the database without deleting it.

Single

Reading a single message.

var manager = new DelayedQueueManager("endpointTable.Delayed", sqlConnection);
var message = await manager.Read(rowVersion: 10);

if (message != null)
{
    Console.WriteLine(message.Headers);
    if (message.Body != null)
    {
        using var reader = new StreamReader(message.Body);
        var bodyText = await reader.ReadToEndAsync();
        Console.WriteLine(bodyText);
    }
}

Batch

Reading a batch of messages.

var manager = new DelayedQueueManager("endpointTable.Delayed", sqlConnection);
var result = await manager.Read(
    size: 5,
    startRowVersion: 10,
    func: async (message, cancel) =>
    {
        Console.WriteLine(message.Headers);
        if (message.Body == null)
        {
            return;
        }

        using var reader = new StreamReader(message.Body);
        var bodyText = await reader.ReadToEndAsync(cancel);
        Console.WriteLine(bodyText);
    });

Console.WriteLine(result.Count);
Console.WriteLine(result.LastRowVersion);

Consuming messages

"Consuming" a message returns the data from the database and also deletes that message.

Single

Consume a single message.

var manager = new DelayedQueueManager("endpointTable.Delayed", sqlConnection);
var message = await manager.Consume();

if (message != null)
{
    Console.WriteLine(message.Headers);
    if (message.Body != null)
    {
        using var reader = new StreamReader(message.Body);
        var bodyText = await reader.ReadToEndAsync();
        Console.WriteLine(bodyText);
    }
}

Batch

Consuming a batch of messages.

var manager = new DelayedQueueManager("endpointTable.Delayed", sqlConnection);
var result = await manager.Consume(
    size: 5,
    func: async (message, cancel) =>
    {
        Console.WriteLine(message.Headers);
        if (message.Body == null)
        {
            return;
        }

        using var reader = new StreamReader(message.Body);
        var bodyText = await reader.ReadToEndAsync(cancel);
        Console.WriteLine(bodyText);
    });

Console.WriteLine(result.Count);
Console.WriteLine(result.LastRowVersion);

Headers

The helper class NServiceBus.Transport.SqlServerNative.Headers contains several header-related utilities.

Subscriptions

Table management for the native publish/subscribe functionality.

Table management

Create

The table can be created using the following:

var manager = new SubscriptionManager("SubscriptionRouting", sqlConnection);
await manager.Create();

Delete

The table can be deleted using the following:

var manager = new SubscriptionManager("SubscriptionRouting", sqlConnection);
await manager.Drop();

Deduplication

Some scenarios, such as HTTP message pass through, require message deduplication.

Table management

Create

The table can be created using the following:

var manager = new DedupeManager(sqlConnection, "DeduplicationTable");
await manager.Create();

Delete

The table can be deleted using the following:

var manager = new DedupeManager(sqlConnection, "DeduplicationTable");
await manager.Drop();

Sending messages

Sending to the main transport queue with deduplication.

Single

Sending a single message with deduplication.

var manager = new QueueManager(
    "endpointTable",
    sqlConnection,
    "DeduplicationTable");
var message = new OutgoingMessage(
    id: Guid.NewGuid(),
    headers: headers,
    bodyBytes: body);
await manager.Send(message);

Batch

Sending a batch of messages with deduplication.

var manager = new QueueManager(
    "endpointTable",
    sqlConnection,
    "DeduplicationTable");
var messages = new List<OutgoingMessage>
{
    new(
        id: Guid.NewGuid(),
        headers: headers1,
        bodyBytes: body1),
    new(
        id: Guid.NewGuid(),
        headers: headers2,
        bodyBytes: body2),
};
await manager.Send(messages);

Deduplication cleanup

Deduplication records need to live for a period of time after the corresponding message has been sent, so that a subsequent message with the same message id can be ignored. This necessitates periodic cleanup of deduplication records, which is handled by DedupeCleanerJob.

At application startup, start an instance of DedupeCleanerJob.

var cleaner = new DedupeCleanerJob(
    table: "Deduplication",
    connectionBuilder: cancel =>
        ConnectionHelpers.OpenConnection(connectionString, cancel),
    criticalError: _ => { },
    expireWindow: TimeSpan.FromHours(1),
    frequencyToRunCleanup: TimeSpan.FromMinutes(10));
cleaner.Start();

Then at application shutdown stop the instance.

await cleaner.Stop();
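
To make the interaction between deduplication records and the expiry window concrete, here is a conceptual in-memory sketch. InMemoryDedupeStore is a hypothetical class for illustration only. The library keeps its records in a SQL table, and its actual schema and cleanup query are not shown here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Conceptual illustration only; not how the library stores its records.
public class InMemoryDedupeStore
{
    readonly Dictionary<Guid, DateTime> seen = new();

    // Returns true when the id is new (the message should be sent),
    // false when a record for that id already exists (the message is a duplicate).
    public bool TryRecord(Guid messageId, DateTime utcNow) =>
        seen.TryAdd(messageId, utcNow);

    // Removes records older than the expiry window.
    // Returns the number of records removed.
    public int Cleanup(DateTime utcNow, TimeSpan expireWindow)
    {
        var expired = seen
            .Where(pair => utcNow - pair.Value > expireWindow)
            .Select(pair => pair.Key)
            .ToList();

        foreach (var id in expired)
        {
            seen.Remove(id);
        }

        return expired.Count;
    }
}
```

The expiry window is a trade-off: a duplicate that arrives after its record has been cleaned up is treated as a new message, while a longer window means more records to store.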

JSON headers

Serialization

Serialize a Dictionary<string, string> to a JSON string.

var headers = new Dictionary<string, string>
{
    {Headers.EnclosedMessageTypes, "SendMessage"}
};
var serialized = Headers.Serialize(headers);

Deserialization

Deserialize a JSON string to a Dictionary<string, string>.

var headers = Headers.DeSerialize(headersString);
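
For readers who want to see what such a round trip amounts to, the following sketch does the same conversion with System.Text.Json. HeaderJsonSketch is a hypothetical class. Whether the library's Headers class uses this serializer, or produces byte-for-byte identical output, is an assumption that should be checked against its source.

```csharp
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical stand-in for the library's Headers.Serialize and Headers.DeSerialize.
public static class HeaderJsonSketch
{
    // Writes the headers as a flat JSON object of string properties.
    public static string Serialize(Dictionary<string, string> headers) =>
        JsonSerializer.Serialize(headers);

    // Reads a flat JSON object back into a dictionary.
    // A JSON null is mapped to an empty dictionary.
    public static Dictionary<string, string> Deserialize(string json) =>
        JsonSerializer.Deserialize<Dictionary<string, string>>(json)
        ?? new Dictionary<string, string>();
}
```

Because headers are stored as a plain JSON string, they can also be inspected or repaired with ordinary string and JSON tooling when a message is malformed.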

Copied header constants

Contains all the string constants copied from NServiceBus.Headers.

Duplicated timestamp functionality

A copy of the timestamp format methods ToWireFormattedString and ToUtcDateTime.
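
The sketch below shows roughly what these two methods do. WireTimestampSketch is a hypothetical class, and the format string is an assumption based on the format NServiceBus has used for timestamp headers. Verify it against the NServiceBus version in use before relying on it.

```csharp
using System;
using System.Globalization;

// Hypothetical re-implementation for illustration; not the library's code.
public static class WireTimestampSketch
{
    // Assumed wire format: microsecond precision, ':' before the fraction, literal 'Z' suffix.
    const string Format = "yyyy-MM-dd HH:mm:ss:ffffff Z";

    // Converts to UTC and formats using the invariant culture.
    public static string ToWireFormattedString(DateTime dateTime) =>
        dateTime.ToUniversalTime().ToString(Format, CultureInfo.InvariantCulture);

    // Parses the wire format and returns a DateTime with Kind set to Utc.
    public static DateTime ToUtcDateTime(string wireFormattedString) =>
        DateTime.ParseExact(
            wireFormattedString,
            Format,
            CultureInfo.InvariantCulture,
            DateTimeStyles.AdjustToUniversal);
}
```

These helpers matter when writing or reading timestamp headers by hand, since a value in any other format may not be parsed by a receiving NServiceBus endpoint.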

ConnectionHelpers

The APIs of this extension target either a SqlConnection or a SqlTransaction. Given that in configuration those values are often expressed as a connection string, ConnectionHelpers supports converting that string to a SqlConnection or SqlTransaction. It provides two methods, OpenConnection and BeginTransaction, with the effective implementation of those methods being:

public static async Task<SqlConnection> OpenConnection(
    string connectionString,
    Cancel cancel)
{
    var connection = new SqlConnection(connectionString);
    try
    {
        await connection.OpenAsync(cancel);
        return connection;
    }
    catch
    {
        connection.Dispose();
        throw;
    }
}

public static async Task<SqlTransaction> BeginTransaction(
    string connectionString,
    Cancel cancel)
{
    var connection = await OpenConnection(connectionString, cancel);
    return connection.BeginTransaction();
}

SqlServer.HttpPassthrough

SQL HTTP Passthrough provides a bridge between an HTTP stream (via JavaScript on a web page) and the SQL Server transport.

See docs/http-passthrough.md.

Icon

Spear designed by Aldric Rodríguez from The Noun Project.