From 2c809e4d5f2462c49ddf5eddbc30428f5231d049 Mon Sep 17 00:00:00 2001 From: SergeyGaluzo <95932081+SergeyGaluzo@users.noreply.github.com> Date: Thu, 8 Jun 2023 12:35:43 -0700 Subject: [PATCH] Add keep last updated parameter to merge (#3337) * Add keep last updated parameter to merge * ToList() * comment --- .../Import/ImportResourceLoaderTests.cs | 2 +- .../Operations/Import/ImportResource.cs | 11 ++++++-- .../Operations/Import/ImportResourceParser.cs | 6 ++--- .../Import/ImportProcessingJobTests.cs | 2 +- .../Features/Operations/Import/SqlImporter.cs | 26 ++++++++++--------- .../Storage/SqlServerFhirDataStore.cs | 6 ++--- 6 files changed, 31 insertions(+), 22 deletions(-) diff --git a/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/Import/ImportResourceLoaderTests.cs b/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/Import/ImportResourceLoaderTests.cs index 2fce4d4deb..be81cffdb7 100644 --- a/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/Import/ImportResourceLoaderTests.cs +++ b/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/Import/ImportResourceLoaderTests.cs @@ -243,7 +243,7 @@ private async Task VerifyResourceLoaderAsync(int resourcCount, int batchSize, lo null, null, "SearchParam"); - return new ImportResource(index, 0, 0, false, resourceWrapper); + return new ImportResource(index, 0, 0, false, false, resourceWrapper); }); IImportErrorSerializer serializer = Substitute.For(); diff --git a/src/Microsoft.Health.Fhir.Core/Features/Operations/Import/ImportResource.cs b/src/Microsoft.Health.Fhir.Core/Features/Operations/Import/ImportResource.cs index 6f61436a0c..83078344bc 100644 --- a/src/Microsoft.Health.Fhir.Core/Features/Operations/Import/ImportResource.cs +++ b/src/Microsoft.Health.Fhir.Core/Features/Operations/Import/ImportResource.cs @@ -9,17 +9,18 @@ namespace Microsoft.Health.Fhir.Core.Features.Operations.Import { public class ImportResource { - public ImportResource(long index, long offset, int length, bool keepVersion, ResourceWrapper resourceWrapper) + public ImportResource(long index, long offset, int length, bool keepLastUpdated, bool keepVersion, ResourceWrapper resourceWrapper) { Index = index; Offset = offset; Length = length; + KeepLastUpdated = keepLastUpdated; KeepVersion = keepVersion; ResourceWrapper = resourceWrapper; } public ImportResource(ResourceWrapper resource) - : this(0, 0, 0, false, resource) + : this(0, 0, 0, false, false, resource) { } @@ -28,6 +29,7 @@ public ImportResource(long index, long offset, string importError) Index = index; Offset = offset; Length = 0; + KeepLastUpdated = false; KeepVersion = false; ImportError = importError; } @@ -47,6 +49,11 @@ public ImportResource(long index, long offset, string importError) /// public int Length { get; set; } + /// + /// Flag indicating whether latUpdated was provided on input + /// + public bool KeepLastUpdated { get; set; } + /// /// Flag indicating whether version was provided on input /// diff --git a/src/Microsoft.Health.Fhir.Shared.Core/Features/Operations/Import/ImportResourceParser.cs b/src/Microsoft.Health.Fhir.Shared.Core/Features/Operations/Import/ImportResourceParser.cs index 4ba871b649..27a23d17db 100644 --- a/src/Microsoft.Health.Fhir.Shared.Core/Features/Operations/Import/ImportResourceParser.cs +++ b/src/Microsoft.Health.Fhir.Shared.Core/Features/Operations/Import/ImportResourceParser.cs @@ -36,8 +36,8 @@ public ImportResource Parse(long index, long offset, int length, string rawResou resource.Meta = new Meta(); } - bool lastUpdatedIsNull; - if (lastUpdatedIsNull = importMode == ImportMode.InitialLoad || resource.Meta.LastUpdated == null) + var lastUpdatedIsNull = importMode == ImportMode.InitialLoad || resource.Meta.LastUpdated == null; + if (lastUpdatedIsNull) { resource.Meta.LastUpdated = Clock.UtcNow; } @@ -52,7 +52,7 @@ public ImportResource Parse(long index, long offset, int length, string rawResou var resourceElement = resource.ToResourceElement(); var resourceWapper = _resourceFactory.Create(resourceElement, false, true, keepVersion); - return new ImportResource(index, offset, length, keepVersion, resourceWapper); + return new ImportResource(index, offset, length, !lastUpdatedIsNull, keepVersion, resourceWapper); } private static void CheckConditionalReferenceInResource(Resource resource) diff --git a/src/Microsoft.Health.Fhir.SqlServer.UnitTests/Features/Operations/Import/ImportProcessingJobTests.cs b/src/Microsoft.Health.Fhir.SqlServer.UnitTests/Features/Operations/Import/ImportProcessingJobTests.cs index 940a67eb4f..add95eb58e 100644 --- a/src/Microsoft.Health.Fhir.SqlServer.UnitTests/Features/Operations/Import/ImportProcessingJobTests.cs +++ b/src/Microsoft.Health.Fhir.SqlServer.UnitTests/Features/Operations/Import/ImportProcessingJobTests.cs @@ -152,7 +152,7 @@ private static async Task VerifyCommonImportAsync(ImportProcessingJobDefinition null, "SearchParam"); - await resourceChannel.Writer.WriteAsync(new ImportResource(0, 0, 0, false, resourceWrapper)); + await resourceChannel.Writer.WriteAsync(new ImportResource(0, 0, 0, false, false, resourceWrapper)); await resourceChannel.Writer.WriteAsync(new ImportResource(1, 0, "Error")); resourceChannel.Writer.Complete(); }); diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/SqlImporter.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/SqlImporter.cs index 6ee29a60b2..02d727ca62 100644 --- a/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/SqlImporter.cs +++ b/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/SqlImporter.cs @@ -100,7 +100,7 @@ private void ImportResourcesInBuffer(List resources, List(); conflicts = new List(); - ImportResourcesInBufferMain(resources, loaded, conflicts, importMode, timeoutRetries, cancellationToken).Wait(); + ImportResourcesInBufferInternal(resources, loaded, conflicts, importMode, timeoutRetries, cancellationToken).Wait(); break; } catch (Exception e) @@ -112,18 +112,18 @@ private void ImportResourcesInBuffer(List resources, List resources, List resources, List loaded, List conflicts, ImportMode importMode, int timeoutRetries, CancellationToken cancellationToken) + private async Task ImportResourcesInBufferInternal(List resources, List loaded, List conflicts, ImportMode importMode, int timeoutRetries, CancellationToken cancellationToken) { var goodResources = resources.Where(r => string.IsNullOrEmpty(r.ImportError)).ToList(); if (importMode == ImportMode.InitialLoad) @@ -195,17 +195,19 @@ private async Task ImportResourcesInBufferMain(List resources, L } } - var inputDeduppedNoVersionNoConflict = inputDeduppedNoVersion.Except(conflicts); // some resources might get version assigned - await MergeResourcesAsync(inputDeduppedNoVersionNoConflict.Where(_ => _.KeepVersion), timeoutRetries, cancellationToken); - await MergeResourcesAsync(inputDeduppedNoVersionNoConflict.Where(_ => !_.KeepVersion), timeoutRetries, cancellationToken); + var inputDeduppedNoVersionNoConflict = inputDeduppedNoVersion.Except(conflicts).ToList(); // some resources might get version assigned + await MergeResourcesAsync(inputDeduppedNoVersionNoConflict.Where(_ => _.KeepVersion).ToList(), timeoutRetries, cancellationToken); + await MergeResourcesAsync(inputDeduppedNoVersionNoConflict.Where(_ => !_.KeepVersion).ToList(), timeoutRetries, cancellationToken); loaded.AddRange(inputDeduppedNoVersionNoConflict); } } - private async Task MergeResourcesAsync(IEnumerable resources, int timeoutRetries, CancellationToken cancellationToken) + private async Task MergeResourcesAsync(IList resources, int timeoutRetries, CancellationToken cancellationToken) { - var input = resources.Select(_ => new ResourceWrapperOperation(_.ResourceWrapper, true, true, null, requireETagOnUpdate: false, keepVersion: _.KeepVersion, bundleOperationId: null)).ToList(); - await _store.MergeInternalAsync(input, timeoutRetries, cancellationToken); + var input = resources.Where(_ => _.KeepLastUpdated).Select(_ => new ResourceWrapperOperation(_.ResourceWrapper, true, true, null, requireETagOnUpdate: false, keepVersion: _.KeepVersion, bundleOperationId: null)).ToList(); + await _store.MergeInternalAsync(input, true, timeoutRetries, cancellationToken); + input = resources.Where(_ => !_.KeepLastUpdated).Select(_ => new ResourceWrapperOperation(_.ResourceWrapper, true, true, null, requireETagOnUpdate: false, keepVersion: _.KeepVersion, bundleOperationId: null)).ToList(); + await _store.MergeInternalAsync(input, false, timeoutRetries, cancellationToken); } private void AppendErrorsToBuffer(IEnumerable dups, IEnumerable conflicts, List importErrorBuffer) diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlServerFhirDataStore.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlServerFhirDataStore.cs index f78832c606..d6d63ab8c3 100644 --- a/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlServerFhirDataStore.cs +++ b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlServerFhirDataStore.cs @@ -114,7 +114,7 @@ public async Task> MergeInternalAsync(IReadOnlyList resources, int timeoutRetries, CancellationToken cancellationToken) + internal async Task> MergeInternalAsync(IReadOnlyList resources, bool keepLastUpdated, int timeoutRetries, CancellationToken cancellationToken) { var results = new Dictionary(); if (resources == null || resources.Count == 0) @@ -286,7 +286,7 @@ internal async Task