If more physical partitions are added while running parallel processing with multiple workers using

I wrote the following code. First, if there are no saved continuation tokens, it gets the FeedRanges and processes them in parallel until the Change Feed is empty. Each parallel task returns its continuation token once the Change Feed is empty, and the tokens are saved for the next execution. If continuation tokens already exist, one parallel task is started per continuation token.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

class Program
{
    static async Task Main(string[] args)
    {
        var connectionString = "AccountEndpoint=https://***.documents.azure.com:443/;AccountKey=***;";
        var cosmosClient = new CosmosClient(connectionString);
        var container = cosmosClient.GetContainer("HackAzure", "TodoItem");

        // Load existing continuation tokens (null on the first run)
        var continuationTokens = await ReadContinuationTokens();

        IEnumerable<Task<string>> tasks;
        if (continuationTokens == null)
        {
            // Start new change feed iterators, one per feed range
            // The container has 4 physical partitions = 4 feed ranges
            var feedRanges = await container.GetFeedRangesAsync();
            tasks = feedRanges.Select(feedRange => ProcessFeedAsync<TodoItem>(container, ChangeFeedStartFrom.Beginning(feedRange)));
        }
        else
        {
            // Resume change feed iterators, one per saved continuation token
            tasks = continuationTokens.Select(continuationToken => ProcessFeedAsync<TodoItem>(container, ChangeFeedStartFrom.ContinuationToken(continuationToken)));
        }

        // Wait for every worker to finish, then save the new continuation tokens
        var newContinuationTokens = await Task.WhenAll(tasks);
        await SaveContinuationTokens(newContinuationTokens);
    }

    // Reads one range of the change feed until it is empty (or the request is throttled)
    // and returns the continuation token to resume from on the next execution.
    private static async Task<string> ProcessFeedAsync<T>(Container container, ChangeFeedStartFrom changeFeedStartFrom)
    {
        var iterator = container.GetChangeFeedIterator<T>(changeFeedStartFrom, new ChangeFeedRequestOptions
        {
            PageSizeHint = 100
        });

        string continuationToken = null;
        while (iterator.HasMoreResults)
        {
            try
            {
                var items = await iterator.ReadNextAsync();
                continuationToken = items.ContinuationToken;

                // Depending on the SDK version, HasMoreResults may stay true on an empty feed;
                // a 304 NotModified response also means there are no new changes.
                if (items.StatusCode == HttpStatusCode.NotModified)
                {
                    break;
                }

                foreach (var item in items)
                {
                    Console.WriteLine(item.ToString());
                }
            }
            catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.TooManyRequests)
            {
                // Throttled: take the token from the response headers and stop this worker
                continuationToken = ex.Headers.ContinuationToken;
                break;
            }
        }

        return continuationToken;
    }
}

I believe this implementation works fine as long as the number of physical partitions does not increase, but I do not understand how it behaves if physical partitions are added while it is running. My understanding is that each FeedRange corresponds to a single physical partition, so if a physical partition is added, do the FeedRanges need to be redefined?
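The ReadContinuationTokens and SaveContinuationTokens helpers called in the code are not shown. As a minimal sketch only, assuming the tokens are kept as a JSON array in a local file, they could look roughly like this. The class name ContinuationTokenStore, the optional path parameter, and the file name are hypothetical and not part of the original code; any durable store would serve the same purpose.

```csharp
using System.IO;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical helper: persists continuation tokens as a JSON array in a local file.
public static class ContinuationTokenStore
{
    // Assumed location for illustration only.
    public const string DefaultPath = "continuation-tokens.json";

    // Returns null when nothing has been saved yet, which the caller
    // treats as "start from the beginning of every FeedRange".
    public static async Task<string[]> ReadContinuationTokens(string path = DefaultPath)
    {
        if (!File.Exists(path))
        {
            return null;
        }

        var json = await File.ReadAllTextAsync(path);
        return JsonSerializer.Deserialize<string[]>(json);
    }

    // Overwrites the file with the tokens returned by the latest run.
    public static async Task SaveContinuationTokens(string[] tokens, string path = DefaultPath)
    {
        var json = JsonSerializer.Serialize(tokens);
        await File.WriteAllTextAsync(path, json);
    }
}
```

Returning null rather than an empty array for a missing file matches the `continuationTokens == null` check in Main, so the first run falls through to GetFeedRangesAsync.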
Replies: 1 comment 1 reply
Issue #1680 is tracking the public API for scaling out the FeedRange.