botframework-sdk: [DCR/Bug] Cosmos PartitionKeys are semi-broken

Relevant customer issues:

Issue

This issue applies to all of the SDKs and is as much a design problem as a bug. In short, if a user defines a PartitionKey in CosmosDbStorageOptions, most bots will not be able to read, delete, or update state.

The reason for this requires substantial explanation (again, this applies to all of the repos). I’ll try to keep it brief:

The Way Cosmos Works

Containers (previously called Collections) are created with a PartitionKeyPath, which might be something like /data/city.

Let’s say we have an item:

const item = {
    id: '123',
    data: {
        city: 'Bellevue'
    }
}

When an item is added to the container, Cosmos uses the value of item.data.city to decide which partition to put it on. So, all items with item.data.city === "Bellevue" get put on the same partition.
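To make the path/value distinction concrete, here is a minimal TypeScript sketch of how a partitionKeyPath such as /data/city maps to a partition key value on an item. The helper partitionKeyValue is hypothetical (it is not part of any Cosmos SDK) and only models the lookup; it ignores details such as escaped path segments.

```typescript
// Hypothetical helper (not part of any Cosmos SDK): resolve the partition
// key *value* of an item from the container's partition key *path*.
// Returns undefined as soon as a segment of the path is missing.
function partitionKeyValue(doc: unknown, path: string): unknown {
  const segments = path.split("/").filter((s) => s.length > 0);
  let current: unknown = doc;
  for (const segment of segments) {
    if (current === null || typeof current !== "object") {
      return undefined;
    }
    current = (current as Record<string, unknown>)[segment];
  }
  return current;
}

// The item from the example above.
const bellevueItem = { id: "123", data: { city: "Bellevue" } };

console.log(partitionKeyValue(bellevueItem, "/data/city")); // prints: Bellevue
console.log(partitionKeyValue(bellevueItem, "/data/zip")); // prints: undefined
```

The path lives on the container; the value lives on each item. Keeping those two apart is the crux of the rest of this issue.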

Writes to the DB don’t need a partitionKey specified, because it’s already defined 1) on the container, as the path, and 2) on the item, as the value at that path.

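Reads and Deletes, however, benefit GREATLY from defining the partitionKey. If we say READ item 123 with partitionKey Bellevue, Cosmos only has to look in the partition holding the Bellevue items when it searches for item 123, instead of checking every partition. (For point reads and deletes on a partitioned container, the partition key value is actually required, since an item is identified by its id together with its partition key value.) This matters even more when querying for multiple items with the same partitionKey, since by design they’ll all be on the same partition.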
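A deliberately simplified, in-memory model of that behavior follows. ToyContainer and its methods are hypothetical and are not the Cosmos SDK; the sketch only shows that writes derive the partition from the item while a point read is confined to the one partition named by the value it is given.

```typescript
// Toy model of a partitioned container (NOT the Cosmos SDK): documents are
// grouped by the value found at the container's partition key path.
type Doc = { id: string; [property: string]: unknown };

class ToyContainer {
  // partition key value -> (id -> document)
  private readonly partitions = new Map<string, Map<string, Doc>>();
  private readonly pathSegments: string[];

  constructor(partitionKeyPath: string) {
    this.pathSegments = partitionKeyPath.split("/").filter((s) => s.length > 0);
  }

  // Writes take no partition key argument: the value is read off the item.
  upsert(doc: Doc): void {
    const pk = this.partitionKeyOf(doc);
    let bucket = this.partitions.get(pk);
    if (bucket === undefined) {
      bucket = new Map<string, Doc>();
      this.partitions.set(pk, bucket);
    }
    bucket.set(doc.id, doc);
  }

  // A point read only looks inside the one partition named by pkValue.
  read(id: string, pkValue: string): Doc | undefined {
    const bucket = this.partitions.get(pkValue);
    return bucket === undefined ? undefined : bucket.get(id);
  }

  partitionCount(): number {
    return this.partitions.size;
  }

  private partitionKeyOf(doc: Doc): string {
    let current: unknown = doc;
    for (const segment of this.pathSegments) {
      current =
        current !== null && typeof current === "object"
          ? (current as Record<string, unknown>)[segment]
          : undefined;
    }
    // Simplification: a missing value becomes the string "undefined".
    return String(current);
  }
}

const cities = new ToyContainer("/data/city");
cities.upsert({ id: "123", data: { city: "Bellevue" } });
cities.upsert({ id: "456", data: { city: "Seattle" } });

console.log(cities.read("123", "Bellevue")?.id); // prints: 123
console.log(cities.read("123", "Seattle")); // prints: undefined
```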

How our SDK works

We pass in our partitionKey through CosmosDbStorageOptions.

When writing, we do nothing with the partitionKey. ✔

When reading, we attempt to read with the previously-set partitionKey 🚫

When deleting, we attempt to delete with the previously-set partitionKey 🚫

Reading and deleting aren’t implemented properly in the Bot Framework because, using the previous example, we’re either:

  1. Setting partitionKey to "/data/city": when we READ or DELETE, we pass the path "/data/city" as the partition key value instead of an actual value like "Bellevue", or

  2. Setting partitionKey to "Bellevue": when we READ or DELETE, we can never find any items with item.data.city === "Seattle".
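Both failure modes can be reproduced with a small in-memory model. FixedKeyStorage is a hypothetical stand-in that, like the current SDK, reuses the one partitionKey from the options for every read; it is not the real CosmosDbStorage and makes no Cosmos calls.

```typescript
type CityDoc = { id: string; data: { city: string } };

// Hypothetical stand-in for today's behaviour (NOT the real CosmosDbStorage):
// the partitionKey from the options is reused for every READ, whatever the item.
class FixedKeyStorage {
  // partition key value (the item's data.city) -> (id -> document)
  private readonly partitions = new Map<string, Map<string, CityDoc>>();
  private readonly optionsPartitionKey: string;

  constructor(optionsPartitionKey: string) {
    this.optionsPartitionKey = optionsPartitionKey;
  }

  // Writes ignore the option: the value comes from the item (/data/city).
  write(doc: CityDoc): void {
    const bucket = this.partitions.get(doc.data.city) ?? new Map<string, CityDoc>();
    bucket.set(doc.id, doc);
    this.partitions.set(doc.data.city, bucket);
  }

  // Reads always send the configured option as the partition key value.
  read(id: string): CityDoc | undefined {
    return this.partitions.get(this.optionsPartitionKey)?.get(id);
  }
}

const cityDocs: CityDoc[] = [
  { id: "123", data: { city: "Bellevue" } },
  { id: "456", data: { city: "Seattle" } },
];

// 1. The path itself was configured, so it never matches a stored value.
const pathAsKey = new FixedKeyStorage("/data/city");
// 2. A single value was configured, so only Bellevue items can be found.
const valueAsKey = new FixedKeyStorage("Bellevue");
for (const doc of cityDocs) {
  pathAsKey.write(doc);
  valueAsKey.write(doc);
}

console.log(pathAsKey.read("123")); // prints: undefined
console.log(valueAsKey.read("123")?.id); // prints: 123
console.log(valueAsKey.read("456")); // prints: undefined (the Seattle item)
```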

Why This Wasn’t Caught In Tests

It’s an easy mistake, and I’d imagine it comes from the confusion between partitionKeys and partitionKeyPaths. The two are easy to mix up, and it took me a decent amount of digging to wrap my head around them.

The tests that deal with partitions set the partitionKeyPath to /city/location, write a single item with the “right” key value, and then look up that specific item using its known key value, “Contoso”. That isn’t how our SDK actually behaves: a real bot would either pass the partitionKeyPath (/city/location) as the partitionKey, or store multiple items with different cities and only be able to pull up the “Contoso” ones.

Temporary Workaround

If you’re using Cosmos and running into this issue, you can “fix” it as follows:

  1. Delete your Container
  2. Create a new Container with partitionKeyPath set to /_partitionKey
  3. Remove the PartitionKey property from CosmosDbStorageOptions
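A rough model of what this workaround does to the data: the documents CosmosDbStorage writes (id, realId, document, plus Cosmos’s own _etag) have no _partitionKey property, so every one of them resolves to the same missing value and, as modelled here, lands in one shared logical partition. That single shared partition is what the 10GB concern in the partitioning section is about. groupByPartition is hypothetical, makes no Cosmos calls, and treats a missing value as the string "undefined"; the ids below are illustrative.

```typescript
// Model only (NOT the Cosmos SDK): group stored documents by the value found
// at a single-level partition key path such as "/_partitionKey" or "/id".
function groupByPartition(
  docs: Array<Record<string, unknown>>,
  path: string,
): Map<string, string[]> {
  const property = path.replace(/^\//, "");
  const groups = new Map<string, string[]>();
  for (const doc of docs) {
    // Simplification: a missing property is modelled as the value "undefined".
    const value = String(doc[property]);
    const members = groups.get(value) ?? [];
    members.push(String(doc.id));
    groups.set(value, members);
  }
  return groups;
}

// Same fields the storage layer writes (id, realId, document); ids are illustrative.
const stateDocs = [
  { id: "conversation-doc", realId: "emulator/conversations/abc", document: {} },
  { id: "user-doc", realId: "emulator/users/xyz", document: {} },
];

const grouped = groupByPartition(stateDocs, "/_partitionKey");
console.log(grouped.size); // prints: 1
console.log(grouped.get("undefined")?.length); // prints: 2
```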

Update:

I believe I’ve found a better temporary workaround that is easier to use and uses partitioning:

  1. Delete your Container
  2. Create a new Container with partitionKeyPath set to /id
  3. Remove the PartitionKey property from CosmosDbStorageOptions
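Why the /id variant needs no PartitionKey option, as a model: with partitionKeyPath set to /id, each document’s partition key value is its own id, and that id is derived (by escaping) from the very key that ReadAsync/DeleteAsync already receive, so the key alone identifies the partition. IdPartitionedStore is hypothetical and in-memory; it skips the key escaping the real storage applies.

```typescript
// Model only (NOT the Cosmos SDK): a container partitioned on /id.
// The storage layer escapes keys before using them as ids; that step is skipped here.
class IdPartitionedStore {
  // partition key value (== id) -> (id -> state)
  private readonly partitions = new Map<string, Map<string, unknown>>();

  write(key: string, state: unknown): void {
    const id = key; // stand-in for the escaped key
    const partitionKeyValue = id; // partitionKeyPath "/id": the value is the id itself
    this.partitions.set(partitionKeyValue, new Map<string, unknown>([[id, state]]));
  }

  // The key alone names the partition, so no PartitionKey option is needed.
  read(key: string): unknown {
    return this.partitions.get(key)?.get(key);
  }

  delete(key: string): boolean {
    return this.partitions.delete(key);
  }

  partitionCount(): number {
    return this.partitions.size;
  }
}

const idStore = new IdPartitionedStore();
idStore.write("emulator/conversations/abc", { turnCount: 1 });
idStore.write("emulator/users/xyz", { turnCount: 7 });

console.log(idStore.partitionCount()); // prints: 2
console.log(idStore.delete("emulator/conversations/abc")); // prints: true
console.log(idStore.read("emulator/conversations/abc")); // prints: undefined
```

The trade-off is that every document sits in its own logical partition, so any query spanning several keys has to go cross-partition.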

Temporary Workaround With Partitioning

If your data is likely to go over 10GB, you may run into an issue with the first workaround above (/_partitionKey), because I believe Cosmos will put every document onto the same logical partition, and each logical partition is limited to 10GB of data. To actually use partitioning, you can take the following steps (this is written for C#, but the concepts apply to Node):

  1. Download the SDK and add the downloaded/local Microsoft.Bot.Builder.Azure .csproj to your bot (this will allow you to make changes to the SDK).
      • Note: You’ll likely need to adjust the dependencies for Microsoft.Bot.Builder.Azure. I think I just deleted Microsoft.Bot.Builder from Microsoft.Bot.Builder.Azure > Dependencies > Project.
  2. In your bot’s CosmosDbStorageOptions, use PartitionKey = "realId".
  3. Change CosmosDbStorage.cs to:
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.Documents.Linq;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json.Serialization;

namespace Microsoft.Bot.Builder.Azure
{
    /// <summary>
    /// Implements a CosmosDB-based storage provider for a bot.
    /// </summary>
    public class CosmosDbStorage : IStorage
    {
        // When setting up the database, calls are made to CosmosDB. If multiple calls are made, we'll end up setting the
        // collectionLink member variable more than once. The semaphore is for making sure the initialization of the
        // database is done only once.
        private static SemaphoreSlim _semaphore = new SemaphoreSlim(1);

        private readonly JsonSerializer _jsonSerializer = JsonSerializer.Create(new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All });

        private readonly string _databaseId;
        private readonly string _partitionKey;
        private readonly string _collectionId;
        private readonly RequestOptions _documentCollectionCreationRequestOptions = null;
        private readonly RequestOptions _databaseCreationRequestOptions = null;
        private readonly IDocumentClient _client;
        private string _collectionLink = null;

        /// <summary>
        /// Initializes a new instance of the <see cref="CosmosDbStorage"/> class
        /// using the provided CosmosDB credentials, database ID, and collection ID.
        /// </summary>
        /// <param name="cosmosDbStorageOptions">Cosmos DB storage configuration options.</param>
        public CosmosDbStorage(CosmosDbStorageOptions cosmosDbStorageOptions)
        {
            if (cosmosDbStorageOptions == null)
            {
                throw new ArgumentNullException(nameof(cosmosDbStorageOptions));
            }

            if (cosmosDbStorageOptions.CosmosDBEndpoint == null)
            {
                throw new ArgumentNullException(nameof(cosmosDbStorageOptions.CosmosDBEndpoint), "Service EndPoint for CosmosDB is required.");
            }

            if (string.IsNullOrEmpty(cosmosDbStorageOptions.AuthKey))
            {
                throw new ArgumentException("AuthKey for CosmosDB is required.", nameof(cosmosDbStorageOptions.AuthKey));
            }

            if (string.IsNullOrEmpty(cosmosDbStorageOptions.DatabaseId))
            {
                throw new ArgumentException("DatabaseId is required.", nameof(cosmosDbStorageOptions.DatabaseId));
            }

            if (string.IsNullOrEmpty(cosmosDbStorageOptions.CollectionId))
            {
                throw new ArgumentException("CollectionId is required.", nameof(cosmosDbStorageOptions.CollectionId));
            }

            _databaseId = cosmosDbStorageOptions.DatabaseId;
            _collectionId = cosmosDbStorageOptions.CollectionId;
            _partitionKey = cosmosDbStorageOptions.PartitionKey;
            _documentCollectionCreationRequestOptions = cosmosDbStorageOptions.DocumentCollectionRequestOptions;
            _databaseCreationRequestOptions = cosmosDbStorageOptions.DatabaseCreationRequestOptions;

            // Inject BotBuilder version to CosmosDB Requests
            var version = GetType().Assembly.GetName().Version;
            var connectionPolicy = new ConnectionPolicy { UserAgentSuffix = $"Microsoft-BotFramework {version}" };

            // Invoke CollectionPolicy delegate to further customize settings
            cosmosDbStorageOptions.ConnectionPolicyConfigurator?.Invoke(connectionPolicy);
            _client = new DocumentClient(cosmosDbStorageOptions.CosmosDBEndpoint, cosmosDbStorageOptions.AuthKey, connectionPolicy);
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="CosmosDbStorage"/> class
        /// using the provided CosmosDB credentials, database ID, and collection ID.
        /// </summary>
        /// <param name="cosmosDbStorageOptions">Cosmos DB storage configuration options.</param>
        /// <param name="jsonSerializer">If passing in a custom JsonSerializer, we recommend the following settings:
        /// <para>jsonSerializer.TypeNameHandling = TypeNameHandling.All.</para>
        /// <para>jsonSerializer.NullValueHandling = NullValueHandling.Include.</para>
        /// <para>jsonSerializer.ContractResolver = new DefaultContractResolver().</para>
        /// </param>
        public CosmosDbStorage(CosmosDbStorageOptions cosmosDbStorageOptions, JsonSerializer jsonSerializer)
            : this(cosmosDbStorageOptions)
        {
            if (jsonSerializer == null)
            {
                throw new ArgumentNullException(nameof(jsonSerializer));
            }

            _jsonSerializer = jsonSerializer;
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="CosmosDbStorage"/> class.
        /// This constructor should only be used if the default behavior of the DocumentClient needs to be changed.
        /// The <see cref="CosmosDbStorage(CosmosDbStorageOptions)"/> constructor is preferred for most cases.
        /// </summary>
        /// <param name="documentClient">The custom implementation of IDocumentClient.</param>
        /// <param name="cosmosDbCustomClientOptions">Custom client configuration options.</param>
        public CosmosDbStorage(IDocumentClient documentClient, CosmosDbCustomClientOptions cosmosDbCustomClientOptions)
        {
            if (cosmosDbCustomClientOptions == null)
            {
                throw new ArgumentNullException(nameof(cosmosDbCustomClientOptions));
            }

            if (string.IsNullOrEmpty(cosmosDbCustomClientOptions.DatabaseId))
            {
                throw new ArgumentException("DatabaseId is required.", nameof(cosmosDbCustomClientOptions.DatabaseId));
            }

            if (string.IsNullOrEmpty(cosmosDbCustomClientOptions.CollectionId))
            {
                throw new ArgumentException("CollectionId is required.", nameof(cosmosDbCustomClientOptions.CollectionId));
            }

            _client = documentClient ?? throw new ArgumentNullException(nameof(documentClient), "An implementation of IDocumentClient for CosmosDB is required.");
            _databaseId = cosmosDbCustomClientOptions.DatabaseId;
            _collectionId = cosmosDbCustomClientOptions.CollectionId;
            _documentCollectionCreationRequestOptions = cosmosDbCustomClientOptions.DocumentCollectionRequestOptions;
            _databaseCreationRequestOptions = cosmosDbCustomClientOptions.DatabaseCreationRequestOptions;

            // Inject BotBuilder version to CosmosDB Requests
            var version = GetType().Assembly.GetName().Version;
            _client.ConnectionPolicy.UserAgentSuffix = $"Microsoft-BotFramework {version}";
        }

        [Obsolete("Replaced by CosmosDBKeyEscape.EscapeKey.")]
        public static string SanitizeKey(string key) => CosmosDbKeyEscape.EscapeKey(key);

        /// <summary>
        /// Deletes storage items from storage.
        /// </summary>
        /// <param name="keys">keys of the <see cref="IStoreItem"/> objects to remove from the store.</param>
        /// <param name="cancellationToken">A cancellation token that can be used by other objects
        /// or threads to receive notice of cancellation.</param>
        /// <returns>A task that represents the work queued to execute.</returns>
        /// <seealso cref="ReadAsync(string[], CancellationToken)"/>
        /// <seealso cref="WriteAsync(IDictionary{string, object}, CancellationToken)"/>
        public async Task DeleteAsync(string[] keys, CancellationToken cancellationToken)
        {
            if (keys == null)
            {
                throw new ArgumentNullException(nameof(keys));
            }

            if (keys.Length == 0)
            {
                return;
            }

            // Ensure Initialization has been run
            await InitializeAsync().ConfigureAwait(false);

            // Parallelize deletion
            var tasks = keys.Select(key =>
                _client.DeleteDocumentAsync(
                    UriFactory.CreateDocumentUri(
                        _databaseId,
                        _collectionId,
                        CosmosDbKeyEscape.EscapeKey(key)),
                    new RequestOptions() { PartitionKey = new PartitionKey(key) },
                    cancellationToken: cancellationToken));

            // Wait for all deletion tasks to complete
            await Task.WhenAll(tasks).ConfigureAwait(false);
        }

        /// <summary>
        /// Reads storage items from storage.
        /// </summary>
        /// <param name="keys">keys of the <see cref="IStoreItem"/> objects to read from the store.</param>
        /// <param name="cancellationToken">A cancellation token that can be used by other objects
        /// or threads to receive notice of cancellation.</param>
        /// <returns>A task that represents the work queued to execute.</returns>
        /// <remarks>If the activities are successfully sent, the task result contains
        /// the items read, indexed by key.</remarks>
        /// <seealso cref="DeleteAsync(string[], CancellationToken)"/>
        /// <seealso cref="WriteAsync(IDictionary{string, object}, CancellationToken)"/>
        public async Task<IDictionary<string, object>> ReadAsync(string[] keys, CancellationToken cancellationToken)
        {
            if (keys == null)
            {
                throw new ArgumentNullException(nameof(keys));
            }

            if (keys.Length == 0)
            {
                // No keys passed in, no result to return.
                return new Dictionary<string, object>();
            }

            // Ensure Initialization has been run
            await InitializeAsync().ConfigureAwait(false);

            var storeItems = new Dictionary<string, object>(keys.Length);

            var parameterSequence = string.Join(",", Enumerable.Range(0, keys.Length).Select(i => $"@id{i}"));
            var parameterValues = keys.Select((key, ix) => new SqlParameter($"@id{ix}", CosmosDbKeyEscape.EscapeKey(key)));
            var querySpec = new SqlQuerySpec
            {
                QueryText = $"SELECT c.id, c.realId, c.document, c._etag FROM c WHERE c.id in ({parameterSequence})",
                Parameters = new SqlParameterCollection(parameterValues),
            };

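            // With PartitionKey = "realId", each key is its own partition key value, so a single-key read
            // can be scoped to that key's partition. A multi-key read has to query across partitions:
            // scoping it to keys[0] would silently miss every key stored in another partition.
            var feedOptions = keys.Length == 1
                ? new FeedOptions { PartitionKey = new PartitionKey(keys[0]) }
                : new FeedOptions { EnableCrossPartitionQuery = true };

            var query = _client.CreateDocumentQuery<DocumentStoreItem>(_collectionLink, querySpec, feedOptions).AsDocumentQuery();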
            while (query.HasMoreResults)
            {
                foreach (var doc in await query.ExecuteNextAsync<DocumentStoreItem>(cancellationToken).ConfigureAwait(false))
                {
                    var item = doc.Document.ToObject(typeof(object), _jsonSerializer);
                    if (item is IStoreItem storeItem)
                    {
                        storeItem.ETag = doc.ETag;
                    }

                    // doc.Id cannot be used since it is escaped, read it from RealId property instead
                    storeItems.Add(doc.ReadlId, item);
                }
            }

            return storeItems;
        }

        /// <summary>
        /// Writes storage items to storage.
        /// </summary>
        /// <param name="changes">The items to write to storage, indexed by key.</param>
        /// <param name="cancellationToken">A cancellation token that can be used by other objects
        /// or threads to receive notice of cancellation.</param>
        /// <returns>A task that represents the work queued to execute.</returns>
        /// <seealso cref="DeleteAsync(string[], CancellationToken)"/>
        /// <seealso cref="ReadAsync(string[], CancellationToken)"/>
        public async Task WriteAsync(IDictionary<string, object> changes, CancellationToken cancellationToken)
        {
            if (changes == null)
            {
                throw new ArgumentNullException(nameof(changes));
            }

            if (changes.Count == 0)
            {
                return;
            }

            // Ensure Initialization has been run
            await InitializeAsync().ConfigureAwait(false);

            foreach (var change in changes)
            {
                var json = JObject.FromObject(change.Value, _jsonSerializer);

                // Remove etag from JSON object that was copied from IStoreItem.
                // The ETag information is updated as an _etag attribute in the document metadata.
                json.Remove("eTag");

                var documentChange = new DocumentStoreItem
                {
                    Id = CosmosDbKeyEscape.EscapeKey(change.Key),
                    ReadlId = change.Key,
                    Document = json,
                };

                var etag = (change.Value as IStoreItem)?.ETag;
                if (etag == null || etag == "*")
                {
                    // if new item or * then insert or replace unconditionally
                    await _client.UpsertDocumentAsync(
                        _collectionLink,
                        documentChange,
                        disableAutomaticIdGeneration: true,
                        cancellationToken: cancellationToken).ConfigureAwait(false);
                }
                else if (etag.Length > 0)
                {
                    // if we have an etag, do opt. concurrency replace
                    var uri = UriFactory.CreateDocumentUri(_databaseId, _collectionId, documentChange.Id);
                    var ac = new AccessCondition { Condition = etag, Type = AccessConditionType.IfMatch };
                    await _client.ReplaceDocumentAsync(
                        uri,
                        documentChange,
                        new RequestOptions { AccessCondition = ac },
                        cancellationToken: cancellationToken).ConfigureAwait(false);
                }
                else
                {
                    throw new Exception("etag empty");
                }
            }
        }

        /// <summary>
        /// Creates the CosmosDB Database and populates the _collectionLink member variable.
        /// </summary>
        /// <remarks>
        /// This method is idempotent, and thread safe.
        /// </remarks>
        private async Task InitializeAsync()
        {
            // In the steady-state case, we'll already have a connection string to the
            // database setup. If so, no need to enter locks or call CosmosDB to get one.
            if (_collectionLink == null)
            {
                // We don't (probably) have a database link yet. Enter the lock,
                // then check again (aka: Double-Check Lock pattern).
                await _semaphore.WaitAsync().ConfigureAwait(false);
                try
                {
                    if (_collectionLink == null)
                    {
                        // We don't have a database link. Create one. Note that we're inside a semaphore at this point
                        // so other threads may be blocked on us.
                        await _client.CreateDatabaseIfNotExistsAsync(
                            new Database { Id = _databaseId },
                            _databaseCreationRequestOptions) // pass in any user set database creation flags
                            .ConfigureAwait(false);

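                        var documentCollection = new DocumentCollection
                        {
                            Id = _collectionId,
                        };

                        // If the collection is created here, partition it on the configured property
                        // (PartitionKey = "realId" -> "/realId") so the per-key PartitionKey values sent by
                        // ReadAsync and DeleteAsync match the realId stored by WriteAsync. An existing
                        // collection is left as-is and must already be partitioned on /realId.
                        if (!string.IsNullOrEmpty(_partitionKey))
                        {
                            documentCollection.PartitionKey.Paths.Add($"/{_partitionKey}");
                        }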

                        var response = await _client.CreateDocumentCollectionIfNotExistsAsync(
                            UriFactory.CreateDatabaseUri(_databaseId),
                            documentCollection,
                            _documentCollectionCreationRequestOptions) // pass in any user set collection creation flags
                            .ConfigureAwait(false);

                        _collectionLink = response.Resource.SelfLink;
                    }
                }
                finally
                {
                    _semaphore.Release();
                }
            }
        }

        /// <summary>
        /// Internal data structure for storing items in a CosmosDB Collection.
        /// </summary>
        private class DocumentStoreItem
        {
            /// <summary>
            /// Gets or sets the sanitized Id/Key used as PrimaryKey.
            /// </summary>
            [JsonProperty("id")]
            public string Id { get; set; }

            /// <summary>
            /// Gets or sets the un-sanitized Id/Key.
            /// </summary>
            /// <remarks>
            /// Note: There is a typo in the property name ("ReadlId") that can't be changed due to compatibility concerns. The
            /// JSON is correct due to the JsonProperty attribute, but the typo needs to stay.
            /// </remarks>
            // DO NOT FIX THE TYPO BELOW (see Remarks above).
            [JsonProperty("realId")]
            public string ReadlId { get; internal set; }

            /// <summary>
            /// Gets or sets the persisted object.
            /// </summary>
            [JsonProperty("document")]
            public JObject Document { get; set; }

            /// <summary>
            /// Gets or sets the ETag information for handling optimistic concurrency updates.
            /// </summary>
            [JsonProperty("_etag")]
            public string ETag { get; set; }
        }
    }
}

Here’s the diff

Note: This should still be considered a workaround and not an officially-supported change
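The core idea of the modified storage, sketched as a TypeScript model for the Node side: with the container partitioned on /realId, each key is its own partition key value, so a point operation can always compute its partition from the key. One consequence worth calling out is multi-key reads: scoping a single query to the first key’s partition (for example, passing new PartitionKey(keys[0]) for a multi-key read) silently drops every other key. RealIdPartitionedStorage and its methods are hypothetical and in-memory; this is not the botbuilder CosmosDbStorage implementation.

```typescript
// Model only (NOT the botbuilder implementation): a container partitioned on
// /realId, so each key is its own partition key value.
type StateDoc = { id: string; realId: string; document: unknown };

class RealIdPartitionedStorage {
  // partition key value (== realId) -> documents in that partition
  private readonly partitions = new Map<string, StateDoc[]>();

  write(changes: Record<string, unknown>): void {
    for (const key of Object.keys(changes)) {
      // Escaping of id is skipped here; realId always keeps the raw key.
      this.partitions.set(key, [{ id: key, realId: key, document: changes[key] }]);
    }
  }

  // Mirrors scoping one query to keys[0]'s partition: other keys are missed.
  readScopedToFirstKey(keys: string[]): Record<string, unknown> {
    const result: Record<string, unknown> = {};
    const first = keys[0];
    if (first === undefined) {
      return result;
    }
    for (const doc of this.partitions.get(first) ?? []) {
      if (keys.indexOf(doc.realId) >= 0) {
        result[doc.realId] = doc.document;
      }
    }
    return result;
  }

  // Visiting each key's own partition finds every key.
  read(keys: string[]): Record<string, unknown> {
    const result: Record<string, unknown> = {};
    for (const key of keys) {
      for (const doc of this.partitions.get(key) ?? []) {
        result[doc.realId] = doc.document;
      }
    }
    return result;
  }
}

const realIdStorage = new RealIdPartitionedStorage();
realIdStorage.write({
  "emulator/conversations/abc": { turnCount: 1 },
  "emulator/users/xyz": { turnCount: 7 },
});

const bothKeys = ["emulator/conversations/abc", "emulator/users/xyz"];
console.log(Object.keys(realIdStorage.readScopedToFirstKey(bothKeys)).length); // prints: 1
console.log(Object.keys(realIdStorage.read(bothKeys)).length); // prints: 2
```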

Proposed change

The difficulty in coming up with a good solution is that our READ and DELETE operations currently accept only a key (which is nothing more than an id), so there isn’t a good way to infer the appropriate partitionKey.

The best solution that I can think of has two parts:

  1. If a developer wants to use CosmosDbStorage for non-BotState things, allow them to use partitionKeys however they want. They know best.
  2. If a developer wants to use CosmosDbStorage for BotState things, disallow custom partitionKeys. Instead, since key is already passed in, we use /realId as the partitionKeyPath (key == realId), and then we can use key/realId as the partitionKey for READ/DELETE. This goes against the conventional thinking behind partitioning (every document ends up in its own logical partition), but actually works well for Cosmos.
      • Alternatively, we could partition on the ConversationId. Nothing in our SDK uses this currently, but if we were ever going to query multiple documents at once, it would likely be within a ConversationId.

That being said, I don’t love this solution and hope we can come up with something better.
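For discussion, here is a sketch of how the two-part proposal (or the flag idea raised in the comments) could decide the container path and the partition key value sent on READ/DELETE. The PartitionOptions shape, its usage field, and both functions are hypothetical; nothing here exists on CosmosDbStorageOptions. Modelling the custom mode as one developer-supplied value is a simplification of “use partitionKeys however they want”.

```typescript
// Hypothetical options shape: neither variant exists on CosmosDbStorageOptions.
type PartitionOptions =
  | { usage: "botState" }
  | { usage: "custom"; partitionKeyPath: string; partitionKeyValue: string };

// Path the container would be created with.
function containerPartitionKeyPath(options: PartitionOptions): string {
  // BotState: fixed to /realId. Custom: whatever the developer chose.
  return options.usage === "botState" ? "/realId" : options.partitionKeyPath;
}

// Partition key value a READ or DELETE for `key` would send.
function partitionKeyForKey(options: PartitionOptions, key: string): string {
  // BotState: the key passed to Read/Delete is the realId, which is also the
  // partition key value. Custom: the developer-supplied value is used as-is.
  return options.usage === "botState" ? key : options.partitionKeyValue;
}

const botStateOptions: PartitionOptions = { usage: "botState" };
console.log(containerPartitionKeyPath(botStateOptions)); // prints: /realId
console.log(partitionKeyForKey(botStateOptions, "emulator/users/xyz")); // prints: emulator/users/xyz
```

A discriminated union keeps the two modes from being mixed: a BotState configuration simply has no field in which to set a custom path.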

Component Impact

  • CosmosDbStorage
  • Possibly CosmosDbStorageOptions
  • Possibly IStorage
  • Possibly some state classes

Customer Impact

Would allow for actual partitioning and prevent users from running into errors when trying to use partitioning with BotState.

Tracking Status

Dotnet SDK

  • PR
  • Merged

Javascript SDK

  • PR
  • Merged

Java SDK

  • PR
  • Merged

Python SDK

  • PR
  • Merged

Emulator

  • PR
  • Merged

Samples

  • PR
  • Merged

Docs

  • PR
  • Merged

Tools

  • PR
  • Merged


About this issue

  • Original URL
  • State: closed
  • Created 5 years ago
  • Reactions: 5
  • Comments: 15 (10 by maintainers)

Most upvoted comments

@mdrichardson I ended up following the pattern for the third solution and just adopted the C# modifications to the TypeScript file. We’re using the delete for when a user makes a privacy request to remove their information from our system: we scrub the transcript and state data for all of their conversations.

@mdrichardson agree that a second class is an option - in fact I had written that up as a second option and then deleted it, because I would love to avoid it if possible. I think having the flag should be pretty clean and a small iteration on the above, at least right now if we wanted to get a fix in using v2. We should just need to check the flag when applying the PartitionKey to the queries.

Totally understand what you are saying about moving to v3 if we are going to update the dependency, but upgrading to a later v2 version will require much less work and will at least let us be confident in getting a fix for Cosmos into 4.6.