Skip to content

Commit

Permalink
Adds CosmosQueueClient
Browse files Browse the repository at this point in the history
  • Loading branch information
brendankowitz committed Feb 21, 2023
1 parent 5d65a43 commit a4f1e55
Show file tree
Hide file tree
Showing 13 changed files with 812 additions and 90 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System;
using System.Diagnostics;
using EnsureThat;
using Microsoft.Health.Core.Extensions;

namespace Microsoft.Health.Fhir.CosmosDb.Features.Storage.Queues;

internal static class IdHelper
{
private const int ShiftFactor = 3;

internal static readonly DateTime MaxDateTime = new DateTime(long.MaxValue >> ShiftFactor, DateTimeKind.Utc).TruncateToMillisecond().AddTicks(-1);

public static long DateToId(DateTime dateTime)
{
EnsureArg.IsLte(dateTime, MaxDateTime, nameof(dateTime));
long id = dateTime.TruncateToMillisecond().Ticks << ShiftFactor;

Debug.Assert(id >= 0, "The ID should not have become negative");
return id;
}

public static DateTime IdToDate(long resourceSurrogateId)
{
var dateTime = new DateTime(resourceSurrogateId >> ShiftFactor, DateTimeKind.Utc);
return dateTime.TruncateToMillisecond();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System;
using Newtonsoft.Json;

namespace Microsoft.Health.Fhir.CosmosDb.Features.Storage.Queues;

public class JobDefinitionWrapper
{
[JsonProperty("jobId")]
public long JobId { get; set; }

[JsonProperty("status")]
public byte? Status { get; set; }

[JsonProperty("worker")]
public string Worker { get; set; }

[JsonProperty("definition")]
public string Definition { get; set; }

[JsonProperty("definitionHash")]
public string DefinitionHash { get; set; }

[JsonProperty("result")]
public string Result { get; set; }

[JsonProperty("data")]
public long? Data { get; set; }

[JsonProperty("startDate")]
public DateTimeOffset? StartDate { get; set; }

[JsonProperty("endDate")]
public DateTimeOffset? EndDate { get; set; }

[JsonProperty("heartbeatDateTime")]
public DateTimeOffset HeartbeatDateTime { get; set; }

[JsonProperty("cancelRequested")]
public bool CancelRequested { get; set; }

[JsonProperty("info")]
public string Info { get; set; }

[JsonProperty("dequeueCount")]
public int DequeueCount { get; set; }

[JsonProperty("version")]
public long Version { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using Newtonsoft.Json;

namespace Microsoft.Health.Fhir.CosmosDb.Features.Storage.Queues;

public class JobGroupWrapper : SystemData
{
public const string JobInfoPartitionKey = "__jobs__";

[JsonProperty("groupId")]
public long GroupId { get; set; }

[JsonProperty(KnownDocumentProperties.PartitionKey)]
public string PartitionKey { get; } = JobInfoPartitionKey;

[JsonProperty("queueType")]
public byte QueueType { get; set; }

[JsonProperty("priority")]
public long Priority { get; set; }

[JsonProperty("definitions")]
public IList<JobDefinitionWrapper> Definitions { get; } = new List<JobDefinitionWrapper>();

[JsonProperty("createDate")]
public DateTimeOffset CreateDate { get; set; }

[JsonProperty("ttl")]
public long TimeToLive { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System.Collections.Generic;
using System.Linq;
using Microsoft.Health.JobManagement;

namespace Microsoft.Health.Fhir.CosmosDb.Features.Storage.Queues;

public static class JobInfoExtensions
{
public static IEnumerable<JobInfo> ToJobInfo(this JobGroupWrapper jobGroupWrapper)
{
return jobGroupWrapper.ToJobInfo(jobGroupWrapper.Definitions);
}

public static IEnumerable<JobInfo> ToJobInfo(this JobGroupWrapper jobGroupWrapper, IEnumerable<JobDefinitionWrapper> items)
{
return items.Select(jobInfoWrapperItem => new JobInfo
{
Id = jobInfoWrapperItem.JobId,
QueueType = jobGroupWrapper.QueueType,
Status = jobInfoWrapperItem.Status.HasValue ? (JobStatus?)jobInfoWrapperItem.Status.Value : null,
GroupId = jobGroupWrapper.GroupId,
Definition = jobInfoWrapperItem.Definition,
Result = jobInfoWrapperItem.Result,
Data = jobInfoWrapperItem.Data,
CancelRequested = jobInfoWrapperItem.CancelRequested,
Priority = jobGroupWrapper.Priority,
CreateDate = jobGroupWrapper.CreateDate.UtcDateTime,
StartDate = jobInfoWrapperItem.StartDate?.UtcDateTime,
EndDate = jobInfoWrapperItem.EndDate?.UtcDateTime,
HeartbeatDateTime = jobInfoWrapperItem.HeartbeatDateTime.UtcDateTime,
Version = jobInfoWrapperItem.Version,
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
using Microsoft.Health.Fhir.Core.Features.Search.Registry;
using Microsoft.Health.Fhir.Core.Models;
using Microsoft.Health.Fhir.Core.Registration;
using Microsoft.Health.Fhir.CosmosDb;
using Microsoft.Health.Fhir.CosmosDb.Configs;
using Microsoft.Health.Fhir.CosmosDb.Features.Health;
using Microsoft.Health.Fhir.CosmosDb.Features.Operations;
Expand All @@ -27,9 +26,12 @@
using Microsoft.Health.Fhir.CosmosDb.Features.Search.Queries;
using Microsoft.Health.Fhir.CosmosDb.Features.Storage;
using Microsoft.Health.Fhir.CosmosDb.Features.Storage.Operations;
using Microsoft.Health.Fhir.CosmosDb.Features.Storage.Queues;
using Microsoft.Health.Fhir.CosmosDb.Features.Storage.Registry;
using Microsoft.Health.Fhir.CosmosDb.Features.Storage.StoredProcedures;
using Microsoft.Health.Fhir.CosmosDb.Features.Storage.Versioning;
using Microsoft.Health.JobManagement;
using Constants = Microsoft.Health.Fhir.CosmosDb.Constants;

// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
Expand Down Expand Up @@ -231,6 +233,12 @@ private static IFhirServerBuilder AddCosmosDbPersistence(this IFhirServerBuilder
.Singleton()
.AsSelf();

services.Add<CosmosQueueClient>()
.Scoped()
.AsSelf()
.AsImplementedInterfaces()
.AsFactory<IScoped<IQueueClient>>();

return fhirServerBuilder;
}

Expand Down
5 changes: 4 additions & 1 deletion src/Microsoft.Health.Fhir.Shared.Web/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.ApplicationInsights.Extensibility;
using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.AspNetCore.Builder;
Expand All @@ -18,6 +19,7 @@
using Microsoft.Health.Fhir.Azure;
using Microsoft.Health.Fhir.Core.Configs;
using Microsoft.Health.Fhir.Core.Features;
using Microsoft.Health.Fhir.Core.Features.Operations.Export;
using Microsoft.Health.Fhir.Core.Features.Operations.Import;
using Microsoft.Health.JobManagement;
using Microsoft.Health.SqlServer.Configs;
Expand Down Expand Up @@ -116,8 +118,9 @@ private void AddTaskHostingService(IServiceCollection services)
.AsImplementedInterfaces();
services.Configure<TaskHostingConfiguration>(options => Configuration.GetSection("TaskHosting").Bind(options));

IEnumerable<TypeRegistrationBuilder> jobs = services.TypesInSameAssemblyAs<ImportOrchestratorJob>()
IEnumerable<TypeRegistrationBuilder> jobs = services.TypesInSameAssemblyAs<ExportOrchestratorJob>()
.AssignableTo<IJob>()
.Where(t => t.Type.Name.Contains("Import", StringComparison.Ordinal) == false)
.Transient()
.AsSelf();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@
// -------------------------------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Linq;
using EnsureThat;
using MediatR;
using MediatR.Pipeline;
using Microsoft.Extensions.Configuration;
using Microsoft.Health.Extensions.DependencyInjection;
using Microsoft.Health.Fhir.Core.Extensions;
using Microsoft.Health.Fhir.Core.Features.Operations.Export;
using Microsoft.Health.Fhir.Core.Features.Operations.Import;
using Microsoft.Health.Fhir.Core.Features.Search.Expressions;
using Microsoft.Health.Fhir.Core.Features.Search.Registry;
using Microsoft.Health.Fhir.Core.Messages.Storage;
Expand All @@ -25,6 +28,7 @@
using Microsoft.Health.Fhir.SqlServer.Features.Storage;
using Microsoft.Health.Fhir.SqlServer.Features.Storage.Registry;
using Microsoft.Health.Fhir.SqlServer.Features.Watchdogs;
using Microsoft.Health.JobManagement;
using Microsoft.Health.SqlServer.Api.Registration;
using Microsoft.Health.SqlServer.Configs;
using Microsoft.Health.SqlServer.Features.Client;
Expand Down Expand Up @@ -249,6 +253,17 @@ public static IFhirServerBuilder AddSqlServer(this IFhirServerBuilder fhirServer

services.AddHostedService<WatchdogsBackgroundService>();

IEnumerable<TypeRegistrationBuilder> jobs = services.TypesInSameAssemblyAs<ImportOrchestratorJob>()
.AssignableTo<IJob>()
.Where(t => t.Type.Name.Contains("Import", StringComparison.Ordinal))
.Transient()
.AsSelf();

foreach (TypeRegistrationBuilder job in jobs)
{
job.AsDelegate<Func<IJob>>();
}

return fhirServerBuilder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
<Compile Include="$(MSBuildThisFileDirectory)Persistence\SqlServerFhirStorageTestHelper.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Persistence\SqlServerFhirStorageTestsFixture.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Persistence\SqlServerSqlCompatibilityTests.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Persistence\SqlQueueClientTests.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Persistence\QueueClientTests.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Persistence\SqlServerTransactionScopeTests.cs" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@
using Microsoft.Health.Fhir.CosmosDb.Features.Search.Queries;
using Microsoft.Health.Fhir.CosmosDb.Features.Storage;
using Microsoft.Health.Fhir.CosmosDb.Features.Storage.Operations;
using Microsoft.Health.Fhir.CosmosDb.Features.Storage.Queues;
using Microsoft.Health.Fhir.CosmosDb.Features.Storage.Registry;
using Microsoft.Health.Fhir.CosmosDb.Features.Storage.StoredProcedures;
using Microsoft.Health.Fhir.CosmosDb.Features.Storage.Versioning;
using Microsoft.Health.JobManagement;
using NSubstitute;
using Xunit;

Expand All @@ -63,6 +65,7 @@ public class CosmosDbFhirStorageTestsFixture : IServiceProvider, IAsyncLifetime
private SupportedSearchParameterDefinitionManager _supportedSearchParameterDefinitionManager;
private SearchParameterStatusManager _searchParameterStatusManager;
private CosmosClient _cosmosClient;
private CosmosQueueClient _queueClient;

public CosmosDbFhirStorageTestsFixture()
{
Expand All @@ -82,6 +85,8 @@ public CosmosDbFhirStorageTestsFixture()
};
}

public Container Container => _container;

public async Task InitializeAsync()
{
var fhirStoredProcs = typeof(IStoredProcedure).Assembly
Expand Down Expand Up @@ -215,6 +220,11 @@ public async Task InitializeAsync()
NullLogger<SearchParameterStatusManager>.Instance);

_fhirStorageTestHelper = new CosmosDbFhirStorageTestHelper(_container);

_queueClient = new CosmosQueueClient(
() => _container.CreateMockScope(),
new CosmosQueryFactory(Substitute.For<ICosmosResponseProcessor>(), Substitute.For<ICosmosQueryLogger>()),
new CosmosDbDistributedLockFactory(() => _container.CreateMockScope(), NullLogger<CosmosDbDistributedLock>.Instance));
}

public async Task DisposeAsync()
Expand Down Expand Up @@ -284,6 +294,11 @@ object IServiceProvider.GetService(Type serviceType)
return _fhirRequestContextAccessor;
}

if (serviceType == typeof(IQueueClient))
{
return _queueClient;
}

return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
using Microsoft.Health.Fhir.Tests.Common;
using Microsoft.Health.Fhir.Tests.Common.FixtureParameters;
using Microsoft.Health.Fhir.Tests.Common.Mocks;
using Microsoft.Health.JobManagement;
using Microsoft.Health.SqlServer.Features.Schema;
using NSubstitute;
using Xunit;
Expand Down Expand Up @@ -120,6 +121,8 @@ internal FhirStorageTestsFixture(IServiceProvider fixture)

public GetResourceHandler GetResourceHandler { get; set; }

public IQueueClient QueueClient => _fixture.GetRequiredService<IQueueClient>();

public void Dispose()
{
(_fixture as IDisposable)?.Dispose();
Expand Down
Loading

0 comments on commit a4f1e55

Please # to comment.