Skip to content

Commit

Permalink
Moved cosmos job hosting to use unified retry logic (#4773)
Browse files Browse the repository at this point in the history
* Moved cosmos job hosting to use unified retry logic

* Improve exception policy per cosmos recommendations

* remove unneeded using

* fix reindex worker context

* fix code scanning issue

* handle case of null diagnostics

* Add retry unit tests

* formatting fixes

* remove unneeded usings

* updated retry policy

* cleanup

* add logging to new tests

* relaxed retry test timing due to stopwatch imprecision

* removed primary constructor in CosmosQueueClientTests child test class

* advanced base, non default delay
  • Loading branch information
mikaelweave authored Jan 15, 2025
1 parent bc6b453 commit c8f678c
Show file tree
Hide file tree
Showing 9 changed files with 249 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Microsoft.Health.Core.Features.Context;
using Microsoft.Health.Extensions.DependencyInjection;
using Microsoft.Health.Fhir.Core.Configs;
using Microsoft.Health.Fhir.Core.Features.Context;
using Microsoft.Health.Fhir.Core.Features.Operations;
using Microsoft.Health.Fhir.Core.Features.Operations.Reindex;
using Microsoft.Health.Fhir.Core.Features.Operations.Reindex.Models;
Expand Down Expand Up @@ -59,6 +61,7 @@ public ReindexJobWorkerTests()
Options.Create(_reindexJobConfiguration),
_reindexJobTask.CreateMockScopeProvider(),
searchParameterOperations,
Substitute.For<RequestContextAccessor<IFhirRequestContext>>(),
NullLogger<ReindexJobWorker>.Instance);

_reindexJobWorker.Handle(new Messages.Search.SearchParametersInitializedNotification(), CancellationToken.None);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
using MediatR;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Microsoft.Extensions.Primitives;
using Microsoft.Health.Core.Features.Context;
using Microsoft.Health.Extensions.DependencyInjection;
using Microsoft.Health.Fhir.Core.Configs;
using Microsoft.Health.Fhir.Core.Features.Context;
using Microsoft.Health.Fhir.Core.Features.Operations.Reindex.Models;
using Microsoft.Health.Fhir.Core.Features.Search.Parameters;
using Microsoft.Health.Fhir.Core.Messages.Search;
Expand All @@ -29,6 +32,7 @@ public class ReindexJobWorker : INotificationHandler<SearchParametersInitialized
private readonly ReindexJobConfiguration _reindexJobConfiguration;
private readonly IScopeProvider<IReindexJobTask> _reindexJobTaskFactory;
private readonly ISearchParameterOperations _searchParameterOperations;
private readonly RequestContextAccessor<IFhirRequestContext> _contextAccessor;
private readonly ILogger _logger;
private bool _searchParametersInitialized = false;

Expand All @@ -37,19 +41,15 @@ public ReindexJobWorker(
IOptions<ReindexJobConfiguration> reindexJobConfiguration,
IScopeProvider<IReindexJobTask> reindexJobTaskFactory,
ISearchParameterOperations searchParameterOperations,
RequestContextAccessor<IFhirRequestContext> contextAccessor,
ILogger<ReindexJobWorker> logger)
{
EnsureArg.IsNotNull(fhirOperationDataStoreFactory, nameof(fhirOperationDataStoreFactory));
EnsureArg.IsNotNull(reindexJobConfiguration?.Value, nameof(reindexJobConfiguration));
EnsureArg.IsNotNull(reindexJobTaskFactory, nameof(reindexJobTaskFactory));
EnsureArg.IsNotNull(searchParameterOperations, nameof(searchParameterOperations));
EnsureArg.IsNotNull(logger, nameof(logger));

_fhirOperationDataStoreFactory = fhirOperationDataStoreFactory;
_reindexJobConfiguration = reindexJobConfiguration.Value;
_reindexJobTaskFactory = reindexJobTaskFactory;
_searchParameterOperations = searchParameterOperations;
_logger = logger;
_fhirOperationDataStoreFactory = EnsureArg.IsNotNull(fhirOperationDataStoreFactory, nameof(fhirOperationDataStoreFactory));
_reindexJobConfiguration = EnsureArg.IsNotNull(reindexJobConfiguration?.Value, nameof(reindexJobConfiguration));
_reindexJobTaskFactory = EnsureArg.IsNotNull(reindexJobTaskFactory, nameof(reindexJobTaskFactory));
_searchParameterOperations = EnsureArg.IsNotNull(searchParameterOperations, nameof(searchParameterOperations));
_contextAccessor = EnsureArg.IsNotNull(contextAccessor, nameof(contextAccessor));
_logger = EnsureArg.IsNotNull(logger, nameof(logger));
}

