Skip to content

Commit

Permalink
SqlStoreClient class, SqlRetryService signatures plus (#3427)
Browse files Browse the repository at this point in the history
* SqlService helper and events reader tool

* tests

* do not publish

* 7.0 + SqlStoreClient

* Removed constructor and moved more to helper

* Moved hard delete to correct retries

* _store

* formal cancel

* Fixed sql retry signatures

* Correcting class name

* single

* faster

* single

* delete files

* internal

* see friends

* namespace

* client for any class

* corrected type name

* yml back

* Fixed double retry
  • Loading branch information
SergeyGaluzo authored Jul 26, 2023
1 parent c2214ec commit 940406b
Show file tree
Hide file tree
Showing 23 changed files with 559 additions and 379 deletions.
15 changes: 11 additions & 4 deletions Microsoft.Health.Fhir.sln
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,13 @@ Project("{D954291E-2A0B-460D-934E-DC6B0785DB48}") = "Microsoft.Health.Fhir.Share
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Health.Fhir.R4.Web.UnitTests", "src\Microsoft.Health.Fhir.R4.Web.UnitTests\Microsoft.Health.Fhir.R4.Web.UnitTests.csproj", "{C834E05D-79CA-4983-8599-28AC098F755A}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Health.Fhir.R4B.Web.UnitTests", "src\Microsoft.Health.Fhir.R4B.Web.UnitTests\Microsoft.Health.Fhir.R4B.Web.UnitTests.csproj", "{6F000A06-6307-46FF-83FA-DD9FD2FD2AA5}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Health.Fhir.R4B.Web.UnitTests", "src\Microsoft.Health.Fhir.R4B.Web.UnitTests\Microsoft.Health.Fhir.R4B.Web.UnitTests.csproj", "{6F000A06-6307-46FF-83FA-DD9FD2FD2AA5}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Health.Fhir.R5.Web.UnitTests", "src\Microsoft.Health.Fhir.R5.Web.UnitTests\Microsoft.Health.Fhir.R5.Web.UnitTests.csproj", "{62E8CD81-91A9-4872-BC6E-9EBBED8D50FD}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Health.Fhir.R5.Web.UnitTests", "src\Microsoft.Health.Fhir.R5.Web.UnitTests\Microsoft.Health.Fhir.R5.Web.UnitTests.csproj", "{62E8CD81-91A9-4872-BC6E-9EBBED8D50FD}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Health.Fhir.Stu3.Web.UnitTests", "src\Microsoft.Health.Fhir.Stu3.Web.UnitTests\Microsoft.Health.Fhir.Stu3.Web.UnitTests.csproj", "{8F4858B3-A3CF-4130-B3B2-954CBA9FE780}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Health.Fhir.Stu3.Web.UnitTests", "src\Microsoft.Health.Fhir.Stu3.Web.UnitTests\Microsoft.Health.Fhir.Stu3.Web.UnitTests.csproj", "{8F4858B3-A3CF-4130-B3B2-954CBA9FE780}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventsReader", "tools\EventsReader\EventsReader.csproj", "{9B3DEBE5-5C1F-419F-BBE3-BA67D1C074A7}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -440,6 +442,10 @@ Global
{8F4858B3-A3CF-4130-B3B2-954CBA9FE780}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8F4858B3-A3CF-4130-B3B2-954CBA9FE780}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8F4858B3-A3CF-4130-B3B2-954CBA9FE780}.Release|Any CPU.Build.0 = Release|Any CPU
{9B3DEBE5-5C1F-419F-BBE3-BA67D1C074A7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9B3DEBE5-5C1F-419F-BBE3-BA67D1C074A7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9B3DEBE5-5C1F-419F-BBE3-BA67D1C074A7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9B3DEBE5-5C1F-419F-BBE3-BA67D1C074A7}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -527,10 +533,11 @@ Global
{6F000A06-6307-46FF-83FA-DD9FD2FD2AA5} = {323F60C6-936A-4C5B-AF6A-F27E93AA7B05}
{62E8CD81-91A9-4872-BC6E-9EBBED8D50FD} = {323F60C6-936A-4C5B-AF6A-F27E93AA7B05}
{8F4858B3-A3CF-4130-B3B2-954CBA9FE780} = {323F60C6-936A-4C5B-AF6A-F27E93AA7B05}
{9B3DEBE5-5C1F-419F-BBE3-BA67D1C074A7} = {B70945F4-01A6-4351-955B-C4A2943B5E3B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {E370FB31-CF95-47D1-B1E1-863A77973FF8}
RESX_SortFileContentOnSave = True
SolutionGuid = {E370FB31-CF95-47D1-B1E1-863A77973FF8}
EndGlobalSection
GlobalSection(SharedMSBuildProjectFiles) = preSolution
test\Microsoft.Health.Fhir.Shared.Tests.E2E.Common\Microsoft.Health.Fhir.Shared.Tests.E2E.Common.projitems*{0478b687-7105-40f6-a2dc-81057890e944}*SharedItemsImports = 13
Expand Down
1 change: 1 addition & 0 deletions src/Microsoft.Health.Fhir.SqlServer/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@
[assembly: InternalsVisibleTo("Microsoft.Health.Fhir.R4.Tests.E2E")]
[assembly: InternalsVisibleTo("Microsoft.Health.Fhir.R4B.Tests.E2E")]
[assembly: InternalsVisibleTo("Microsoft.Health.Fhir.R5.Tests.E2E")]
[assembly: InternalsVisibleTo("Microsoft.Health.Internal.Fhir.EventsReader")]
[assembly: NeutralResourcesLanguage("en-us")]
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

namespace Microsoft.Health.Fhir.SqlServer.Features
{
internal static class ExceptionExtention
internal static class ExceptionExtension
{
internal static bool IsRetriable(this Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ private async Task TryLogEvent(string process, string status, string text, Cance
{
if (_store != null)
{
await _store.TryLogEvent(process, status, text, null, cancellationToken);
await _store.StoreClient.TryLogEvent(process, status, text, null, cancellationToken);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ private void ImportResourcesInBuffer(List<ImportResource> resources, List<string
}

_logger.LogError(e, $"Error on {nameof(ImportResourcesInBufferInternal)} retries={{Retries}}", retries);
_store.TryLogEvent(nameof(ImportResourcesInBufferInternal), "Error", $"retries={retries} error={e}", null, cancellationToken).Wait();
_store.StoreClient.TryLogEvent(nameof(ImportResourcesInBufferInternal), "Error", $"retries={retries} error={e}", null, cancellationToken).Wait();

throw;
}
Expand Down Expand Up @@ -142,7 +142,7 @@ private async Task ImportResourcesInBufferInternal(List<ImportResource> resource
else
{
// dedup by last updated
var inputDedupped = goodResources.GroupBy(_ => _.ResourceWrapper.ToResourceDateKey(true)).Select(_ => _.First()).ToList();
var inputDedupped = goodResources.GroupBy(_ => _.ResourceWrapper.ToResourceDateKey(_model.GetResourceTypeId, true)).Select(_ => _.First()).ToList();

// 2 paths:
// 1 - if versions were specified on input then dups need to be checked within input and database
Expand All @@ -155,7 +155,7 @@ private async Task ImportResourcesInBufferInternal(List<ImportResource> resource
// assume that only one unassigned version is provided for a given resource as we cannot guarantee processing order across parallel file streams anyway
var inputDeduppedNoVersion = inputDedupped.Where(_ => !_.KeepVersion).GroupBy(_ => _.ResourceWrapper.ToResourceKey(true)).Select(_ => _.First()).ToList();
//// check whether record can fit
var currentDates = (await _store.GetAsync(inputDeduppedNoVersion.Select(_ => _.ResourceWrapper.ToResourceKey(true)).ToList(), cancellationToken)).ToDictionary(_ => _.ToResourceKey(true), _ => _.ToResourceDateKey());
var currentDates = (await _store.GetAsync(inputDeduppedNoVersion.Select(_ => _.ResourceWrapper.ToResourceKey(true)).ToList(), cancellationToken)).ToDictionary(_ => _.ToResourceKey(true), _ => _.ToResourceDateKey(_model.GetResourceTypeId));
var inputDeduppedNoVersionForCheck = new List<ImportResource>();
foreach (var resource in inputDeduppedNoVersion)
{
Expand All @@ -166,7 +166,7 @@ private async Task ImportResourcesInBufferInternal(List<ImportResource> resource
}
}

var versionSlots = (await _store.GetResourceVervionsAsync(inputDeduppedNoVersionForCheck.Select(_ => _.ResourceWrapper.ToResourceDateKey()).ToList(), cancellationToken)).ToDictionary(_ => new ResourceKey(_.ResourceType, _.Id, null), _ => _);
var versionSlots = (await _store.StoreClient.GetResourceVersionsAsync(inputDeduppedNoVersionForCheck.Select(_ => _.ResourceWrapper.ToResourceDateKey(_model.GetResourceTypeId)).ToList(), cancellationToken)).ToDictionary(_ => new ResourceKey(_model.GetResourceTypeName(_.ResourceTypeId), _.Id, null), _ => _);
foreach (var resource in inputDeduppedNoVersionForCheck)
{
var resourceKey = resource.ResourceWrapper.ToResourceKey(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,32 +666,12 @@ private static (long StartId, long EndId) ReaderToSurrogateIdRange(SqlDataReader

public override async Task<IReadOnlyList<(long StartId, long EndId)>> GetSurrogateIdRanges(string resourceType, long startId, long endId, int rangeSize, int numberOfRanges, bool up, CancellationToken cancellationToken)
{
// TODO: this code will not set capacity for the result list!

var resourceTypeId = _model.GetResourceTypeId(resourceType);
List<(long StartId, long EndId)> searchList = null;
await _sqlRetryService.ExecuteSql(
async (cancellationToken, sqlException) =>
{
using SqlConnection connection = await _sqlConnectionBuilder.GetSqlConnectionAsync(initialCatalog: null, cancellationToken: cancellationToken).ConfigureAwait(false);
using SqlCommand sqlCommand = connection.CreateCommand();
connection.RetryLogicProvider = null; // To remove this line _sqlConnectionBuilder in healthcare-shared-components must be modified.
await connection.OpenAsync(cancellationToken);

sqlCommand.CommandTimeout = GetReindexCommandTimeout();
GetResourceSurrogateIdRanges.PopulateCommand(sqlCommand, resourceTypeId, startId, endId, rangeSize, numberOfRanges, up);
LogSqlCommand(sqlCommand);

searchList = await _sqlRetryService.ExecuteSqlDataReader(
sqlCommand,
ReaderToSurrogateIdRange,
_logger,
$"{nameof(GetSurrogateIdRanges)} failed.",
cancellationToken);
return;
},
cancellationToken);
return searchList;
using var sqlCommand = new SqlCommand();
GetResourceSurrogateIdRanges.PopulateCommand(sqlCommand, resourceTypeId, startId, endId, rangeSize, numberOfRanges, up);
sqlCommand.CommandTimeout = GetReindexCommandTimeout();
LogSqlCommand(sqlCommand);
return await sqlCommand.ExecuteReaderAsync(_sqlRetryService, ReaderToSurrogateIdRange, _logger, cancellationToken);
}

private static (short ResourceTypeId, string Name) ReaderGetUsedResourceTypes(SqlDataReader sqlDataReader)
Expand All @@ -703,7 +683,7 @@ private static (short ResourceTypeId, string Name) ReaderGetUsedResourceTypes(Sq
{
using var sqlCommand = new SqlCommand("dbo.GetUsedResourceTypes") { CommandType = CommandType.StoredProcedure };
LogSqlCommand(sqlCommand);
return await _sqlRetryService.ExecuteSqlDataReader(sqlCommand, ReaderGetUsedResourceTypes, _logger, $"{nameof(GetUsedResourceTypes)} failed.", cancellationToken);
return await sqlCommand.ExecuteReaderAsync(_sqlRetryService, ReaderGetUsedResourceTypes, _logger, cancellationToken);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,23 @@

using System;
using EnsureThat;
using Microsoft.Health.Fhir.Core.Models;

namespace Microsoft.Health.Fhir.Core.Features.Persistence
{
public class ResourceDateKey : IEquatable<ResourceDateKey>
{
public ResourceDateKey(string resourceType, string id, long resourceSurrogateId, string versionId, bool isDeleted = false)
public ResourceDateKey(short resourceTypeId, string id, long resourceSurrogateId, string versionId, bool isDeleted = false)
{
EnsureArg.IsNotNullOrEmpty(resourceType, nameof(resourceType));
EnsureArg.IsNotNullOrEmpty(id, nameof(id));
EnsureArg.IsTrue(ModelInfoProvider.IsKnownResource(resourceType), nameof(resourceType));

ResourceType = resourceType;
ResourceTypeId = resourceTypeId;
Id = id;
ResourceSurrogateId = resourceSurrogateId;
VersionId = versionId;
IsDeleted = isDeleted;
}

public string ResourceType { get; }
public short ResourceTypeId { get; }

public string Id { get; }

Expand All @@ -46,7 +43,7 @@ public bool Equals(ResourceDateKey other)
return true;
}

return ResourceType == other.ResourceType &&
return ResourceTypeId == other.ResourceTypeId &&
Id == other.Id &&
ResourceSurrogateId == other.ResourceSurrogateId &&
VersionId == other.VersionId &&
Expand Down Expand Up @@ -75,7 +72,7 @@ public override bool Equals(object obj)

public override int GetHashCode()
{
return HashCode.Combine(ResourceType, Id, ResourceSurrogateId, VersionId, IsDeleted);
return HashCode.Combine(ResourceTypeId, Id, ResourceSurrogateId, VersionId, IsDeleted);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System;
using Microsoft.Health.Fhir.Core.Features.Persistence;

namespace Microsoft.Health.Fhir.SqlServer.Features.Storage
{
public static class ResourceWrapperExtention
{
public static ResourceDateKey ToResourceDateKey(this ResourceWrapper wrapper, bool ignoreVersion = false)
public static ResourceDateKey ToResourceDateKey(this ResourceWrapper wrapper, Func<string, short> getResourceTypeId, bool ignoreVersion = false)
{
return new ResourceDateKey(wrapper.ResourceTypeName, wrapper.ResourceId, ResourceSurrogateIdHelper.LastUpdatedToResourceSurrogateId(wrapper.LastModified.DateTime), ignoreVersion ? null : wrapper.Version);
return new ResourceDateKey(getResourceTypeId(wrapper.ResourceTypeName), wrapper.ResourceId, ResourceSurrogateIdHelper.LastUpdatedToResourceSurrogateId(wrapper.LastModified.DateTime), ignoreVersion ? null : wrapper.Version);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ public virtual async Task CompleteJobAsync(JobInfo jobInfo, bool requestCancella

await _sqlRetryService.ExecuteSql(
sqlCommand,
async (sqlCommand, cancellationToken) =>
async (cmd, cancel) =>
{
try
{
await sqlCommand.ExecuteNonQueryAsync(cancellationToken);
await cmd.ExecuteNonQueryAsync(cancel);
}
catch (SqlException sqlEx)
{
Expand Down Expand Up @@ -156,7 +156,8 @@ public async Task<JobInfo> DequeueAsync(byte queueType, string worker, int heart
sqlCommand.Parameters.AddWithValue("@InputJobId", jobId.Value);
}

JobInfo jobInfo = await _sqlRetryService.ExecuteSqlDataReaderFirstRow(sqlCommand, JobInfoExtensions.LoadJobInfo, _logger, null, cancellationToken);
var jobInfos = await sqlCommand.ExecuteReaderAsync(_sqlRetryService, JobInfoExtensions.LoadJobInfo, _logger, cancellationToken);
var jobInfo = jobInfos.Count == 0 ? null : jobInfos[0];
if (jobInfo != null)
{
jobInfo.QueueType = queueType;
Expand All @@ -180,7 +181,7 @@ public async Task<IReadOnlyList<JobInfo>> EnqueueAsync(byte queueType, string[]

try
{
return await _sqlRetryService.ExecuteSqlDataReader(sqlCommand, JobInfoExtensions.LoadJobInfo, _logger, null, cancellationToken);
return await sqlCommand.ExecuteReaderAsync(_sqlRetryService, JobInfoExtensions.LoadJobInfo, _logger, cancellationToken);
}
catch (SqlException sqlEx)
{
Expand All @@ -197,21 +198,22 @@ public async Task<IReadOnlyList<JobInfo>> GetJobByGroupIdAsync(byte queueType, l
{
using var sqlCommand = new SqlCommand();
PopulateGetJobsCommand(sqlCommand, queueType, null, null, groupId, returnDefinition);
return await _sqlRetryService.ExecuteSqlDataReader(sqlCommand, JobInfoExtensions.LoadJobInfo, _logger, "GetJobByGroupIdAsync failed.", cancellationToken);
return await sqlCommand.ExecuteReaderAsync(_sqlRetryService, JobInfoExtensions.LoadJobInfo, _logger, cancellationToken, "GetJobByGroupIdAsync failed.");
}

public async Task<JobInfo> GetJobByIdAsync(byte queueType, long jobId, bool returnDefinition, CancellationToken cancellationToken)
{
using var sqlCommand = new SqlCommand();
PopulateGetJobsCommand(sqlCommand, queueType, jobId, returnDefinition: returnDefinition);
return await _sqlRetryService.ExecuteSqlDataReaderFirstRow(sqlCommand, JobInfoExtensions.LoadJobInfo, _logger, "GetJobByIdAsync failed.", cancellationToken);
var jobs = await sqlCommand.ExecuteReaderAsync(_sqlRetryService, JobInfoExtensions.LoadJobInfo, _logger, cancellationToken, "GetJobByIdAsync failed.");
return jobs.Count == 0 ? null : jobs[0];
}

public async Task<IReadOnlyList<JobInfo>> GetJobsByIdsAsync(byte queueType, long[] jobIds, bool returnDefinition, CancellationToken cancellationToken)
{
using var cmd = new SqlCommand();
PopulateGetJobsCommand(cmd, queueType, jobIds: jobIds, returnDefinition: returnDefinition);
return await _sqlRetryService.ExecuteSqlDataReader(cmd, JobInfoExtensions.LoadJobInfo, _logger, "GetJobsByIdsAsync failed.", cancellationToken);
return await cmd.ExecuteReaderAsync(_sqlRetryService, JobInfoExtensions.LoadJobInfo, _logger, cancellationToken, "GetJobsByIdsAsync failed.");
}

public bool IsInitialized()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Storage
{
public interface ISqlRetryService
{
Task TryLogEvent(string process, string status, string text, DateTime? startDate, CancellationToken cancellationToken);

Task ExecuteSql(Func<CancellationToken, SqlException, Task> action, CancellationToken cancellationToken);

Task ExecuteSql<TLogger>(SqlCommand sqlCommand, Func<SqlCommand, CancellationToken, Task> action, ILogger<TLogger> logger, string logMessage, CancellationToken cancellationToken);

Task<List<TResult>> ExecuteSqlDataReader<TResult, TLogger>(SqlCommand sqlCommand, Func<SqlDataReader, TResult> readerToResult, ILogger<TLogger> logger, string logMessage, CancellationToken cancellationToken);

Task<TResult> ExecuteSqlDataReaderFirstRow<TResult, TLogger>(SqlCommand sqlCommand, Func<SqlDataReader, TResult> readerToResult, ILogger<TLogger> logger, string logMessage, CancellationToken cancellationToken)
where TResult : class;
Task<IReadOnlyList<TResult>> ExecuteReaderAsync<TResult, TLogger>(SqlCommand sqlCommand, Func<SqlDataReader, TResult> readerToResult, ILogger<TLogger> logger, string logMessage, CancellationToken cancellationToken);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Logging;

namespace Microsoft.Health.Fhir.SqlServer.Features.Storage
{
public static class SqlCommandExtensions
{
public static async Task ExecuteNonQueryAsync<TLogger>(this SqlCommand cmd, ISqlRetryService retryService, ILogger<TLogger> logger, CancellationToken cancellationToken, string logMessage = null)
{
await retryService.ExecuteSql(cmd, async (sql, cancel) => await sql.ExecuteNonQueryAsync(cancel), logger, logMessage, cancellationToken);
}

public static async Task<IReadOnlyList<TResult>> ExecuteReaderAsync<TResult, TLogger>(this SqlCommand cmd, ISqlRetryService retryService, Func<SqlDataReader, TResult> readerToResult, ILogger<TLogger> logger, CancellationToken cancellationToken, string logMessage = null)
{
return await retryService.ExecuteReaderAsync(cmd, readerToResult, logger, logMessage, cancellationToken);
}
}
}
Loading

0 comments on commit 940406b

Please # to comment.