Microsoft.Azure.CosmosDB.BulkExecutor

This client library enables client applications to perform bulk operations in Azure Cosmos DB for SQL, Gremlin and MongoDB APIs. The BulkImport module (supported for SQL, Gremlin and MongoDB API) provides functionality to bulk ingest documents with optimizations to maximally utilize the allocated collection throughput for maximal write throughput. The BulkUpdate module (supported for SQL API) provides functionality to bulk update documents as partial patches. The BulkDelete module (supported for SQL API) provides functionality to bulk delete documents given a list of partition key and id tuples.


Keywords
microsoft, azure, cosmosdb, documentdb, docdb, nosql, azureofficial, bulkimport, bulkupdate, bulkdelete, graphapi, sqlapi, mongoapi
License
MIT
Install
Install-Package Microsoft.Azure.CosmosDB.BulkExecutor -Version 2.5.1-preview

Documentation

  Azure Cosmos DB BulkExecutor library for .NET

The Azure Cosmos DB BulkExecutor library for .NET acts as an extension library to the Cosmos DB .NET SDK and provides developers out-of-the-box functionality to perform bulk operations in Azure Cosmos DB.

Table of Contents

Consuming the Microsoft Azure Cosmos DB BulkExecutor .NET library

This project includes samples, documentation and performance tips for consuming the BulkExecutor library. You can download the official public NuGet package from here.

Bulk Import API

We provide two overloads of the bulk import API - one which accepts a list of JSON-serialized documents and the other a list of deserialized POCO documents.

  • With list of JSON-serialized documents
        Task<BulkImportResponse> BulkImportAsync(
            IEnumerable<string> documents,
            bool enableUpsert = false,
            bool disableAutomaticIdGeneration = true,
            int? maxConcurrencyPerPartitionKeyRange = null,
            int? maxInMemorySortingBatchSize = null,
            CancellationToken cancellationToken = default(CancellationToken));
  • With list of deserialized POCO documents
        Task<BulkImportResponse> BulkImportAsync(
            IEnumerable<object> documents,
            bool enableUpsert = false,
            bool disableAutomaticIdGeneration = true,
            int? maxConcurrencyPerPartitionKeyRange = null,
            int? maxInMemorySortingBatchSize = null,
            CancellationToken cancellationToken = default(CancellationToken));

Configurable parameters

  • enableUpsert : A flag to enable upsert of the documents if document with given id already exists - default value is false.
  • disableAutomaticIdGeneration : A flag to disable automatic generation of id if absent in the document - default value is true.
  • maxConcurrencyPerPartitionKeyRange : The maximum degree of concurrency per partition key range, setting to null will cause library to use default value of 20.
  • maxInMemorySortingBatchSize : The maximum number of documents pulled from the document enumerator passed to the API call in each stage for in-memory pre-processing sorting phase prior to bulk importing, setting to null will cause library to use default value of min(documents.count, 1000000).
  • cancellationToken : The cancellation token to gracefully exit bulk import.

Bulk import response object definition

The result of the bulk import API call contains the following attributes:

  • NumberOfDocumentsImported (long) : The total number of documents which were successfully imported out of the documents supplied to the bulk import API call.
  • TotalRequestUnitsConsumed (double) : The total request units (RU) consumed by the bulk import API call.
  • TotalTimeTaken (TimeSpan) : The total time taken by the bulk import API call to complete execution.
  • BadInputDocuments (List<object>) : The list of bad-format documents which were not successfully imported in the bulk import API call. User needs to fix the documents returned and retry import. Bad-format documents include documents whose id value is not a string (null or any other datatype is considered invalid).

Getting started with bulk import

  • Initialize DocumentClient set to Direct TCP connection mode
ConnectionPolicy connectionPolicy = new ConnectionPolicy
{
    ConnectionMode = ConnectionMode.Direct,
    ConnectionProtocol = Protocol.Tcp
};
DocumentClient client = new DocumentClient(
    new Uri(endpointUrl),
    authorizationKey,
    connectionPolicy)
  • Initialize BulkExecutor with high retry option values for the client SDK and then set to 0 to pass congestion control to BulkExector for its lifetime
