Skip to content

Commit

Permalink
Add keep last updated parameter to merge (#3337)
Browse files Browse the repository at this point in the history
* Add keep last updated parameter to merge

* ToList()

* comment
  • Loading branch information
SergeyGaluzo authored Jun 8, 2023
1 parent 06d2731 commit 2c809e4
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ private async Task VerifyResourceLoaderAsync(int resourcCount, int batchSize, lo
null,
null,
"SearchParam");
return new ImportResource(index, 0, 0, false, resourceWrapper);
return new ImportResource(index, 0, 0, false, false, resourceWrapper);
});

IImportErrorSerializer serializer = Substitute.For<IImportErrorSerializer>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,18 @@ namespace Microsoft.Health.Fhir.Core.Features.Operations.Import
{
public class ImportResource
{
public ImportResource(long index, long offset, int length, bool keepVersion, ResourceWrapper resourceWrapper)
public ImportResource(long index, long offset, int length, bool keepLastUpdated, bool keepVersion, ResourceWrapper resourceWrapper)
{
Index = index;
Offset = offset;
Length = length;
KeepLastUpdated = keepLastUpdated;
KeepVersion = keepVersion;
ResourceWrapper = resourceWrapper;
}

public ImportResource(ResourceWrapper resource)
: this(0, 0, 0, false, resource)
: this(0, 0, 0, false, false, resource)
{
}

Expand All @@ -28,6 +29,7 @@ public ImportResource(long index, long offset, string importError)
Index = index;
Offset = offset;
Length = 0;
KeepLastUpdated = false;
KeepVersion = false;
ImportError = importError;
}
Expand All @@ -47,6 +49,11 @@ public ImportResource(long index, long offset, string importError)
/// </summary>
public int Length { get; set; }

/// <summary>
/// Flag indicating whether latUpdated was provided on input
/// </summary>
public bool KeepLastUpdated { get; set; }

/// <summary>
/// Flag indicating whether version was provided on input
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public ImportResource Parse(long index, long offset, int length, string rawResou
resource.Meta = new Meta();
}

bool lastUpdatedIsNull;
if (lastUpdatedIsNull = importMode == ImportMode.InitialLoad || resource.Meta.LastUpdated == null)
var lastUpdatedIsNull = importMode == ImportMode.InitialLoad || resource.Meta.LastUpdated == null;
if (lastUpdatedIsNull)
{
resource.Meta.LastUpdated = Clock.UtcNow;
}
Expand All @@ -52,7 +52,7 @@ public ImportResource Parse(long index, long offset, int length, string rawResou
var resourceElement = resource.ToResourceElement();
var resourceWapper = _resourceFactory.Create(resourceElement, false, true, keepVersion);

return new ImportResource(index, offset, length, keepVersion, resourceWapper);
return new ImportResource(index, offset, length, !lastUpdatedIsNull, keepVersion, resourceWapper);
}

private static void CheckConditionalReferenceInResource(Resource resource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ private static async Task VerifyCommonImportAsync(ImportProcessingJobDefinition
null,
"SearchParam");

await resourceChannel.Writer.WriteAsync(new ImportResource(0, 0, 0, false, resourceWrapper));
await resourceChannel.Writer.WriteAsync(new ImportResource(0, 0, 0, false, false, resourceWrapper));
await resourceChannel.Writer.WriteAsync(new ImportResource(1, 0, "Error"));
resourceChannel.Writer.Complete();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ private void ImportResourcesInBuffer(List<ImportResource> resources, List<string
mergeStart = DateTime.UtcNow;
loaded = new List<ImportResource>();
conflicts = new List<ImportResource>();
ImportResourcesInBufferMain(resources, loaded, conflicts, importMode, timeoutRetries, cancellationToken).Wait();
ImportResourcesInBufferInternal(resources, loaded, conflicts, importMode, timeoutRetries, cancellationToken).Wait();
break;
}
catch (Exception e)
Expand All @@ -112,18 +112,18 @@ private void ImportResourcesInBuffer(List<ImportResource> resources, List<string
|| (isRetriable = e.IsRetriable()) // this should allow to deal with intermittent database errors.
|| ((isExecutionTimeout = e.IsExecutionTimeout()) && timeoutRetries++ < 3)) // timeouts happen once in a while on highly loaded databases.
{
_logger.LogWarning(e, $"Error on {nameof(ImportResourcesInBufferMain)} retries={{Retries}} timeoutRetries={{TimeoutRetries}}", retries, timeoutRetries);
_logger.LogWarning(e, $"Error on {nameof(ImportResourcesInBufferInternal)} retries={{Retries}} timeoutRetries={{TimeoutRetries}}", retries, timeoutRetries);
if (isRetriable || isExecutionTimeout) // others are logged in SQL by merge stored procedure
{
_store.TryLogEvent(nameof(ImportResourcesInBufferMain), "Warn", $"retries={retries} timeoutRetries={timeoutRetries} error={e}", mergeStart, cancellationToken).Wait();
_store.TryLogEvent(nameof(ImportResourcesInBufferInternal), "Warn", $"retries={retries} timeoutRetries={timeoutRetries} error={e}", mergeStart, cancellationToken).Wait();
}

Task.Delay(5000, cancellationToken);
continue;
}

_logger.LogError(e, $"Error on {nameof(ImportResourcesInBufferMain)} retries={{Retries}} timeoutRetries={{TimeoutRetries}}", retries, timeoutRetries);
_store.TryLogEvent(nameof(ImportResourcesInBufferMain), "Error", $"retries={retries} timeoutRetries={timeoutRetries} error={e}", mergeStart, cancellationToken).Wait();
_logger.LogError(e, $"Error on {nameof(ImportResourcesInBufferInternal)} retries={{Retries}} timeoutRetries={{TimeoutRetries}}", retries, timeoutRetries);
_store.TryLogEvent(nameof(ImportResourcesInBufferInternal), "Error", $"retries={retries} timeoutRetries={timeoutRetries} error={e}", mergeStart, cancellationToken).Wait();

throw;
}
Expand All @@ -141,7 +141,7 @@ private void ImportResourcesInBuffer(List<ImportResource> resources, List<string
resources.Clear();
}

private async Task ImportResourcesInBufferMain(List<ImportResource> resources, List<ImportResource> loaded, List<ImportResource> conflicts, ImportMode importMode, int timeoutRetries, CancellationToken cancellationToken)
private async Task ImportResourcesInBufferInternal(List<ImportResource> resources, List<ImportResource> loaded, List<ImportResource> conflicts, ImportMode importMode, int timeoutRetries, CancellationToken cancellationToken)
{
var goodResources = resources.Where(r => string.IsNullOrEmpty(r.ImportError)).ToList();
if (importMode == ImportMode.InitialLoad)
Expand Down Expand Up @@ -195,17 +195,19 @@ private async Task ImportResourcesInBufferMain(List<ImportResource> resources, L
}
}

var inputDeduppedNoVersionNoConflict = inputDeduppedNoVersion.Except(conflicts); // some resources might get version assigned
await MergeResourcesAsync(inputDeduppedNoVersionNoConflict.Where(_ => _.KeepVersion), timeoutRetries, cancellationToken);
await MergeResourcesAsync(inputDeduppedNoVersionNoConflict.Where(_ => !_.KeepVersion), timeoutRetries, cancellationToken);
var inputDeduppedNoVersionNoConflict = inputDeduppedNoVersion.Except(conflicts).ToList(); // some resources might get version assigned
await MergeResourcesAsync(inputDeduppedNoVersionNoConflict.Where(_ => _.KeepVersion).ToList(), timeoutRetries, cancellationToken);
await MergeResourcesAsync(inputDeduppedNoVersionNoConflict.Where(_ => !_.KeepVersion).ToList(), timeoutRetries, cancellationToken);
loaded.AddRange(inputDeduppedNoVersionNoConflict);
}
}