public async Task ExecuteAsync(CancellationToken cancellationToken)
Expand All @@ -60,6 +60,22 @@ public async Task ExecuteAsync(CancellationToken cancellationToken)
{
if (_searchParametersInitialized)
{
var originalRequestContext = _contextAccessor.RequestContext;

// Create a background task context to trigger the correct retry policy.
var fhirRequestContext = new FhirRequestContext(
method: nameof(ReindexJobWorker),
uriString: nameof(ReindexJobWorker),
baseUriString: nameof(ReindexJobWorker),
correlationId: Guid.NewGuid().ToString(),
requestHeaders: new Dictionary<string, StringValues>(),
responseHeaders: new Dictionary<string, StringValues>())
{
IsBackgroundTask = true,
};

_contextAccessor.RequestContext = fhirRequestContext;

// Check for any changes to Search Parameters
try
{
Expand Down Expand Up @@ -124,6 +140,8 @@ public async Task ExecuteAsync(CancellationToken cancellationToken)
// The job failed.
_logger.LogError(ex, "Error polling Reindex jobs.");
}

_contextAccessor.RequestContext = originalRequestContext;
}

try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public CosmosFhirDataStoreTests()
_cosmosDataStoreConfiguration,
Substitute.For<IOptionsMonitor<CosmosCollectionConfiguration>>(),
_cosmosQueryFactory,
new RetryExceptionPolicyFactory(_cosmosDataStoreConfiguration, requestContextAccessor),
new RetryExceptionPolicyFactory(_cosmosDataStoreConfiguration, requestContextAccessor, NullLogger<RetryExceptionPolicyFactory>.Instance),
NullLogger<CosmosFhirDataStore>.Instance,
Options.Create(new CoreFeatureConfiguration()),
_bundleOrchestrator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public FhirCosmosClientInitializerTests()
_initializer = new FhirCosmosClientInitializer(
clientTestProvider,
() => new[] { new TestRequestHandler() },
new RetryExceptionPolicyFactory(_cosmosDataStoreConfiguration, Substitute.For<RequestContextAccessor<IFhirRequestContext>>()),
new RetryExceptionPolicyFactory(_cosmosDataStoreConfiguration, Substitute.For<RequestContextAccessor<IFhirRequestContext>>(), NullLogger<RetryExceptionPolicyFactory>.Instance),
Substitute.For<CosmosAccessTokenProviderFactory>(),
NullLogger<FhirCosmosClientInitializer>.Instance);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System;
using System.Diagnostics;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Health.Abstractions.Exceptions;
using Microsoft.Health.Core.Features.Context;
using Microsoft.Health.Extensions.DependencyInjection;
using Microsoft.Health.Fhir.Core.Features.Context;
using Microsoft.Health.Fhir.CosmosDb.Core.Configs;
using Microsoft.Health.Fhir.CosmosDb.Core.Features.Storage;
using Microsoft.Health.Fhir.CosmosDb.Features.Queries;
using Microsoft.Health.Fhir.CosmosDb.Features.Storage;
using Microsoft.Health.Fhir.CosmosDb.Features.Storage.Queues;
using Microsoft.Health.Fhir.Tests.Common;
using Microsoft.Health.Test.Utilities;
using NSubstitute;
using Xunit;

namespace Microsoft.Health.Fhir.CosmosDb.UnitTests.Features.Storage.Queues;

/// <summary>
/// Unit tests covering how <see cref="CosmosQueueClient"/> dequeue operations react to transient
/// failures when the client is built with a <see cref="RetryExceptionPolicyFactory"/>.
/// </summary>
[Trait(Traits.OwningTeam, OwningTeam.Fhir)]
[Trait(Traits.Category, Categories.DataSourceValidation)]
public class CosmosQueueClientTests
{
    // Maximum accepted difference (in seconds) between the requested retry-after and the measured duration.
    private const double TimingToleranceSeconds = 0.5;

    private readonly CosmosDataStoreConfiguration _cosmosDataStoreConfiguration = new();
    private readonly ICosmosQueryFactory _cosmosQueryFactory = Substitute.For<ICosmosQueryFactory>();
    private readonly ICosmosDbDistributedLockFactory _distributedLockFactory = Substitute.For<ICosmosDbDistributedLockFactory>();
    private readonly RequestContextAccessor<IFhirRequestContext> _requestContextAccessor = Substitute.For<RequestContextAccessor<IFhirRequestContext>>();
    private readonly RetryExceptionPolicyFactory _retryPolicyFactory;
    private readonly CosmosQueueClient _cosmosQueueClient;

    // Number of times the substituted query's ExecuteNextAsync callback has run in the current test.
    private int _executeAttempts;

    public CosmosQueueClientTests()
    {
        _retryPolicyFactory = new RetryExceptionPolicyFactory(
            _cosmosDataStoreConfiguration,
            _requestContextAccessor,
            NullLogger<RetryExceptionPolicyFactory>.Instance);

        _cosmosQueueClient = new CosmosQueueClient(
            Substitute.For<Func<IScoped<Container>>>(),
            _cosmosQueryFactory,
            _distributedLockFactory,
            _retryPolicyFactory);
    }

    /// <summary>
    /// The first query execution throws a Cosmos exception with the supplied status code;
    /// the dequeue is expected to complete after exactly two executions.
    /// </summary>
    /// <param name="statusCode">Status code carried by the exception thrown on the first attempt.</param>
    [Theory]
    [InlineData(HttpStatusCode.ServiceUnavailable)]
    [InlineData(HttpStatusCode.TooManyRequests)]
    [InlineData(HttpStatusCode.Gone)]
    [InlineData((HttpStatusCode)449)]
    [InlineData(HttpStatusCode.RequestTimeout)]
    public async Task GivenADequeueJobOperation_WhenExceptionOccurs_RetryWillHappen(HttpStatusCode statusCode)
    {
        // Arrange
        ICosmosQuery<JobGroupWrapper> query = ArrangeQueryFailingOnFirstAttempt(() => new TestCosmosException(statusCode));

        // Act
        await _cosmosQueueClient.DequeueAsync(0, "testworker", 10, CancellationToken.None);

        // Assert
        Assert.Equal(2, _executeAttempts);
        await query.ReceivedWithAnyArgs(2).ExecuteNextAsync(Arg.Any<CancellationToken>());
    }

    /// <summary>
    /// The first query execution throws an exception carrying a two second retry-after value;
    /// the dequeue is expected to take approximately that long and to execute the query twice.
    /// </summary>
    /// <param name="exceptionType">Selects which exception kind is thrown on the first attempt.</param>
    [Theory]
    [InlineData(typeof(CosmosException))]
    [InlineData(typeof(RequestRateExceededException))]
    public async Task GivenADequeueJobOperation_WhenExceptionWithRetryAfterIsProvided_PolicyRespectsRetryAfter(Type exceptionType)
    {
        // Arrange
        TimeSpan retryAfter = TimeSpan.FromSeconds(2);

        Exception CreateThrottlingFailure()
        {
            if (exceptionType == typeof(CosmosException))
            {
                return new TestCosmosException(HttpStatusCode.TooManyRequests, retryAfter);
            }

            return new RequestRateExceededException(retryAfter);
        }

        ICosmosQuery<JobGroupWrapper> query = ArrangeQueryFailingOnFirstAttempt(CreateThrottlingFailure);

        Stopwatch timer = Stopwatch.StartNew();

        // Act
        await _cosmosQueueClient.DequeueAsync(0, "testworker", 10, CancellationToken.None);

        timer.Stop();

        // Assert
        Assert.Equal(2, _executeAttempts);
        await query.ReceivedWithAnyArgs(2).ExecuteNextAsync(Arg.Any<CancellationToken>());

        // Allowing small imprecision due to timer resolution
        double elapsedSeconds = timer.Elapsed.TotalSeconds;
        double deviationSeconds = Math.Abs(elapsedSeconds - retryAfter.TotalSeconds);
        Assert.True(
            deviationSeconds <= TimingToleranceSeconds,
            $"Expected retry after {retryAfter.TotalSeconds} seconds, but actual elapsed time was {elapsedSeconds} seconds.");
    }

    /// <summary>
    /// Wires the query factory substitute to hand out a query whose first execution throws the
    /// exception produced by <paramref name="firstAttemptFailure"/> and whose later executions
    /// return a substituted feed response. Every execution is counted in <see cref="_executeAttempts"/>.
    /// </summary>
    /// <param name="firstAttemptFailure">Creates the exception thrown by the first execution.</param>
    /// <returns>The substituted query, so callers can verify the calls it received.</returns>
    private ICosmosQuery<JobGroupWrapper> ArrangeQueryFailingOnFirstAttempt(Func<Exception> firstAttemptFailure)
    {
        ICosmosQuery<JobGroupWrapper> query = Substitute.For<ICosmosQuery<JobGroupWrapper>>();

        _cosmosQueryFactory
            .Create<JobGroupWrapper>(Arg.Any<Container>(), Arg.Any<CosmosQueryContext>())
            .ReturnsForAnyArgs(query);

        query.ExecuteNextAsync(Arg.Any<CancellationToken>()).ReturnsForAnyArgs(_ =>
        {
            _executeAttempts++;

            if (_executeAttempts == 1)
            {
                throw firstAttemptFailure();
            }

            return Task.FromResult(Substitute.For<FeedResponse<JobGroupWrapper>>());
        });

        return query;
    }

    /// <summary>
    /// A <see cref="CosmosException"/> whose status code and retry-after value are chosen by the test.
    /// </summary>
    public class TestCosmosException : CosmosException
    {
        public TestCosmosException(HttpStatusCode statusCode, TimeSpan? retryAfter = null)
            : base("Test exception message", statusCode, 0, "test-activity-id", 0.0)
        {
            RetryAfter = retryAfter;
        }

        /// <summary>
        /// Gets the retry-after value supplied to the constructor (null when none was given).
        /// </summary>
        public override TimeSpan? RetryAfter { get; }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using System.Threading.Tasks;
using EnsureThat;
using Microsoft.Azure.Cosmos;
using Microsoft.Extensions.Logging;
using Microsoft.Health.Abstractions.Exceptions;
using Microsoft.Health.Core;
using Microsoft.Health.Core.Extensions;
Expand All @@ -29,20 +30,21 @@ public class CosmosQueueClient : IQueueClient
private readonly Func<IScoped<Container>> _containerFactory;
private readonly ICosmosQueryFactory _queryFactory;
private readonly ICosmosDbDistributedLockFactory _distributedLockFactory;
private static readonly AsyncPolicy _retryPolicy = Policy
.Handle<CosmosException>(ex => ex.StatusCode == HttpStatusCode.PreconditionFailed)
.Or<CosmosException>(ex => ex.StatusCode == HttpStatusCode.TooManyRequests)
.Or<RequestRateExceededException>()
.WaitAndRetryAsync(5, _ => TimeSpan.FromMilliseconds(RandomNumberGenerator.GetInt32(100, 1000)));
private readonly RetryExceptionPolicyFactory _retryExceptionPolicyFactory;
private readonly AsyncPolicy _retryPolicy;

public CosmosQueueClient(
Func<IScoped<Container>> containerFactory,
ICosmosQueryFactory queryFactory,
ICosmosDbDistributedLockFactory distributedLockFactory)
ICosmosDbDistributedLockFactory distributedLockFactory,
RetryExceptionPolicyFactory retryExceptionPolicyFactory)
{
_containerFactory = EnsureArg.IsNotNull(containerFactory, nameof(containerFactory));
_queryFactory = EnsureArg.IsNotNull(queryFactory, nameof(queryFactory));
_distributedLockFactory = EnsureArg.IsNotNull(distributedLockFactory, nameof(distributedLockFactory));
_retryExceptionPolicyFactory = EnsureArg.IsNotNull(retryExceptionPolicyFactory, nameof(retryExceptionPolicyFactory));

_retryPolicy = _retryExceptionPolicyFactory.BackgroundWorkerRetryPolicy;
}

public bool IsInitialized() => true;
Expand Down
Loading

0 comments on commit c8f678c

Please sign in to comment.