// Set retry options high during initialization (default values).
client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;

IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
await bulkExecutor.InitializeAsync();

// Set retries to 0 to pass complete control to bulk executor.
client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
  • Call BulkImportAsync API
BulkImportResponse bulkImportResponse = await bulkExecutor.BulkImportAsync(
    documents: documentsToImportInBatch,
    enableUpsert: true,
    disableAutomaticIdGeneration: true,
    maxConcurrencyPerPartitionKeyRange: null,
    maxInMemorySortingBatchSize: null,
    cancellationToken: token);

You can find the complete sample application program consuming the bulk import API here - which generates random documents to be then bulk imported into an Azure Cosmos DB collection. You can configure the application settings in appSettings here.

You can download the Microsoft.Azure.CosmosDB.BulkExecutor nuget package from here.

Performance of bulk import sample

Let us compare the performace of the bulk import sample application against a multi-threaded application which utilizes point writes (CreateDocumentAsync API in DocumentClient)

Both the applications are run on a standard DS16 v3 Azure VM in East US against a Cosmos DB collection in East US with 1 million RU/s allocated throughput.

The bulk import sample is executed with NumberOfDocumentsToImport set to 25 million and NumberOfBatches set to 25 (in App.config) and default parameters for the bulk import API. The multi-threaded point write application is set up with a DegreeOfParallelism set to 2000 (spawns 2000 concurrent tasks) which maxes out the VM's CPU.

We observe the following performance for ingestion of 25 million (~1KB) documents into a 1 million RU/s Cosmos DB collection:

Time taken (sec) Writes/second RU/s consumed
Bulk import API 262 95528 494186
Multi-threaded point write 2431 10280 72481

As seen, we observe >9x improvement in the write throughput using the bulk import API while providing out-of-the-box efficient handling of throttling, timeouts and transient exceptions - allowing easier scale-out by adding additional BulkExecutor client instances on individual VMs to achieve even greater write throughputs.

API implementation details

When a bulk import API is triggered with a batch of documents, on the client-side, they are first shuffled into buckets corresponding to their target Cosmos DB partition key range. Within each partiton key range bucket, they are broken down into mini-batches and each mini-batch of documents acts as a payload that is committed transactionally.

We have built in optimizations for the concurrent execution of these mini-batches both within and across partition key ranges to maximally utilize the allocated collection throughput. We have designed an AIMD-style congestion control mechanism for each Cosmos DB partition key range to efficiently handle throttling and timeouts.

These client-side optimizations augment server-side features specific to the BulkExecutor library which together make maximal consumption of available throughput possible.


Bulk Update API

The bulk update (a.k.a patch) API accepts a list of update items - each update item specifies the list of field update operations to be performed on a document identified by an id and parititon key value.

    Task<BulkUpdateResponse> BulkUpdateAsync(
        IEnumerable<UpdateItem> updateItems,
        int? maxConcurrencyPerPartitionKeyRange = null,
        int? maxInMemorySortingBatchSize = null,
        CancellationToken cancellationToken = default(CancellationToken));
  • Definition of UpdateItem
    class UpdateItem
    {
        public string Id { get; private set; }

        public string PartitionKey { get; private set; }

        public IEnumerable<UpdateOperation> UpdateOperations { get; private set; }

        public UpdateItem(
            string id,
            string partitionKey,
            IEnumerable<UpdateOperation> updateOperations)
        {
            this.Id = id;
            this.PartitionKey = partitionKey;
            this.UpdateOperations = updateOperations;
        }
    }

List of supported field update operations

  • Increment

Supports incrementing any numeric document field by a specific value

class IncUpdateOperation<TValue>
{
    IncUpdateOperation(string field, TValue value)
}
  • Set

Supports setting any document field to a specific value

class SetUpdateOperation<TValue>
{
    SetUpdateOperation(string field, TValue value)
}
  • Unset

Supports removing a specific document field along with all children fields

