Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
brendankowitz committed Mar 6, 2023
1 parent f641fe8 commit acd1f3c
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -385,35 +385,32 @@ public async Task CompleteJobAsync(JobInfo jobInfo, bool requestCancellationOnFa
(JobGroupWrapper JobGroup, IReadOnlyList<JobDefinitionWrapper> MatchingJob) definitionTuple = jobs.Single();
JobDefinitionWrapper item = definitionTuple.MatchingJob.Single();

if (item != null)
if (jobInfo.Status == JobStatus.Failed)
{
if (jobInfo.Status == JobStatus.Failed)
// If a job fails with requestCancellationOnFailure, all other jobs in the group should be cancelled.
if (requestCancellationOnFailure)
{
// If a job fails with requestCancellationOnFailure, all other jobs in the group should be cancelled.
if (requestCancellationOnFailure)
foreach (JobDefinitionWrapper jobDefinition in definitionTuple.JobGroup.Definitions)
{
foreach (JobDefinitionWrapper jobDefinition in definitionTuple.JobGroup.Definitions)
{
CancelJobDefinition(jobDefinition);
}
CancelJobDefinition(jobDefinition);
}

item.Status = (byte)JobStatus.Failed;
}
else if (item.CancelRequested)
{
item.Status = (byte)JobStatus.Cancelled;
}
else
{
item.Status = (byte?)jobInfo.Status ?? (byte?)JobStatus.Completed;
}

item.EndDate = Clock.UtcNow;
item.Result = jobInfo.Result;

await SaveJobGroupAsync(definitionTuple.JobGroup, cancellationToken);
item.Status = (byte)JobStatus.Failed;
}
else if (item.CancelRequested)
{
item.Status = (byte)JobStatus.Cancelled;
}
else
{
item.Status = (byte?)JobStatus.Completed;
}

item.EndDate = Clock.UtcNow;
item.Result = jobInfo.Result;

await SaveJobGroupAsync(definitionTuple.JobGroup, cancellationToken);
}

private async Task<IReadOnlyList<JobGroupWrapper>> GetGroupInternalAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Health.Extensions.Xunit;
using Microsoft.Health.Fhir.Core.UnitTests.Extensions;
using Microsoft.Health.Fhir.SqlServer.Features.Schema;
using Microsoft.Health.Fhir.SqlServer.Features.Storage;
using Microsoft.Health.Fhir.Tests.Common;
using Microsoft.Health.Fhir.Tests.Common.FixtureParameters;
using Microsoft.Health.JobManagement;
Expand Down Expand Up @@ -305,20 +301,19 @@ await this.RetryAsync(
async () =>
{
var queueType = (byte)TestQueueType.ExecuteWithHeartbeat;
var client = new SqlQueueClient(_fixture.SqlConnectionWrapperFactory, _schemaInformation, XUnitLogger<SqlQueueClient>.Create(_testOutputHelper));
await client.EnqueueAsync(queueType, new[] { "job" }, null, false, false, CancellationToken.None);
JobInfo job = await client.DequeueAsync(queueType, "test-worker", 1, CancellationToken.None);
await _queueClient.EnqueueAsync(queueType, new[] { "job" }, null, false, false, CancellationToken.None);
JobInfo job = await _queueClient.DequeueAsync(queueType, "test-worker", 1, CancellationToken.None);
var cancel = new CancellationTokenSource();
cancel.CancelAfter(TimeSpan.FromSeconds(30));
var execTask = JobHosting.ExecuteJobWithHeartbeatsAsync(
client,
Task<string> execTask = JobHosting.ExecuteJobWithHeartbeatsAsync(
_queueClient,
queueType,
job.Id,
job.Version,
async cancelSource =>
{
await Task.Delay(TimeSpan.FromSeconds(10));
await client.CompleteJobAsync(job, false, cancel.Token);
await Task.Delay(TimeSpan.FromSeconds(10), cancelSource.Token);
await _queueClient.CompleteJobAsync(job, false, cancelSource.Token);
return "Test";
},
TimeSpan.FromSeconds(1),
Expand All @@ -332,8 +327,8 @@ await this.RetryAsync(
{
while (currentJob.Status == JobStatus.Running)
{
await Task.Delay(TimeSpan.FromSeconds(1));
currentJob = await client.GetJobByIdAsync(queueType, job.Id, true, cancel.Token);
await Task.Delay(TimeSpan.FromSeconds(1), cancel.Token);
currentJob = await _queueClient.GetJobByIdAsync(queueType, job.Id, true, cancel.Token);
if (currentJob.HeartbeatDateTime != previousJob.HeartbeatDateTime)
{
heartbeatChanges++;
Expand All @@ -355,20 +350,19 @@ await this.RetryAsync(
async () =>
{
var queueType = (byte)TestQueueType.ExecuteWithHeartbeatsHeavy;
var client = new SqlQueueClient(_fixture.SqlConnectionWrapperFactory, _schemaInformation, XUnitLogger<SqlQueueClient>.Create(_testOutputHelper));
await client.EnqueueAsync(queueType, new[] { "job" }, null, false, false, CancellationToken.None);
JobInfo job = await client.DequeueAsync(queueType, "test-worker", 1, CancellationToken.None);
await _queueClient.EnqueueAsync(queueType, new[] { "job" }, null, false, false, CancellationToken.None);
JobInfo job = await _queueClient.DequeueAsync(queueType, "test-worker", 1, CancellationToken.None);
var cancel = new CancellationTokenSource();
cancel.CancelAfter(TimeSpan.FromSeconds(30));
var execTask = JobHosting.ExecuteJobWithHeavyHeartbeatsAsync(
client,
_queueClient,
job,
async cancelSource =>
{
await Task.Delay(TimeSpan.FromSeconds(5));
await Task.Delay(TimeSpan.FromSeconds(5), cancelSource.Token);
job.Result = "Something";
await Task.Delay(TimeSpan.FromSeconds(5));
await client.CompleteJobAsync(job, false, cancel.Token);
await _queueClient.CompleteJobAsync(job, false, cancelSource.Token);
return "Test";
},
TimeSpan.FromSeconds(1),
Expand All @@ -384,7 +378,7 @@ await this.RetryAsync(
while (currentJob.Status == JobStatus.Running)
{
await Task.Delay(TimeSpan.FromSeconds(1));
currentJob = await client.GetJobByIdAsync(queueType, job.Id, true, cancel.Token);
currentJob = await _queueClient.GetJobByIdAsync(queueType, job.Id, true, cancel.Token);

if (currentJob.Status == JobStatus.Running && currentJob.Result != null)
{
Expand Down

0 comments on commit acd1f3c

Please # to comment.