private async Task MergeResourcesAsync(IEnumerable<ImportResource> resources, int timeoutRetries, CancellationToken cancellationToken)
private async Task MergeResourcesAsync(IList<ImportResource> resources, int timeoutRetries, CancellationToken cancellationToken)
{
var input = resources.Select(_ => new ResourceWrapperOperation(_.ResourceWrapper, true, true, null, requireETagOnUpdate: false, keepVersion: _.KeepVersion, bundleOperationId: null)).ToList();
await _store.MergeInternalAsync(input, timeoutRetries, cancellationToken);
var input = resources.Where(_ => _.KeepLastUpdated).Select(_ => new ResourceWrapperOperation(_.ResourceWrapper, true, true, null, requireETagOnUpdate: false, keepVersion: _.KeepVersion, bundleOperationId: null)).ToList();
await _store.MergeInternalAsync(input, true, timeoutRetries, cancellationToken);
input = resources.Where(_ => !_.KeepLastUpdated).Select(_ => new ResourceWrapperOperation(_.ResourceWrapper, true, true, null, requireETagOnUpdate: false, keepVersion: _.KeepVersion, bundleOperationId: null)).ToList();
await _store.MergeInternalAsync(input, false, timeoutRetries, cancellationToken);
}

private void AppendErrorsToBuffer(IEnumerable<ImportResource> dups, IEnumerable<ImportResource> conflicts, List<string> importErrorBuffer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public async Task<IDictionary<DataStoreOperationIdentifier, DataStoreOperationOu
try
{
mergeStart = DateTime.UtcNow;
var results = await MergeInternalAsync(resources, 0, cancellationToken); // TODO: Pass correct retries value once we start supporting retries
var results = await MergeInternalAsync(resources, false, 0, cancellationToken); // TODO: Pass correct retries value once we start supporting retries
return results;
}
catch (Exception e)
Expand Down Expand Up @@ -148,7 +148,7 @@ public async Task<IDictionary<DataStoreOperationIdentifier, DataStoreOperationOu
}

// Split in a separate method to allow special logic in $import.
internal async Task<IDictionary<DataStoreOperationIdentifier, DataStoreOperationOutcome>> MergeInternalAsync(IReadOnlyList<ResourceWrapperOperation> resources, int timeoutRetries, CancellationToken cancellationToken)
internal async Task<IDictionary<DataStoreOperationIdentifier, DataStoreOperationOutcome>> MergeInternalAsync(IReadOnlyList<ResourceWrapperOperation> resources, bool keepLastUpdated, int timeoutRetries, CancellationToken cancellationToken)
{
var results = new Dictionary<DataStoreOperationIdentifier, DataStoreOperationOutcome>();
if (resources == null || resources.Count == 0)
Expand Down Expand Up @@ -286,7 +286,7 @@ internal async Task<IDictionary<DataStoreOperationIdentifier, DataStoreOperation
}

long surrId;
if (_ignoreInputLastUpdated.IsEnabled())
if (!keepLastUpdated || _ignoreInputLastUpdated.IsEnabled())
{
surrId = minSurrId + index;
resource.LastModified = new DateTimeOffset(ResourceSurrogateIdHelper.ResourceSurrogateIdToLastUpdated(surrId), TimeSpan.Zero);
Expand Down

0 comments on commit 2c809e4

Please # to comment.