class UnsetUpdateOperation
{
    SetUpdateOperation(string field)
}
  • Array push

Supports appending an array of values to a document field which contains an array

class PushUpdateOperation
{
    PushUpdateOperation(string field, object[] value)
}
  • Array remove

Supports removing a specific value (if present) from a document field which contains an array

class RemoveUpdateOperation<TValue>
{
    RemoveUpdateOperation(string field, TValue value)
}

Note: For nested fields, use '.' as the nesting separtor. For example, if you wish to set the '/address/city' field to 'Seattle', express as shown:

    SetUpdateOperation<string> nestedPropertySetUpdate = new SetUpdateOperation<string>("address.city", "Seattle");

Configurable parameters

  • maxConcurrencyPerPartitionKeyRange : The maximum degree of concurrency per partition key range, setting to null will cause library to use default value of 20.
  • maxInMemorySortingBatchSize : The maximum number of update items pulled from the update items enumerator passed to the API call in each stage for in-memory pre-processing sorting phase prior to bulk updating, setting to null will cause library to use default value of min(updateItems.count, 1000000).
  • cancellationToken : The cancellation token to gracefully exit bulk update.

Bulk update response object definition

The result of the bulk update API call contains the following attributes:

  • NumberOfDocumentsUpdated (long) : The total number of documents which were successfully updated out of the ones supplied to the bulk update API call.
  • TotalRequestUnitsConsumed (double) : The total request units (RU) consumed by the bulk update API call.
  • TotalTimeTaken (TimeSpan) : The total time taken by the bulk update API call to complete execution.

Getting started with bulk update

  • Initialize DocumentClient set to Direct TCP connection mode
ConnectionPolicy connectionPolicy = new ConnectionPolicy
{
    ConnectionMode = ConnectionMode.Direct,
    ConnectionProtocol = Protocol.Tcp
};
DocumentClient client = new DocumentClient(
    new Uri(endpointUrl),
    authorizationKey,
    connectionPolicy)
  • Initialize BulkExecutor with high retry option values for the client SDK and then set to 0 to pass congestion control to BulkExector for its lifetime
// Set retry options high during initialization (default values).
client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;

IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
await bulkExecutor.InitializeAsync();

// Set retries to 0 to pass complete control to bulk executor.
client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
  • Define the update items along with corresponding field update operations
SetUpdateOperation<string> nameUpdate = new SetUpdateOperation<string>("Name", "UpdatedDoc");
UnsetUpdateOperation descriptionUpdate = new UnsetUpdateOperation("description");

List<UpdateOperation> updateOperations = new List<UpdateOperation>();
updateOperations.Add(nameUpdate);
updateOperations.Add(descriptionUpdate);

List<UpdateItem> updateItems = new List<UpdateItem>();
for (int i = 0; i < 10; i++)
{
    updateItems.Add(new UpdateItem(i.ToString(), i.ToString(), updateOperations));
}
  • Call BulkUpdateAsync API
BulkUpdateResponse bulkUpdateResponse = await bulkExecutor.BulkUpdateAsync(
    updateItems: updateItems,
    maxConcurrencyPerPartitionKeyRange: null,
    maxInMemorySortingBatchSize: null,
    cancellationToken: token);

You can find the complete sample application program consuming the bulk update API here. You can configure the application settings in appSettings here.

In the sample application, we first bulk import documents and then bulk update all the imported documents to set the Name field to a new value and unset the description field in each document.

You can download the Microsoft.Azure.CosmosDB.BulkExecutor nuget package from here.

Performance of bulk update sample

When the given sample application is run on a standard DS16 v3 Azure VM in East US against a Cosmos DB collection in East US with 1 million RU/s allocated throughput - with NumberOfDocumentsToUpdate set to 25 million and NumberOfBatches set to 25 (in App.config) and default parameters for the bulk update API (as well as bulk import API), we observe the following performance for bulk update:

Updated 25000000 docs @ 53796 update/s, 491681 RU/s in 464.7 sec

API implementation details

The bulk update API is designed similar to bulk import - look at the implementation details of bulk import API for details.


