diff --git a/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/Queues/CosmosQueueClient.cs b/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/Queues/CosmosQueueClient.cs index db31061842..93ad64c9a1 100644 --- a/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/Queues/CosmosQueueClient.cs +++ b/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/Queues/CosmosQueueClient.cs @@ -385,35 +385,32 @@ public async Task CompleteJobAsync(JobInfo jobInfo, bool requestCancellationOnFa (JobGroupWrapper JobGroup, IReadOnlyList MatchingJob) definitionTuple = jobs.Single(); JobDefinitionWrapper item = definitionTuple.MatchingJob.Single(); - if (item != null) + if (jobInfo.Status == JobStatus.Failed) { - if (jobInfo.Status == JobStatus.Failed) + // If a job fails with requestCancellationOnFailure, all other jobs in the group should be cancelled. + if (requestCancellationOnFailure) { - // If a job fails with requestCancellationOnFailure, all other jobs in the group should be cancelled. - if (requestCancellationOnFailure) + foreach (JobDefinitionWrapper jobDefinition in definitionTuple.JobGroup.Definitions) { - foreach (JobDefinitionWrapper jobDefinition in definitionTuple.JobGroup.Definitions) - { - CancelJobDefinition(jobDefinition); - } + CancelJobDefinition(jobDefinition); } - - item.Status = (byte)JobStatus.Failed; - } - else if (item.CancelRequested) - { - item.Status = (byte)JobStatus.Cancelled; - } - else - { - item.Status = (byte?)jobInfo.Status ?? (byte?)JobStatus.Completed; } - item.EndDate = Clock.UtcNow; - item.Result = jobInfo.Result; - - await SaveJobGroupAsync(definitionTuple.JobGroup, cancellationToken); + item.Status = (byte)JobStatus.Failed; + } + else if (item.CancelRequested) + { + item.Status = (byte)JobStatus.Cancelled; + } + else + { + item.Status = (byte?)JobStatus.Completed; } + + item.EndDate = Clock.UtcNow; + item.Result = jobInfo.Result; + + await SaveJobGroupAsync(definitionTuple.JobGroup, cancellationToken); } private async Task> GetGroupInternalAsync( diff --git a/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/QueueClientTests.cs b/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/QueueClientTests.cs index 153f1164f8..aa89beee67 100644 --- a/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/QueueClientTests.cs +++ b/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/QueueClientTests.cs @@ -8,11 +8,7 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; -using Microsoft.Extensions.Logging; using Microsoft.Health.Extensions.Xunit; -using Microsoft.Health.Fhir.Core.UnitTests.Extensions; -using Microsoft.Health.Fhir.SqlServer.Features.Schema; -using Microsoft.Health.Fhir.SqlServer.Features.Storage; using Microsoft.Health.Fhir.Tests.Common; using Microsoft.Health.Fhir.Tests.Common.FixtureParameters; using Microsoft.Health.JobManagement; @@ -305,20 +301,19 @@ await this.RetryAsync( async () => { var queueType = (byte)TestQueueType.ExecuteWithHeartbeat; - var client = new SqlQueueClient(_fixture.SqlConnectionWrapperFactory, _schemaInformation, XUnitLogger.Create(_testOutputHelper)); - await client.EnqueueAsync(queueType, new[] { "job" }, null, false, false, CancellationToken.None); - JobInfo job = await client.DequeueAsync(queueType, "test-worker", 1, CancellationToken.None); + await _queueClient.EnqueueAsync(queueType, new[] { "job" }, null, false, false, CancellationToken.None); + JobInfo job = await _queueClient.DequeueAsync(queueType, "test-worker", 1, CancellationToken.None); var cancel = new CancellationTokenSource(); cancel.CancelAfter(TimeSpan.FromSeconds(30)); - var execTask = JobHosting.ExecuteJobWithHeartbeatsAsync( - client, + Task execTask = JobHosting.ExecuteJobWithHeartbeatsAsync( + _queueClient, queueType, job.Id, job.Version, async cancelSource => { - await Task.Delay(TimeSpan.FromSeconds(10)); - await client.CompleteJobAsync(job, false, cancel.Token); + await Task.Delay(TimeSpan.FromSeconds(10), cancelSource.Token); + await _queueClient.CompleteJobAsync(job, false, cancelSource.Token); return "Test"; }, TimeSpan.FromSeconds(1), @@ -332,8 +327,8 @@ await this.RetryAsync( { while (currentJob.Status == JobStatus.Running) { - await Task.Delay(TimeSpan.FromSeconds(1)); - currentJob = await client.GetJobByIdAsync(queueType, job.Id, true, cancel.Token); + await Task.Delay(TimeSpan.FromSeconds(1), cancel.Token); + currentJob = await _queueClient.GetJobByIdAsync(queueType, job.Id, true, cancel.Token); if (currentJob.HeartbeatDateTime != previousJob.HeartbeatDateTime) { heartbeatChanges++; @@ -355,20 +350,19 @@ await this.RetryAsync( async () => { var queueType = (byte)TestQueueType.ExecuteWithHeartbeatsHeavy; - var client = new SqlQueueClient(_fixture.SqlConnectionWrapperFactory, _schemaInformation, XUnitLogger.Create(_testOutputHelper)); - await client.EnqueueAsync(queueType, new[] { "job" }, null, false, false, CancellationToken.None); - JobInfo job = await client.DequeueAsync(queueType, "test-worker", 1, CancellationToken.None); + await _queueClient.EnqueueAsync(queueType, new[] { "job" }, null, false, false, CancellationToken.None); + JobInfo job = await _queueClient.DequeueAsync(queueType, "test-worker", 1, CancellationToken.None); var cancel = new CancellationTokenSource(); cancel.CancelAfter(TimeSpan.FromSeconds(30)); var execTask = JobHosting.ExecuteJobWithHeavyHeartbeatsAsync( - client, + _queueClient, job, async cancelSource => { - await Task.Delay(TimeSpan.FromSeconds(5)); + await Task.Delay(TimeSpan.FromSeconds(5), cancelSource.Token); job.Result = "Something"; await Task.Delay(TimeSpan.FromSeconds(5)); - await client.CompleteJobAsync(job, false, cancel.Token); + await _queueClient.CompleteJobAsync(job, false, cancelSource.Token); return "Test"; }, TimeSpan.FromSeconds(1), @@ -384,7 +378,7 @@ await this.RetryAsync( while (currentJob.Status == JobStatus.Running) { await Task.Delay(TimeSpan.FromSeconds(1)); - currentJob = await client.GetJobByIdAsync(queueType, job.Id, true, cancel.Token); + currentJob = await _queueClient.GetJobByIdAsync(queueType, job.Id, true, cancel.Token); if (currentJob.Status == JobStatus.Running && currentJob.Result != null) {