diff --git a/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/Queues/CosmosQueueClient.cs b/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/Queues/CosmosQueueClient.cs new file mode 100644 index 0000000000..30a14ba384 --- /dev/null +++ b/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/Queues/CosmosQueueClient.cs @@ -0,0 +1,527 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Security.Cryptography; +using System.Threading; +using System.Threading.Tasks; +using EnsureThat; +using Microsoft.Azure.Cosmos; +using Microsoft.Health.Core; +using Microsoft.Health.Core.Extensions; +using Microsoft.Health.Extensions.DependencyInjection; +using Microsoft.Health.Fhir.CosmosDb.Features.Queries; +using Microsoft.Health.JobManagement; +using Polly; + +namespace Microsoft.Health.Fhir.CosmosDb.Features.Storage.Queues; + +public class CosmosQueueClient : IQueueClient +{ + private readonly Func> _containerFactory; + private readonly ICosmosQueryFactory _queryFactory; + private readonly ICosmosDbDistributedLockFactory _distributedLockFactory; + private static readonly AsyncPolicy _retryPolicy = Policy + .Handle() + .WaitAndRetryAsync(3, _ => TimeSpan.FromMilliseconds(GenerateRandomNumber())); + + public CosmosQueueClient( + Func> containerFactory, + ICosmosQueryFactory queryFactory, + ICosmosDbDistributedLockFactory distributedLockFactory) + { + _containerFactory = EnsureArg.IsNotNull(containerFactory, nameof(containerFactory)); + _queryFactory = EnsureArg.IsNotNull(queryFactory, nameof(queryFactory)); + _distributedLockFactory = EnsureArg.IsNotNull(distributedLockFactory, nameof(distributedLockFactory)); + } + + public bool IsInitialized() => true; + + public async Task> EnqueueAsync( + byte queueType, + string[] definitions, + long? groupId, + bool forceOneActiveJobGroup, + bool isCompleted, + CancellationToken cancellationToken) + { + EnsureArg.IsNotNull(definitions, nameof(definitions)); + + var id = GetLongId(); + var jobInfos = new List(); + + var definitionHashes = definitions.Select(d => $"'{d.ComputeHash()}'").ToList(); + + using IScoped container = _containerFactory.Invoke(); + + QueryDefinition existingJobsSpec = new QueryDefinition(@$"SELECT VALUE c FROM root c + JOIN d in c.definitions + WHERE c.queueType = @queueType + AND (c.groupId = @groupId OR @groupId = null) + AND ARRAY_CONTAINS([0, 1], d.status) + AND ARRAY_CONTAINS([{string.Join(",", definitionHashes)}], d.definitionHash)") + .WithParameter("@queueType", queueType) + .WithParameter("@groupId", groupId); + + FeedResponse existingJobs = await ExecuteQueryAsync(existingJobsSpec, 100, cancellationToken); + + if (existingJobs.Count > 0) + { + IEnumerable existingJobInfos = + existingJobs.Resource + .SelectMany(x => x.ToJobInfo(x.Definitions.Where(y => definitions.Contains(y.Definition)))); + + jobInfos.AddRange(existingJobInfos); + } + + var newDefinitions = definitions.Except(jobInfos.Select(x => x.Definition)).ToArray(); + + if (forceOneActiveJobGroup) + { + await using ICosmosDbDistributedLock distributedLock = _distributedLockFactory.Create(container.Value, "__jobQueue:" + queueType); + + if (await distributedLock.TryAcquireLock()) + { + QueryDefinition sqlQuerySpec = new QueryDefinition(@"SELECT VALUE count(1) FROM root c JOIN d in c.definitions + WHERE c.queueType = @queueType + AND ARRAY_CONTAINS([0, 1], d.status)").WithParameter("@queueType", queueType); + + var query = _queryFactory.Create( + container.Value, + new CosmosQueryContext( + sqlQuerySpec, + new QueryRequestOptions { PartitionKey = new PartitionKey(JobGroupWrapper.JobInfoPartitionKey) })); + + FeedResponse itemResponse = await query.ExecuteNextAsync(cancellationToken); + + if (itemResponse.Resource.FirstOrDefault() > 0) + { + throw new JobConflictException("Failed to enqueue job."); + } + + jobInfos.AddRange(await CreateNewJob(id, queueType, newDefinitions, groupId, isCompleted, cancellationToken)); + } + } + else + { + jobInfos.AddRange(await CreateNewJob(id, queueType, newDefinitions, groupId, isCompleted, cancellationToken)); + } + + return jobInfos; + } + + private async Task> CreateNewJob(long id, byte queueType, string[] definitions, long? groupId, bool isCompleted, CancellationToken cancellationToken) + { + var jobInfo = new JobGroupWrapper + { + Id = id.ToString(), + QueueType = queueType, + GroupId = groupId ?? id, + CreateDate = Clock.UtcNow, + TimeToLive = (int)TimeSpan.FromDays(30).TotalSeconds, + }; + + var jobId = id; + + foreach (var item in definitions) + { + var definitionInfo = new JobDefinitionWrapper + { + JobId = jobId++, + Status = isCompleted ? (byte)JobStatus.Completed : (byte)JobStatus.Created, + Definition = item, + DefinitionHash = item.ComputeHash(), + }; + + jobInfo.Definitions.Add(definitionInfo); + } + + using IScoped container = _containerFactory.Invoke(); + ItemResponse result = await container.Value.CreateItemAsync(jobInfo, new PartitionKey(JobGroupWrapper.JobInfoPartitionKey), cancellationToken: cancellationToken); + + return result.Resource.ToJobInfo().ToList(); + } + + public async Task> DequeueJobsAsync( + byte queueType, + int numberOfJobsToDequeue, + string worker, + int heartbeatTimeoutSec, + CancellationToken cancellationToken) + { + QueryDefinition sqlQuerySpec = new QueryDefinition(@"SELECT VALUE c FROM root c + JOIN d in c.definitions + WHERE c.queueType = @queueType + AND (d.status = 0 OR + (d.status = 1 AND d.heartbeatDateTime < @heartbeatDateTimeout)) + ORDER BY c.priority ASC") + .WithParameter("@queueType", queueType) + .WithParameter("@heartbeatDateTimeout", Clock.UtcNow.AddSeconds(-heartbeatTimeoutSec)); + + return await _retryPolicy.ExecuteAsync(async () => + { + FeedResponse response = await ExecuteQueryAsync(sqlQuerySpec, 1, cancellationToken); + + JobGroupWrapper item = response.FirstOrDefault(); + if (item != null) + { + return await DequeueItemsInternalAsync(item, numberOfJobsToDequeue, worker, heartbeatTimeoutSec, cancellationToken); + } + + return null; + }); + } + + private async Task> DequeueItemsInternalAsync(JobGroupWrapper item, int numberOfJobsToDequeue, string worker, int heartbeatTimeoutSec, CancellationToken cancellationToken, long? jobId = null) + { + var scan = item.Definitions + .Where(x => x.JobId == jobId + || (jobId == null && + (x.Status == (byte)JobStatus.Created + || (x.Status == (byte)JobStatus.Running && x.HeartbeatDateTime < Clock.UtcNow.AddSeconds(-heartbeatTimeoutSec))))) + .OrderBy(x => x.Status) + .ToList(); + + if (scan.Count == 0) + { + return null; + } + + var toReturn = new List(); + foreach (JobDefinitionWrapper job in scan.Take(numberOfJobsToDequeue)) + { + if (!string.IsNullOrEmpty(job.Worker)) + { + job.Info = $"Prev worker: {job.Worker}. "; + } + + if (job.DequeueCount > 10) + { + if (job.Status == (byte)JobStatus.Running) + { + job.CancelRequested = true; + } + + job.Status = (byte)JobStatus.Failed; + job.Info += "Dequeue count exceeded."; + } + else + { + job.Status = (byte)JobStatus.Running; + job.HeartbeatDateTime = Clock.UtcNow; + job.Worker = worker; + job.DequeueCount += 1; + job.StartDate ??= Clock.UtcNow; + job.Version = GenerateVersion(); + + toReturn.Add(job); + } + } + + await SaveJobGroupAsync(item, cancellationToken); + + return item.ToJobInfo(toReturn).ToList(); + } + + public async Task DequeueAsync(byte queueType, string worker, int heartbeatTimeoutSec, CancellationToken cancellationToken, long? jobId = null) + { + if (jobId == null) + { + var job = await DequeueJobsAsync(queueType, 1, worker, heartbeatTimeoutSec, cancellationToken); + return job?.FirstOrDefault(); + } + + var jobs = await GetJobsByIdsInternalAsync(queueType, new[] { jobId.Value }, false, cancellationToken); + if (jobs.Count == 1) + { + var job = await DequeueItemsInternalAsync(jobs[0].JobGroup, 1, worker, heartbeatTimeoutSec, cancellationToken); + } + + throw new JobNotExistException("Job not found."); + } + + public async Task GetJobByIdAsync(byte queueType, long jobId, bool returnDefinition, CancellationToken cancellationToken) + { + IReadOnlyList job = await GetJobsByIdsAsync(queueType, new[] { jobId }, returnDefinition, cancellationToken); + + if (job.Count == 1) + { + return job[0]; + } + + if (job.Count > 1) + { + throw new InvalidOperationException("More than one job found."); + } + + return null; + } + + public async Task> GetJobsByIdsAsync(byte queueType, long[] jobIds, bool returnDefinition, CancellationToken cancellationToken) + { + var jobs = await GetJobsByIdsInternalAsync(queueType, jobIds, returnDefinition, cancellationToken); + + var infos = new List(); + foreach ((JobGroupWrapper JobGroup, IReadOnlyList MatchingJob) item in jobs) + { + infos.AddRange(item.JobGroup.ToJobInfo(item.MatchingJob)); + } + + return infos; + } + + public async Task> GetJobByGroupIdAsync(byte queueType, long groupId, bool returnDefinition, CancellationToken cancellationToken) + { + JobGroupWrapper job = await GetGroupInternalAsync(queueType, groupId, cancellationToken); + + if (job != null) + { + return job.ToJobInfo(job.Definitions).ToList(); + } + + return null; + } + + public async Task PutJobHeartbeatAsync(JobInfo jobInfo, CancellationToken cancellationToken) + { + return await _retryPolicy.ExecuteAsync(async () => + { + var jobs = await GetJobsByIdsInternalAsync(jobInfo.QueueType, new[] { jobInfo.Id }, false, cancellationToken); + + var job = jobs.Single(); + JobDefinitionWrapper item = job.MatchingJob.Single(); + + if (item.Version != jobInfo.Version) + { + throw new JobConflictException("Job version mismatch."); + } + + if (item.Status == (byte)JobStatus.Running) + { + item.HeartbeatDateTime = Clock.UtcNow; + + if (jobInfo.Data.HasValue) + { + item.Data = jobInfo.Data; + } + + if (!string.IsNullOrEmpty(jobInfo.Result)) + { + item.Result = jobInfo.Result; + } + + await SaveJobGroupAsync(job.JobGroup, cancellationToken); + } + + return item.CancelRequested; + }); + } + + public async Task CancelJobByGroupIdAsync(byte queueType, long groupId, CancellationToken cancellationToken) + { + JobGroupWrapper job = await GetGroupInternalAsync(queueType, groupId, cancellationToken); + if (job != null) + { + foreach (JobDefinitionWrapper item in job.Definitions) + { + if (item.Status == (byte)JobStatus.Running) + { + item.CancelRequested = true; + } + else if (item.Status == (byte)JobStatus.Created) + { + item.Status = (byte)JobStatus.Cancelled; + } + } + + await SaveJobGroupAsync(job, cancellationToken); + } + } + + public async Task CancelJobByIdAsync(byte queueType, long jobId, CancellationToken cancellationToken) + { + var jobs = await GetJobsByIdsInternalAsync(queueType, new[] { jobId }, false, cancellationToken); + + if (jobs.Count == 1) + { + (JobGroupWrapper JobGroup, IReadOnlyList MatchingJob) job = jobs[0]; + JobDefinitionWrapper item = job.MatchingJob[0]; + + if (item != null) + { + CancelJobDefinition(item); + + await SaveJobGroupAsync(job.JobGroup, cancellationToken); + } + } + } + + public async Task CompleteJobAsync(JobInfo jobInfo, bool requestCancellationOnFailure, CancellationToken cancellationToken) + { + var jobs = await GetJobsByIdsInternalAsync(jobInfo.QueueType, new[] { jobInfo.Id }, false, cancellationToken); + + (JobGroupWrapper JobGroup, IReadOnlyList MatchingJob) definitionTuple = jobs.Single(); + JobDefinitionWrapper item = definitionTuple.MatchingJob.Single(); + + if (item != null) + { + if (jobInfo.Status == JobStatus.Failed) + { + if (requestCancellationOnFailure) + { + foreach (JobDefinitionWrapper jobDefinition in definitionTuple.JobGroup.Definitions) + { + CancelJobDefinition(jobDefinition); + } + } + + item.Status = (byte)JobStatus.Failed; + } + else if (item.CancelRequested) + { + item.Status = (byte)JobStatus.Cancelled; + } + else + { + item.Status = (byte?)jobInfo.Status ?? (byte?)JobStatus.Completed; + } + + item.EndDate = Clock.UtcNow; + item.Result = jobInfo.Result; + + await SaveJobGroupAsync(definitionTuple.JobGroup, cancellationToken); + } + } + + private async Task GetGroupInternalAsync( + byte queueType, + long groupId, + CancellationToken cancellationToken) + { + QueryDefinition sqlQuerySpec = new QueryDefinition(@"SELECT VALUE c FROM root c + WHERE c.groupId = @groupId and c.queueType = @queueType") + .WithParameter("@groupId", groupId) + .WithParameter("@queueType", queueType); + + FeedResponse response = await ExecuteQueryAsync(sqlQuerySpec, 1, cancellationToken); + + return response.FirstOrDefault(); + } + + private async Task MatchingJob)>> GetJobsByIdsInternalAsync(byte queueType, long[] jobIds, bool returnDefinition, CancellationToken cancellationToken) + { + QueryDefinition sqlQuerySpec = new QueryDefinition(@$"SELECT VALUE c FROM root c JOIN d in c.definitions + WHERE c.queueType = @queueType + AND ARRAY_CONTAINS([{string.Join(",", jobIds)}], d.jobId)") + .WithParameter("@queueType", queueType); + + FeedResponse response = await ExecuteQueryAsync(sqlQuerySpec, jobIds.Length, cancellationToken); + + var infos = new List<(JobGroupWrapper JobGroup, IReadOnlyList MatchingJob)>(); + + foreach (JobGroupWrapper item in response) + { + var scan = item.Definitions + .Where(x => jobIds.Contains(x.JobId)) + .ToList(); + + infos.Add((item, scan)); + } + + return infos; + } + + private static void CancelJobDefinition(JobDefinitionWrapper item) + { + switch (item.Status) + { + case (byte)JobStatus.Running: + item.CancelRequested = true; + break; + case (byte)JobStatus.Created: + item.Status = (byte)JobStatus.Cancelled; + item.EndDate = Clock.UtcNow; + break; + } + } + + private async Task> ExecuteQueryAsync(QueryDefinition sqlQuerySpec, int itemCount, CancellationToken cancellationToken) + { + using IScoped container = _containerFactory.Invoke(); + + ICosmosQuery query = _queryFactory.Create( + container.Value, + new CosmosQueryContext( + sqlQuerySpec, + new QueryRequestOptions { PartitionKey = new PartitionKey(JobGroupWrapper.JobInfoPartitionKey), MaxItemCount = itemCount })); + + FeedResponse response = await query.ExecuteNextAsync(cancellationToken); + return response; + } + + private async Task SaveJobGroupAsync(JobGroupWrapper definition, CancellationToken cancellationToken) + { + using IScoped container = _containerFactory.Invoke(); + + try + { + await container.Value.UpsertItemAsync( + definition, + new PartitionKey(JobGroupWrapper.JobInfoPartitionKey), + new ItemRequestOptions { IfMatchEtag = definition.ETag }, + cancellationToken: cancellationToken); + } + catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.PreconditionFailed) + { + throw new RetriableJobException("Job precondition failed.", ex); + } + catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.TooManyRequests) + { + throw new RetriableJobException("Service too busy.", ex); + } + catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.RequestEntityTooLarge) + { + throw new JobExecutionException("Job data too large.", ex); + } + } + + private static long GetLongId() + { + return IdHelper.DateToId(Clock.UtcNow.DateTime) + GenerateRandomNumber(); + } + + /// + /// To generate a random number between 100 and 999, + /// the method uses the formula (randomValue / UInt16.MaxValue) * 899 + 100. + /// This takes the percentage of the maximum UInt16 value that the generated value represents, + /// multiplies it by 899 (the difference between 999 and 100), + /// and adds 100 to shift the range to between 100 and 999. + /// + private static int GenerateRandomNumber() + { + using var rng = RandomNumberGenerator.Create(); + var bytes = new byte[2]; + rng.GetBytes(bytes); + + var value = BitConverter.ToUInt16(bytes, 0); + var percentage = (double)value / ushort.MaxValue; + + var randomNumber = (int)Math.Round(percentage * 899) + 100; + return randomNumber; + } + + /// + /// Returns a version number based on the current time. + /// Similar to SQL "datediff_big(millisecond,'0001-01-01',getUTCdate())" + /// + private static long GenerateVersion() + { + TimeSpan diff = DateTime.UtcNow - new DateTime(1, 1, 1, 0, 0, 0, DateTimeKind.Utc); + return (long)diff.TotalMilliseconds; + } +} diff --git a/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/Queues/IdHelper.cs b/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/Queues/IdHelper.cs new file mode 100644 index 0000000000..fa5c8dc69c --- /dev/null +++ b/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/Queues/IdHelper.cs @@ -0,0 +1,33 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System; +using System.Diagnostics; +using EnsureThat; +using Microsoft.Health.Core.Extensions; + +namespace Microsoft.Health.Fhir.CosmosDb.Features.Storage.Queues; + +internal static class IdHelper +{ + private const int ShiftFactor = 3; + + internal static readonly DateTime MaxDateTime = new DateTime(long.MaxValue >> ShiftFactor, DateTimeKind.Utc).TruncateToMillisecond().AddTicks(-1); + + public static long DateToId(DateTime dateTime) + { + EnsureArg.IsLte(dateTime, MaxDateTime, nameof(dateTime)); + long id = dateTime.TruncateToMillisecond().Ticks << ShiftFactor; + + Debug.Assert(id >= 0, "The ID should not have become negative"); + return id; + } + + public static DateTime IdToDate(long resourceSurrogateId) + { + var dateTime = new DateTime(resourceSurrogateId >> ShiftFactor, DateTimeKind.Utc); + return dateTime.TruncateToMillisecond(); + } +} diff --git a/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/Queues/JobDefinitionWrapper.cs b/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/Queues/JobDefinitionWrapper.cs new file mode 100644 index 0000000000..4b30fb9553 --- /dev/null +++ b/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/Queues/JobDefinitionWrapper.cs @@ -0,0 +1,54 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System; +using Newtonsoft.Json; + +namespace Microsoft.Health.Fhir.CosmosDb.Features.Storage.Queues; + +public class JobDefinitionWrapper +{ + [JsonProperty("jobId")] + public long JobId { get; set; } + + [JsonProperty("status")] + public byte? Status { get; set; } + + [JsonProperty("worker")] + public string Worker { get; set; } + + [JsonProperty("definition")] + public string Definition { get; set; } + + [JsonProperty("definitionHash")] + public string DefinitionHash { get; set; } + + [JsonProperty("result")] + public string Result { get; set; } + + [JsonProperty("data")] + public long? Data { get; set; } + + [JsonProperty("startDate")] + public DateTimeOffset? StartDate { get; set; } + + [JsonProperty("endDate")] + public DateTimeOffset? EndDate { get; set; } + + [JsonProperty("heartbeatDateTime")] + public DateTimeOffset HeartbeatDateTime { get; set; } + + [JsonProperty("cancelRequested")] + public bool CancelRequested { get; set; } + + [JsonProperty("info")] + public string Info { get; set; } + + [JsonProperty("dequeueCount")] + public int DequeueCount { get; set; } + + [JsonProperty("version")] + public long Version { get; set; } +} diff --git a/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/Queues/JobGroupWrapper.cs b/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/Queues/JobGroupWrapper.cs new file mode 100644 index 0000000000..d77d31c8bc --- /dev/null +++ b/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/Queues/JobGroupWrapper.cs @@ -0,0 +1,36 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using Newtonsoft.Json; + +namespace Microsoft.Health.Fhir.CosmosDb.Features.Storage.Queues; + +public class JobGroupWrapper : SystemData +{ + public const string JobInfoPartitionKey = "__jobs__"; + + [JsonProperty("groupId")] + public long GroupId { get; set; } + + [JsonProperty(KnownDocumentProperties.PartitionKey)] + public string PartitionKey { get; } = JobInfoPartitionKey; + + [JsonProperty("queueType")] + public byte QueueType { get; set; } + + [JsonProperty("priority")] + public long Priority { get; set; } + + [JsonProperty("definitions")] + public IList Definitions { get; } = new List(); + + [JsonProperty("createDate")] + public DateTimeOffset CreateDate { get; set; } + + [JsonProperty("ttl")] + public long TimeToLive { get; set; } +} diff --git a/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/Queues/JobInfoExtensions.cs b/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/Queues/JobInfoExtensions.cs new file mode 100644 index 0000000000..25360080b9 --- /dev/null +++ b/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/Queues/JobInfoExtensions.cs @@ -0,0 +1,39 @@ +// ------------------------------------------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. +// ------------------------------------------------------------------------------------------------- + +using System.Collections.Generic; +using System.Linq; +using Microsoft.Health.JobManagement; + +namespace Microsoft.Health.Fhir.CosmosDb.Features.Storage.Queues; + +public static class JobInfoExtensions +{ + public static IEnumerable ToJobInfo(this JobGroupWrapper jobGroupWrapper) + { + return jobGroupWrapper.ToJobInfo(jobGroupWrapper.Definitions); + } + + public static IEnumerable ToJobInfo(this JobGroupWrapper jobGroupWrapper, IEnumerable items) + { + return items.Select(jobInfoWrapperItem => new JobInfo + { + Id = jobInfoWrapperItem.JobId, + QueueType = jobGroupWrapper.QueueType, + Status = jobInfoWrapperItem.Status.HasValue ? (JobStatus?)jobInfoWrapperItem.Status.Value : null, + GroupId = jobGroupWrapper.GroupId, + Definition = jobInfoWrapperItem.Definition, + Result = jobInfoWrapperItem.Result, + Data = jobInfoWrapperItem.Data, + CancelRequested = jobInfoWrapperItem.CancelRequested, + Priority = jobGroupWrapper.Priority, + CreateDate = jobGroupWrapper.CreateDate.UtcDateTime, + StartDate = jobInfoWrapperItem.StartDate?.UtcDateTime, + EndDate = jobInfoWrapperItem.EndDate?.UtcDateTime, + HeartbeatDateTime = jobInfoWrapperItem.HeartbeatDateTime.UtcDateTime, + Version = jobInfoWrapperItem.Version, + }); + } +} diff --git a/src/Microsoft.Health.Fhir.CosmosDb/Registration/FhirServerBuilderCosmosDbRegistrationExtensions.cs b/src/Microsoft.Health.Fhir.CosmosDb/Registration/FhirServerBuilderCosmosDbRegistrationExtensions.cs index eae44a4d54..141b7dd861 100644 --- a/src/Microsoft.Health.Fhir.CosmosDb/Registration/FhirServerBuilderCosmosDbRegistrationExtensions.cs +++ b/src/Microsoft.Health.Fhir.CosmosDb/Registration/FhirServerBuilderCosmosDbRegistrationExtensions.cs @@ -17,7 +17,6 @@ using Microsoft.Health.Fhir.Core.Features.Search.Registry; using Microsoft.Health.Fhir.Core.Models; using Microsoft.Health.Fhir.Core.Registration; -using Microsoft.Health.Fhir.CosmosDb; using Microsoft.Health.Fhir.CosmosDb.Configs; using Microsoft.Health.Fhir.CosmosDb.Features.Health; using Microsoft.Health.Fhir.CosmosDb.Features.Operations; @@ -27,9 +26,12 @@ using Microsoft.Health.Fhir.CosmosDb.Features.Search.Queries; using Microsoft.Health.Fhir.CosmosDb.Features.Storage; using Microsoft.Health.Fhir.CosmosDb.Features.Storage.Operations; +using Microsoft.Health.Fhir.CosmosDb.Features.Storage.Queues; using Microsoft.Health.Fhir.CosmosDb.Features.Storage.Registry; using Microsoft.Health.Fhir.CosmosDb.Features.Storage.StoredProcedures; using Microsoft.Health.Fhir.CosmosDb.Features.Storage.Versioning; +using Microsoft.Health.JobManagement; +using Constants = Microsoft.Health.Fhir.CosmosDb.Constants; // ReSharper disable once CheckNamespace namespace Microsoft.Extensions.DependencyInjection @@ -231,6 +233,12 @@ private static IFhirServerBuilder AddCosmosDbPersistence(this IFhirServerBuilder .Singleton() .AsSelf(); + services.Add() + .Scoped() + .AsSelf() + .AsImplementedInterfaces() + .AsFactory>(); + return fhirServerBuilder; } diff --git a/src/Microsoft.Health.Fhir.Shared.Web/Startup.cs b/src/Microsoft.Health.Fhir.Shared.Web/Startup.cs index 3fed391e3c..361034cb5b 100644 --- a/src/Microsoft.Health.Fhir.Shared.Web/Startup.cs +++ b/src/Microsoft.Health.Fhir.Shared.Web/Startup.cs @@ -5,6 +5,7 @@ using System; using System.Collections.Generic; +using System.Linq; using Microsoft.ApplicationInsights.Extensibility; using Microsoft.AspNetCore.Authentication.JwtBearer; using Microsoft.AspNetCore.Builder; @@ -18,6 +19,7 @@ using Microsoft.Health.Fhir.Azure; using Microsoft.Health.Fhir.Core.Configs; using Microsoft.Health.Fhir.Core.Features; +using Microsoft.Health.Fhir.Core.Features.Operations.Export; using Microsoft.Health.Fhir.Core.Features.Operations.Import; using Microsoft.Health.JobManagement; using Microsoft.Health.SqlServer.Configs; @@ -116,8 +118,9 @@ private void AddTaskHostingService(IServiceCollection services) .AsImplementedInterfaces(); services.Configure(options => Configuration.GetSection("TaskHosting").Bind(options)); - IEnumerable jobs = services.TypesInSameAssemblyAs() + IEnumerable jobs = services.TypesInSameAssemblyAs() .AssignableTo() + .Where(t => t.Type.Name.Contains("Import", StringComparison.Ordinal) == false) .Transient() .AsSelf(); diff --git a/src/Microsoft.Health.Fhir.SqlServer/Registration/FhirServerBuilderSqlServerRegistrationExtensions.cs b/src/Microsoft.Health.Fhir.SqlServer/Registration/FhirServerBuilderSqlServerRegistrationExtensions.cs index 9bfcb4304e..1ecfcbce37 100644 --- a/src/Microsoft.Health.Fhir.SqlServer/Registration/FhirServerBuilderSqlServerRegistrationExtensions.cs +++ b/src/Microsoft.Health.Fhir.SqlServer/Registration/FhirServerBuilderSqlServerRegistrationExtensions.cs @@ -4,6 +4,7 @@ // ------------------------------------------------------------------------------------------------- using System; +using System.Collections.Generic; using System.Linq; using EnsureThat; using MediatR; @@ -11,6 +12,8 @@ using Microsoft.Extensions.Configuration; using Microsoft.Health.Extensions.DependencyInjection; using Microsoft.Health.Fhir.Core.Extensions; +using Microsoft.Health.Fhir.Core.Features.Operations.Export; +using Microsoft.Health.Fhir.Core.Features.Operations.Import; using Microsoft.Health.Fhir.Core.Features.Search.Expressions; using Microsoft.Health.Fhir.Core.Features.Search.Registry; using Microsoft.Health.Fhir.Core.Messages.Storage; @@ -25,6 +28,7 @@ using Microsoft.Health.Fhir.SqlServer.Features.Storage; using Microsoft.Health.Fhir.SqlServer.Features.Storage.Registry; using Microsoft.Health.Fhir.SqlServer.Features.Watchdogs; +using Microsoft.Health.JobManagement; using Microsoft.Health.SqlServer.Api.Registration; using Microsoft.Health.SqlServer.Configs; using Microsoft.Health.SqlServer.Features.Client; @@ -249,6 +253,17 @@ public static IFhirServerBuilder AddSqlServer(this IFhirServerBuilder fhirServer services.AddHostedService(); + IEnumerable jobs = services.TypesInSameAssemblyAs() + .AssignableTo() + .Where(t => t.Type.Name.Contains("Import", StringComparison.Ordinal)) + .Transient() + .AsSelf(); + + foreach (TypeRegistrationBuilder job in jobs) + { + job.AsDelegate>(); + } + return fhirServerBuilder; } diff --git a/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Microsoft.Health.Fhir.Shared.Tests.Integration.projitems b/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Microsoft.Health.Fhir.Shared.Tests.Integration.projitems index 6424024e4f..63fded7d32 100644 --- a/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Microsoft.Health.Fhir.Shared.Tests.Integration.projitems +++ b/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Microsoft.Health.Fhir.Shared.Tests.Integration.projitems @@ -44,7 +44,7 @@ - + \ No newline at end of file diff --git a/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/CosmosDbFhirStorageTestsFixture.cs b/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/CosmosDbFhirStorageTestsFixture.cs index c13cce063e..f3780301eb 100644 --- a/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/CosmosDbFhirStorageTestsFixture.cs +++ b/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/CosmosDbFhirStorageTestsFixture.cs @@ -35,9 +35,11 @@ using Microsoft.Health.Fhir.CosmosDb.Features.Search.Queries; using Microsoft.Health.Fhir.CosmosDb.Features.Storage; using Microsoft.Health.Fhir.CosmosDb.Features.Storage.Operations; +using Microsoft.Health.Fhir.CosmosDb.Features.Storage.Queues; using Microsoft.Health.Fhir.CosmosDb.Features.Storage.Registry; using Microsoft.Health.Fhir.CosmosDb.Features.Storage.StoredProcedures; using Microsoft.Health.Fhir.CosmosDb.Features.Storage.Versioning; +using Microsoft.Health.JobManagement; using NSubstitute; using Xunit; @@ -63,6 +65,7 @@ public class CosmosDbFhirStorageTestsFixture : IServiceProvider, IAsyncLifetime private SupportedSearchParameterDefinitionManager _supportedSearchParameterDefinitionManager; private SearchParameterStatusManager _searchParameterStatusManager; private CosmosClient _cosmosClient; + private CosmosQueueClient _queueClient; public CosmosDbFhirStorageTestsFixture() { @@ -82,6 +85,8 @@ public CosmosDbFhirStorageTestsFixture() }; } + public Container Container => _container; + public async Task InitializeAsync() { var fhirStoredProcs = typeof(IStoredProcedure).Assembly @@ -215,6 +220,11 @@ public async Task InitializeAsync() NullLogger.Instance); _fhirStorageTestHelper = new CosmosDbFhirStorageTestHelper(_container); + + _queueClient = new CosmosQueueClient( + () => _container.CreateMockScope(), + new CosmosQueryFactory(Substitute.For(), Substitute.For()), + new CosmosDbDistributedLockFactory(() => _container.CreateMockScope(), NullLogger.Instance)); } public async Task DisposeAsync() @@ -284,6 +294,11 @@ object IServiceProvider.GetService(Type serviceType) return _fhirRequestContextAccessor; } + if (serviceType == typeof(IQueueClient)) + { + return _queueClient; + } + return null; } } diff --git a/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/FhirStorageTestsFixture.cs b/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/FhirStorageTestsFixture.cs index c905921a41..fd41ee5056 100644 --- a/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/FhirStorageTestsFixture.cs +++ b/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/FhirStorageTestsFixture.cs @@ -45,6 +45,7 @@ using Microsoft.Health.Fhir.Tests.Common; using Microsoft.Health.Fhir.Tests.Common.FixtureParameters; using Microsoft.Health.Fhir.Tests.Common.Mocks; +using Microsoft.Health.JobManagement; using Microsoft.Health.SqlServer.Features.Schema; using NSubstitute; using Xunit; @@ -120,6 +121,8 @@ internal FhirStorageTestsFixture(IServiceProvider fixture) public GetResourceHandler GetResourceHandler { get; set; } + public IQueueClient QueueClient => _fixture.GetRequiredService(); + public void Dispose() { (_fixture as IDisposable)?.Dispose(); diff --git a/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/SqlQueueClientTests.cs b/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/QueueClientTests.cs similarity index 55% rename from test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/SqlQueueClientTests.cs rename to test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/QueueClientTests.cs index 6a352f41ab..f3b5fcd7a3 100644 --- a/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/SqlQueueClientTests.cs +++ b/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/QueueClientTests.cs @@ -8,15 +8,10 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; -using Microsoft.Extensions.Logging; -using Microsoft.Health.Fhir.Core.UnitTests.Extensions; -using Microsoft.Health.Fhir.SqlServer.Features.Schema; -using Microsoft.Health.Fhir.SqlServer.Features.Storage; using Microsoft.Health.Fhir.Tests.Common; +using Microsoft.Health.Fhir.Tests.Common.FixtureParameters; using Microsoft.Health.JobManagement; -using Microsoft.Health.SqlServer.Features.Schema; using Microsoft.Health.Test.Utilities; -using NSubstitute; using Xunit; using Xunit.Abstractions; using JobStatus = Microsoft.Health.JobManagement.JobStatus; @@ -46,19 +41,18 @@ internal enum TestQueueType : byte [Trait(Traits.OwningTeam, OwningTeam.Fhir)] [Trait(Traits.Category, Categories.DataSourceValidation)] - public class SqlQueueClientTests : IClassFixture + [FhirStorageTestsFixtureArgumentSets(DataStore.All)] + public class QueueClientTests : IClassFixture { - private readonly SqlServerFhirStorageTestsFixture _fixture; - private readonly SchemaInformation _schemaInformation; - private ILogger _logger = Substitute.For>(); private readonly ITestOutputHelper _testOutputHelper; + private readonly FhirStorageTestsFixture _fixture; + private readonly IQueueClient _queueClient; - public SqlQueueClientTests(SqlServerFhirStorageTestsFixture fixture, ITestOutputHelper testOutputHelper) + public QueueClientTests(FhirStorageTestsFixture fixture, ITestOutputHelper testOutputHelper) { _fixture = fixture; - _schemaInformation = new SchemaInformation(SchemaVersionConstants.Min, SchemaVersionConstants.Max); - _schemaInformation.Current = SchemaVersionConstants.Max; _testOutputHelper = testOutputHelper; + _queueClient = fixture.QueueClient; } [Fact] @@ -66,9 +60,8 @@ public async Task GivenNewJobs_WhenEnqueueJobs_ThenCreatedJobsShouldBeReturned() { byte queueType = (byte)TestQueueType.GivenNewJobs_WhenEnqueueJobs_ThenCreatedJobsShouldBeReturned; - SqlQueueClient sqlQueueClient = new SqlQueueClient(_fixture.SqlConnectionWrapperFactory, _schemaInformation, _logger); string[] definitions = new[] { "job1", "job2" }; - IEnumerable jobInfos = await sqlQueueClient.EnqueueAsync(queueType, definitions, null, false, false, CancellationToken.None); + IEnumerable jobInfos = await _queueClient.EnqueueAsync(queueType, definitions, null, false, false, CancellationToken.None); Assert.Equal(2, jobInfos.Count()); Assert.Equal(1, jobInfos.Last().Id - jobInfos.First().Id); @@ -77,10 +70,10 @@ public async Task GivenNewJobs_WhenEnqueueJobs_ThenCreatedJobsShouldBeReturned() Assert.Null(jobInfos.First().EndDate); Assert.Equal(jobInfos.Last().GroupId, jobInfos.First().GroupId); - JobInfo jobInfo = await sqlQueueClient.GetJobByIdAsync(queueType, jobInfos.First().Id, true, CancellationToken.None); + JobInfo jobInfo = await _queueClient.GetJobByIdAsync(queueType, jobInfos.First().Id, true, CancellationToken.None); Assert.Contains(jobInfo.Definition, definitions); - jobInfo = await sqlQueueClient.GetJobByIdAsync(queueType, jobInfos.Last().Id, true, CancellationToken.None); + jobInfo = await _queueClient.GetJobByIdAsync(queueType, jobInfos.Last().Id, true, CancellationToken.None); Assert.Contains(jobInfo.Definition, definitions); } @@ -89,9 +82,8 @@ public async Task GivenNewJobsWithSameQueueType_WhenEnqueueWithForceOneActiveJob { byte queueType = (byte)TestQueueType.GivenNewJobsWithSameQueueType_WhenEnqueueWithForceOneActiveJobGroup_ThenSecondJobShouldNotBeEnqueued; - SqlQueueClient sqlQueueClient = new SqlQueueClient(_fixture.SqlConnectionWrapperFactory, _schemaInformation, _logger); - IEnumerable jobInfos = await sqlQueueClient.EnqueueAsync(queueType, new[] { "job1" }, null, true, false, CancellationToken.None); - await Assert.ThrowsAsync(async () => await sqlQueueClient.EnqueueAsync(queueType, new[] { "job2" }, null, true, false, CancellationToken.None)); + IEnumerable jobInfos = await _queueClient.EnqueueAsync(queueType, new[] { "job1" }, null, true, false, CancellationToken.None); + await Assert.ThrowsAsync(async () => await _queueClient.EnqueueAsync(queueType, new[] { "job2" }, null, true, false, CancellationToken.None)); } [Fact] @@ -99,11 +91,10 @@ public async Task GivenJobsWithSameDefinition_WhenEnqueue_ThenOnlyOneJobShouldBe { byte queueType = (byte)TestQueueType.GivenJobsWithSameDefinition_WhenEnqueue_ThenOnlyOneJobShouldBeEnqueued; - SqlQueueClient sqlQueueClient = new SqlQueueClient(_fixture.SqlConnectionWrapperFactory, _schemaInformation, _logger); - IEnumerable jobInfos = await sqlQueueClient.EnqueueAsync(queueType, new[] { "job1"}, null, false, false, CancellationToken.None); + IEnumerable jobInfos = await _queueClient.EnqueueAsync(queueType, new[] { "job1"}, null, false, false, CancellationToken.None); Assert.Single(jobInfos); long jobId = jobInfos.First().Id; - jobInfos = await sqlQueueClient.EnqueueAsync(queueType, new[] { "job1"}, null, false, false, CancellationToken.None); + jobInfos = await _queueClient.EnqueueAsync(queueType, new[] { "job1"}, null, false, false, CancellationToken.None); Assert.Equal(jobId, jobInfos.First().Id); } @@ -113,12 +104,11 @@ public async Task GivenJobsWithSameDefinition_WhenEnqueueWithGroupId_ThenGroupId byte queueType = (byte)TestQueueType.GivenJobsWithSameDefinition_WhenEnqueueWithGroupId_ThenGroupIdShouldBeCorrect; long groupId = new Random().Next(int.MinValue, int.MaxValue); - SqlQueueClient sqlQueueClient = new SqlQueueClient(_fixture.SqlConnectionWrapperFactory, _schemaInformation, _logger); - IEnumerable jobInfos = await sqlQueueClient.EnqueueAsync(queueType, new[] { "job1", "job2" }, groupId, false, false, CancellationToken.None); + IEnumerable jobInfos = await _queueClient.EnqueueAsync(queueType, new[] { "job1", "job2" }, groupId, false, false, CancellationToken.None); Assert.Equal(2, jobInfos.Count()); Assert.Equal(groupId, jobInfos.First().GroupId); Assert.Equal(groupId, jobInfos.Last().GroupId); - jobInfos = await sqlQueueClient.EnqueueAsync(queueType, new[] { "job3", "job4"}, groupId, false, false, CancellationToken.None); + jobInfos = await _queueClient.EnqueueAsync(queueType, new[] { "job3", "job4"}, groupId, false, false, CancellationToken.None); Assert.Equal(2, jobInfos.Count()); Assert.Equal(groupId, jobInfos.First().GroupId); Assert.Equal(groupId, jobInfos.Last().GroupId); @@ -129,16 +119,15 @@ public async Task GivenJobsEnqueue_WhenDequeue_ThenAllJobsShouldBeReturned() { byte queueType = (byte)TestQueueType.GivenJobsEnqueue_WhenDequeue_ThenAllJobsShouldBeReturned; - SqlQueueClient sqlQueueClient = new SqlQueueClient(_fixture.SqlConnectionWrapperFactory, _schemaInformation, _logger); - await sqlQueueClient.EnqueueAsync(queueType, new[] { "job1" }, null, false, false, CancellationToken.None); - await sqlQueueClient.EnqueueAsync(queueType, new[] { "job2" }, null, false, false, CancellationToken.None); + await _queueClient.EnqueueAsync(queueType, new[] { "job1" }, null, false, false, CancellationToken.None); + await _queueClient.EnqueueAsync(queueType, new[] { "job2" }, null, false, false, CancellationToken.None); List definitions = new List(); - JobInfo jobInfo1 = await sqlQueueClient.DequeueAsync(queueType, "test-worker", 10, CancellationToken.None); + JobInfo jobInfo1 = await _queueClient.DequeueAsync(queueType, "test-worker", 10, CancellationToken.None); definitions.Add(jobInfo1.Definition); - JobInfo jobInfo2 = await sqlQueueClient.DequeueAsync(queueType, "test-worker", 10, CancellationToken.None); + JobInfo jobInfo2 = await _queueClient.DequeueAsync(queueType, "test-worker", 10, CancellationToken.None); definitions.Add(jobInfo2.Definition); - Assert.Null(await sqlQueueClient.DequeueAsync(queueType, "test-worker", 10, CancellationToken.None)); + Assert.Null(await _queueClient.DequeueAsync(queueType, "test-worker", 10, CancellationToken.None)); Assert.Contains("job1", definitions); Assert.Contains("job2", definitions); @@ -149,14 +138,13 @@ public async Task GivenJobWithExpiredHeartbeat_WhenDequeue_ThenJobWithResultShou { byte queueType = (byte)TestQueueType.GivenJobWithExpiredHeartbeat_WhenDequeue_ThenJobWithResultShouldBeReturned; - SqlQueueClient sqlQueueClient = new SqlQueueClient(_fixture.SqlConnectionWrapperFactory, _schemaInformation, _logger); - await sqlQueueClient.EnqueueAsync(queueType, new[] { "job1" }, null, false, false, CancellationToken.None); + await _queueClient.EnqueueAsync(queueType, new[] { "job1" }, null, false, false, CancellationToken.None); - JobInfo jobInfo1 = await sqlQueueClient.DequeueAsync(queueType, "test-worker", 1, CancellationToken.None); + JobInfo jobInfo1 = await _queueClient.DequeueAsync(queueType, "test-worker", 1, CancellationToken.None); jobInfo1.QueueType = queueType; jobInfo1.Result = "current-result"; await JobHosting.ExecuteJobWithHeavyHeartbeatsAsync( - sqlQueueClient, + _queueClient, jobInfo1, async cancelSource => { @@ -166,7 +154,7 @@ await JobHosting.ExecuteJobWithHeavyHeartbeatsAsync( TimeSpan.FromSeconds(0.1), new CancellationTokenSource()); await Task.Delay(TimeSpan.FromSeconds(1)); - JobInfo jobInfo2 = await sqlQueueClient.DequeueAsync(queueType, "test-worker", 0, CancellationToken.None); + JobInfo jobInfo2 = await _queueClient.DequeueAsync(queueType, "test-worker", 0, CancellationToken.None); Assert.Equal(jobInfo1.Result, jobInfo2.Result); } @@ -175,16 +163,15 @@ public async Task GivenRunningJobCancelled_WhenHeartbeat_ThenCancelRequestedShou { byte queueType = (byte)TestQueueType.GivenRunningJobCancelled_WhenHeartbeat_ThenCancelRequestedShouldBeReturned; - var client = new SqlQueueClient(_fixture.SqlConnectionWrapperFactory, _schemaInformation, _logger); - await client.EnqueueAsync(queueType, new[] { "job" }, null, false, false, CancellationToken.None); + await _queueClient.EnqueueAsync(queueType, new[] { "job" }, null, false, false, CancellationToken.None); - var job = await client.DequeueAsync(queueType, "test-worker", 10, CancellationToken.None); + var job = await _queueClient.DequeueAsync(queueType, "test-worker", 10, CancellationToken.None); job.QueueType = queueType; - await client.CancelJobByGroupIdAsync(queueType, job.GroupId, CancellationToken.None); + await _queueClient.CancelJobByGroupIdAsync(queueType, job.GroupId, CancellationToken.None); try { await JobHosting.ExecuteJobWithHeavyHeartbeatsAsync( - client, + _queueClient, job, async cancelSource => { @@ -200,7 +187,7 @@ await JobHosting.ExecuteJobWithHeavyHeartbeatsAsync( } Assert.Equal(JobStatus.Running, job.Status); - job = await client.GetJobByIdAsync(queueType, job.Id, false, CancellationToken.None); + job = await _queueClient.GetJobByIdAsync(queueType, job.Id, false, CancellationToken.None); Assert.True(job.CancelRequested); } @@ -209,13 +196,12 @@ public async Task GivenJobNotHeartbeat_WhenDequeue_ThenJobShouldBeReturnedAgain( { byte queueType = (byte)TestQueueType.GivenJobNotHeartbeat_WhenDequeue_ThenJobShouldBeReturnedAgain; - SqlQueueClient sqlQueueClient = new SqlQueueClient(_fixture.SqlConnectionWrapperFactory, _schemaInformation, _logger); - await sqlQueueClient.EnqueueAsync(queueType, new[] { "job1" }, null, false, false, CancellationToken.None); + await _queueClient.EnqueueAsync(queueType, new[] { "job1" }, null, false, false, CancellationToken.None); - JobInfo jobInfo1 = await sqlQueueClient.DequeueAsync(queueType, "test-worker", 0, CancellationToken.None); + JobInfo jobInfo1 = await _queueClient.DequeueAsync(queueType, "test-worker", 0, CancellationToken.None); await Task.Delay(TimeSpan.FromSeconds(1)); - JobInfo jobInfo2 = await sqlQueueClient.DequeueAsync(queueType, "test-worker", 0, CancellationToken.None); - Assert.Null(await sqlQueueClient.DequeueAsync(queueType, "test-worker", 10, CancellationToken.None)); + JobInfo jobInfo2 = await _queueClient.DequeueAsync(queueType, "test-worker", 0, CancellationToken.None); + Assert.Null(await _queueClient.DequeueAsync(queueType, "test-worker", 10, CancellationToken.None)); Assert.Equal(jobInfo1.Id, jobInfo2.Id); Assert.True(jobInfo1.Version < jobInfo2.Version); @@ -226,24 +212,23 @@ public async Task GivenGroupJobs_WhenCompleteJob_ThenJobsShouldBeCompleted() { byte queueType = (byte)TestQueueType.GivenGroupJobs_WhenCompleteJob_ThenJobsShouldBeCompleted; - SqlQueueClient sqlQueueClient = new SqlQueueClient(_fixture.SqlConnectionWrapperFactory, _schemaInformation, _logger); - await sqlQueueClient.EnqueueAsync(queueType, new[] { "job1", "job2" }, null, false, false, CancellationToken.None); + await _queueClient.EnqueueAsync(queueType, new[] { "job1", "job2" }, null, false, false, CancellationToken.None); - JobInfo jobInfo1 = await sqlQueueClient.DequeueAsync(queueType, "test-worker", 0, CancellationToken.None); - JobInfo jobInfo2 = await sqlQueueClient.DequeueAsync(queueType, "test-worker", 10, CancellationToken.None); + JobInfo jobInfo1 = await _queueClient.DequeueAsync(queueType, "test-worker", 0, CancellationToken.None); + JobInfo jobInfo2 = await _queueClient.DequeueAsync(queueType, "test-worker", 10, CancellationToken.None); Assert.Equal(JobStatus.Running, jobInfo1.Status); jobInfo1.Status = JobStatus.Failed; jobInfo1.Result = "Failed for cancellation"; - await sqlQueueClient.CompleteJobAsync(jobInfo1, false, CancellationToken.None); - JobInfo jobInfo = await sqlQueueClient.GetJobByIdAsync(queueType, jobInfo1.Id, false, CancellationToken.None); + await _queueClient.CompleteJobAsync(jobInfo1, false, CancellationToken.None); + JobInfo jobInfo = await _queueClient.GetJobByIdAsync(queueType, jobInfo1.Id, false, CancellationToken.None); Assert.Equal(JobStatus.Failed, jobInfo.Status); Assert.Equal(jobInfo1.Result, jobInfo.Result); jobInfo2.Status = JobStatus.Completed; jobInfo2.Result = "Completed"; - await sqlQueueClient.CompleteJobAsync(jobInfo2, false, CancellationToken.None); - jobInfo = await sqlQueueClient.GetJobByIdAsync(queueType, jobInfo2.Id, false, CancellationToken.None); + await _queueClient.CompleteJobAsync(jobInfo2, false, CancellationToken.None); + jobInfo = await _queueClient.GetJobByIdAsync(queueType, jobInfo2.Id, false, CancellationToken.None); Assert.Equal(JobStatus.Completed, jobInfo.Status); Assert.Equal(jobInfo2.Result, jobInfo.Result); } @@ -253,26 +238,25 @@ public async Task GivenGroupJobs_WhenCancelJobsByGroupId_ThenAllJobsShouldBeCanc { byte queueType = (byte)TestQueueType.GivenGroupJobs_WhenCancelJobsByGroupId_ThenAllJobsShouldBeCancelled; - SqlQueueClient sqlQueueClient = new SqlQueueClient(_fixture.SqlConnectionWrapperFactory, _schemaInformation, _logger); - await sqlQueueClient.EnqueueAsync(queueType, new[] { "job1", "job2", "job3" }, null, false, false, CancellationToken.None); + await _queueClient.EnqueueAsync(queueType, new[] { "job1", "job2", "job3" }, null, false, false, CancellationToken.None); - JobInfo jobInfo1 = await sqlQueueClient.DequeueAsync(queueType, "test-worker", 0, CancellationToken.None); - JobInfo jobInfo2 = await sqlQueueClient.DequeueAsync(queueType, "test-worker", 0, CancellationToken.None); + JobInfo jobInfo1 = await _queueClient.DequeueAsync(queueType, "test-worker", 0, CancellationToken.None); + JobInfo jobInfo2 = await _queueClient.DequeueAsync(queueType, "test-worker", 0, CancellationToken.None); - await sqlQueueClient.CancelJobByGroupIdAsync(queueType, jobInfo1.GroupId, CancellationToken.None); - Assert.True((await sqlQueueClient.GetJobByGroupIdAsync(queueType, jobInfo1.GroupId, false, CancellationToken.None)).All(t => t.Status == JobStatus.Cancelled || (t.Status == JobStatus.Running && t.CancelRequested))); + await _queueClient.CancelJobByGroupIdAsync(queueType, jobInfo1.GroupId, CancellationToken.None); + Assert.True((await _queueClient.GetJobByGroupIdAsync(queueType, jobInfo1.GroupId, false, CancellationToken.None)).All(t => t.Status == JobStatus.Cancelled || (t.Status == JobStatus.Running && t.CancelRequested))); jobInfo1.Status = JobStatus.Failed; jobInfo1.Result = "Failed for cancellation"; - await sqlQueueClient.CompleteJobAsync(jobInfo1, false, CancellationToken.None); - JobInfo jobInfo = await sqlQueueClient.GetJobByIdAsync(queueType, jobInfo1.Id, false, CancellationToken.None); + await _queueClient.CompleteJobAsync(jobInfo1, false, CancellationToken.None); + JobInfo jobInfo = await _queueClient.GetJobByIdAsync(queueType, jobInfo1.Id, false, CancellationToken.None); Assert.Equal(JobStatus.Failed, jobInfo.Status); Assert.Equal(jobInfo1.Result, jobInfo.Result); jobInfo2.Status = JobStatus.Completed; jobInfo2.Result = "Completed"; - await sqlQueueClient.CompleteJobAsync(jobInfo2, false, CancellationToken.None); - jobInfo = await sqlQueueClient.GetJobByIdAsync(queueType, jobInfo2.Id, false, CancellationToken.None); + await _queueClient.CompleteJobAsync(jobInfo2, false, CancellationToken.None); + jobInfo = await _queueClient.GetJobByIdAsync(queueType, jobInfo2.Id, false, CancellationToken.None); Assert.Equal(JobStatus.Cancelled, jobInfo.Status); Assert.Equal(jobInfo2.Result, jobInfo.Result); } @@ -282,14 +266,13 @@ public async Task GivenGroupJobs_WhenCancelJobsById_ThenOnlySingleJobShouldBeCan { byte queueType = (byte)TestQueueType.GivenGroupJobs_WhenCancelJobsById_ThenOnlySingleJobShouldBeCancelled; - var sqlQueueClient = new SqlQueueClient(_fixture.SqlConnectionWrapperFactory, _schemaInformation, _logger); - IEnumerable jobs = await sqlQueueClient.EnqueueAsync(queueType, new[] { "job1", "job2", "job3" }, null, false, false, CancellationToken.None); + IEnumerable jobs = await _queueClient.EnqueueAsync(queueType, new[] { "job1", "job2", "job3" }, null, false, false, CancellationToken.None); - await sqlQueueClient.CancelJobByIdAsync(queueType, jobs.First().Id, CancellationToken.None); - Assert.Equal(JobStatus.Cancelled, (await sqlQueueClient.GetJobByIdAsync(queueType, jobs.First().Id, false, CancellationToken.None)).Status); + await _queueClient.CancelJobByIdAsync(queueType, jobs.First().Id, CancellationToken.None); + Assert.Equal(JobStatus.Cancelled, (await _queueClient.GetJobByIdAsync(queueType, jobs.First().Id, false, CancellationToken.None)).Status); - JobInfo jobInfo1 = await sqlQueueClient.DequeueAsync(queueType, "test-worker", 0, CancellationToken.None); - JobInfo jobInfo2 = await sqlQueueClient.DequeueAsync(queueType, "test-worker", 0, CancellationToken.None); + JobInfo jobInfo1 = await _queueClient.DequeueAsync(queueType, "test-worker", 0, CancellationToken.None); + JobInfo jobInfo2 = await _queueClient.DequeueAsync(queueType, "test-worker", 0, CancellationToken.None); Assert.False(jobInfo1.CancelRequested); Assert.False(jobInfo2.CancelRequested); @@ -300,30 +283,28 @@ public async Task GivenGroupJobs_WhenOneJobFailedAndRequestCancellation_ThenAllJ { byte queueType = (byte)TestQueueType.GivenGroupJobs_WhenOneJobFailedAndRequestCancellation_ThenAllJobsShouldBeCancelled; - var sqlQueueClient = new SqlQueueClient(_fixture.SqlConnectionWrapperFactory, _schemaInformation, _logger); - await sqlQueueClient.EnqueueAsync(queueType, new[] { "job1", "job2", "job3" }, null, false, false, CancellationToken.None); + await _queueClient.EnqueueAsync(queueType, new[] { "job1", "job2", "job3" }, null, false, false, CancellationToken.None); - JobInfo jobInfo1 = await sqlQueueClient.DequeueAsync(queueType, "test-worker", 0, CancellationToken.None); + JobInfo jobInfo1 = await _queueClient.DequeueAsync(queueType, "test-worker", 0, CancellationToken.None); jobInfo1.Status = JobStatus.Failed; jobInfo1.Result = "Failed for critical error"; - await sqlQueueClient.CompleteJobAsync(jobInfo1, true, CancellationToken.None); - Assert.True((await sqlQueueClient.GetJobByGroupIdAsync(queueType, jobInfo1.GroupId, false, CancellationToken.None)).All(t => t.Status is (JobStatus?)JobStatus.Cancelled or (JobStatus?)JobStatus.Failed)); + await _queueClient.CompleteJobAsync(jobInfo1, true, CancellationToken.None); + Assert.True((await _queueClient.GetJobByGroupIdAsync(queueType, jobInfo1.GroupId, false, CancellationToken.None)).All(t => t.Status is (JobStatus?)JobStatus.Cancelled or (JobStatus?)JobStatus.Failed)); } [Fact] public async Task ExecuteWithHeartbeats() { var queueType = (byte)TestQueueType.ExecuteWithHeartbeat; - var client = new SqlQueueClient(_fixture.SqlConnectionWrapperFactory, _schemaInformation, XUnitLogger.Create(_testOutputHelper)); - await client.EnqueueAsync(queueType, new[] { "job" }, null, false, false, CancellationToken.None); - JobInfo job = await client.DequeueAsync(queueType, "test-worker", 1, CancellationToken.None); + await _queueClient.EnqueueAsync(queueType, new[] { "job" }, null, false, false, CancellationToken.None); + JobInfo job = await _queueClient.DequeueAsync(queueType, "test-worker", 1, CancellationToken.None); var cancel = new CancellationTokenSource(); cancel.CancelAfter(TimeSpan.FromSeconds(30)); var execDate = DateTime.UtcNow; var dequeueDate = DateTime.UtcNow; var execTask = JobHosting.ExecuteJobWithHeartbeatsAsync( - client, + _queueClient, queueType, job.Id, job.Version, @@ -342,7 +323,7 @@ public async Task ExecuteWithHeartbeats() while (jobInt == null) { await Task.Delay(TimeSpan.FromSeconds(1)); - jobInt = await client.DequeueAsync(queueType, "test-worker", 2, cancel.Token); + jobInt = await _queueClient.DequeueAsync(queueType, "test-worker", 2, cancel.Token); } dequeueDate = DateTime.UtcNow; @@ -358,15 +339,14 @@ public async Task ExecuteWithHeartbeats() public async Task ExecuteWithHeartbeatsHeavy() { var queueType = (byte)TestQueueType.ExecuteWithHeartbeatsHeavy; - var client = new SqlQueueClient(_fixture.SqlConnectionWrapperFactory, _schemaInformation, XUnitLogger.Create(_testOutputHelper)); - await client.EnqueueAsync(queueType, new[] { "job" }, null, false, false, CancellationToken.None); - JobInfo job = await client.DequeueAsync(queueType, "test-worker", 1, CancellationToken.None); + await _queueClient.EnqueueAsync(queueType, new[] { "job" }, null, false, false, CancellationToken.None); + JobInfo job = await _queueClient.DequeueAsync(queueType, "test-worker", 1, CancellationToken.None); var cancel = new CancellationTokenSource(); cancel.CancelAfter(TimeSpan.FromSeconds(30)); var execDate = DateTime.UtcNow; var dequeueDate = DateTime.UtcNow; var execTask = JobHosting.ExecuteJobWithHeavyHeartbeatsAsync( - client, + _queueClient, job, async cancelSource => { @@ -383,7 +363,7 @@ public async Task ExecuteWithHeartbeatsHeavy() while (jobInt == null) { await Task.Delay(TimeSpan.FromSeconds(1)); - jobInt = await client.DequeueAsync(queueType, "test-worker", 2, cancel.Token); + jobInt = await _queueClient.DequeueAsync(queueType, "test-worker", 2, cancel.Token); } dequeueDate = DateTime.UtcNow; diff --git a/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/SqlServerFhirStorageTestsFixture.cs b/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/SqlServerFhirStorageTestsFixture.cs index 1a5353de38..fa5a50a1f2 100644 --- a/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/SqlServerFhirStorageTestsFixture.cs +++ b/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/SqlServerFhirStorageTestsFixture.cs @@ -35,6 +35,7 @@ using Microsoft.Health.Fhir.SqlServer.Features.Search.Expressions.Visitors; using Microsoft.Health.Fhir.SqlServer.Features.Storage; using Microsoft.Health.Fhir.SqlServer.Features.Storage.Registry; +using Microsoft.Health.JobManagement; using Microsoft.Health.JobManagement.UnitTests; using Microsoft.Health.SqlServer; using Microsoft.Health.SqlServer.Configs; @@ -68,6 +69,7 @@ public class SqlServerFhirStorageTestsFixture : IServiceProvider, IAsyncLifetime private readonly SearchParameterStatusManager _searchParameterStatusManager; private readonly IMediator _mediator = Substitute.For(); private readonly RequestContextAccessor _fhirRequestContextAccessor = Substitute.For>(); + private readonly SqlQueueClient _sqlQueueClient; public SqlServerFhirStorageTestsFixture() : this(SchemaVersionConstants.Max, $"FHIRINTEGRATIONTEST_{DateTimeOffset.UtcNow.ToUnixTimeSeconds()}_{BigInteger.Abs(new BigInteger(Guid.NewGuid().ToByteArray()))}") @@ -239,6 +241,8 @@ internal SqlServerFhirStorageTestsFixture(int maximumSupportedSchemaVersion, str NullLogger.Instance); _testHelper = new SqlServerFhirStorageTestHelper(initialConnectionString, MasterDatabaseName, sqlServerFhirModel, SqlConnectionBuilder, queueClient); + + _sqlQueueClient = new SqlQueueClient(SqlConnectionWrapperFactory, SchemaInformation, NullLogger.Instance); } public string TestConnectionString { get; } @@ -355,6 +359,11 @@ object IServiceProvider.GetService(Type serviceType) return _fhirRequestContextAccessor; } + if (serviceType == typeof(IQueueClient)) + { + return _sqlQueueClient; + } + return null; } }