Bulk Delete API

The bulk delete API accepts a list of <partitionKey, documentId> tuples to delete in bulk.

    Task<BulkDeleteResponse> BulkDeleteAsync(
        List<Tuple<string, string>> pkIdTuplesToDelete,
        int? deleteBatchSize = null,
        CancellationToken cancellationToken = default(CancellationToken));

Configurable parameters

  • deleteBatchSize : The maximum number delete items to execute transactionally, setting to null will cause library to use default value of 1000.
  • cancellationToken : The cancellation token to gracefully exit bulk delete.

Bulk delete response object definition

The result of the bulk delete API call contains the following attributes:

  • NumberOfDocumentsDeleted (long) : The total number of documents which were successfully deleted out of the ones supplied to the bulk delete API call.
  • TotalRequestUnitsConsumed (double) : The total request units (RU) consumed by the bulk delete API call.
  • TotalTimeTaken (TimeSpan) : The total time taken by the bulk delete API call to complete execution.

Getting started with bulk delete

  • Initialize DocumentClient set to Direct TCP connection mode
ConnectionPolicy connectionPolicy = new ConnectionPolicy
{
    ConnectionMode = ConnectionMode.Direct,
    ConnectionProtocol = Protocol.Tcp
};
DocumentClient client = new DocumentClient(
    new Uri(endpointUrl),
    authorizationKey,
    connectionPolicy)
  • Initialize BulkExecutor with high retry option values for the client SDK and then set to 0 to pass congestion control to BulkExector for its lifetime
// Set retry options high during initialization (default values).
client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;

BulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
await bulkExecutor.InitializeAsync();

// Set retries to 0 to pass complete control to bulk executor.
client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
  • Define the list of <PartitionKey, DocumentId> tuples to delete
List<Tuple<string, string>> pkIdTuplesToDelete = new List<Tuple<string, string>>();
for(int i=0; i < NumberOfDocumentsToDelete; i++)
{
    pkIdTuplesToDelete.Add(new Tuple<string, string>(i.ToString(), i.ToString()));
}
  • Call BulkDeleteAsync API
BulkDeleteResponse bulkDeleteResponse = await bulkExecutor.BulkDeleteAsync(pkIdTuplesToDelete);

You can find the complete sample application program consuming the bulk delete API here. You can configure the application settings in appSettings here.

In the sample application, we first bulk import documents and then bulk delete a portion of the imported documents.

You can download the Microsoft.Azure.CosmosDB.BulkExecutor nuget package from here.


Performance tips

  • For best performance, run your application from an Azure VM in the same region as your Cosmos DB account write region.
  • It is advised to instantiate a single BulkExecutor object for the entirety of the application within a single VM corresponding to a specific Cosmos DB collection.
  • Since a single bulk operation API execution consumes a large chunk of the client machine's CPU and network IO by spawning multiple tasks internally, avoid spawning multiple concurrent tasks within your application process each executing bulk operation API calls. If a single bulk operation API call running on a single VM is unable to consume your entire collection's throughput (if your collection's throughput > 1 million RU/s), preferably spin up separate VMs to concurrently execute bulk operation API calls.
  • Ensure InitializeAsync() is invoked after instantiating a BulkExecutor object to fetch the target Cosmos DB collection partition map.
  • In your application's App.Config, ensure gcServer is enabled for better performance
  <runtime>
    <gcServer enabled="true" />
  </runtime>
  • The library emits traces which can be collected either into a log file or on the console. To enable both, add the following to your application's App.Config.
  <system.diagnostics>
    <trace autoflush="false" indentsize="4">
      <listeners>
        <add name="logListener" type="System.Diagnostics.TextWriterTraceListener" initializeData="application.log" />
        <add name="consoleListener" type="System.Diagnostics.ConsoleTraceListener" />
      </listeners>
    </trace>
  </system.diagnostics>

Contributing & feedback

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.

See CONTRIBUTING.md for contribution guidelines.

To give feedback and/or report an issue, open a GitHub Issue.


Other relevant projects