Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove +1 assumption for input versions in $import #3685

Merged
merged 8 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

5,440 changes: 5,440 additions & 0 deletions src/Microsoft.Health.Fhir.SqlServer/Features/Schema/Migrations/74.sql

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,6 @@ public enum SchemaVersion
V71 = 71,
V72 = 72,
V73 = 73,
V74 = 74,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Microsoft.Health.Fhir.SqlServer.Features.Schema
public static class SchemaVersionConstants
{
public const int Min = (int)SchemaVersion.V69;
public const int Max = (int)SchemaVersion.V73;
public const int Max = (int)SchemaVersion.V74;
public const int MinForUpgrade = (int)SchemaVersion.V69; // this is used for upgrade tests only
public const int SearchParameterStatusSchemaVersion = (int)SchemaVersion.V6;
public const int SupportForReferencesWithMissingTypeVersion = (int)SchemaVersion.V7;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ Go

INSERT INTO dbo.SchemaVersion
VALUES
(73, 'started')
(74, 'started')

Go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ BEGIN TRY
ON B.ResourceTypeId = A.ResourceTypeId AND B.ResourceId = A.ResourceId AND B.IsHistory = 0
OPTION (MAXDOP 1, OPTIMIZE FOR (@DummyTop = 1))

IF @RaiseExceptionOnConflict = 1 AND EXISTS (SELECT * FROM @ResourceInfos WHERE PreviousVersion IS NOT NULL AND Version <> PreviousVersion + 1)
IF @RaiseExceptionOnConflict = 1 AND EXISTS (SELECT * FROM @ResourceInfos WHERE PreviousVersion IS NOT NULL AND Version <= PreviousVersion)
THROW 50409, 'Resource has been recently updated or added, please compare the resource content in code for any duplicate updates', 1

INSERT INTO @PreviousSurrogateIds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,12 @@ internal async Task<IDictionary<DataStoreOperationIdentifier, DataStoreOperation
(var transactionId, var minSequenceId) = await StoreClient.MergeResourcesBeginTransactionAsync(resources.Count, cancellationToken);

var index = 0;
var mergeWrappers = new List<MergeResourceWrapper>();
var mergeWrappersWithVersions = new List<(MergeResourceWrapper Wrapper, bool KeepVersion, int ResourceVersion, int? ExistingVersion)>();
var prevResourceId = string.Empty;
var singleTransaction = enlistInTransaction;
foreach (var resourceExt in resources) // if list contains more that one version per resource it must be sorted by id and last updated desc.
foreach (var resourceExt in resources) // if list contains more that one version per resource it must be sorted by id and last updated DESC.
{
var setAsHistory = prevResourceId == resourceExt.Wrapper.ResourceId;
var setAsHistory = prevResourceId == resourceExt.Wrapper.ResourceId; // this assumes that first resource version is the latest one
prevResourceId = resourceExt.Wrapper.ResourceId;
var weakETag = resourceExt.WeakETag;
int? eTag = weakETag == null
Expand All @@ -178,9 +178,9 @@ internal async Task<IDictionary<DataStoreOperationIdentifier, DataStoreOperation

var resource = resourceExt.Wrapper;
var identifier = resourceExt.GetIdentifier();
var resourceKey = resource.ToResourceKey(); // keep input version in the results to allow processing multiple versions per resource
existingResources.TryGetValue(resource.ToResourceKey(true), out var existingResource);
var hasVersionToCompare = false;
var existingVersion = 0;

// Check for any validation errors
if (existingResource != null && eTag.HasValue && !string.Equals(eTag.ToString(), existingResource.Version, StringComparison.Ordinal))
Expand Down Expand Up @@ -271,7 +271,7 @@ internal async Task<IDictionary<DataStoreOperationIdentifier, DataStoreOperation
}
}

var existingVersion = int.Parse(existingResource.Version);
existingVersion = int.Parse(existingResource.Version);
var versionPlusOne = (existingVersion + 1).ToString(CultureInfo.InvariantCulture);
if (!resourceExt.KeepVersion) // version is set on input
{
Expand Down Expand Up @@ -310,12 +310,34 @@ internal async Task<IDictionary<DataStoreOperationIdentifier, DataStoreOperation
singleTransaction = true;
}

mergeWrappers.Add(new MergeResourceWrapper(resource, resourceExt.KeepHistory, hasVersionToCompare));
mergeWrappersWithVersions.Add((new MergeResourceWrapper(resource, resourceExt.KeepHistory, hasVersionToCompare), resourceExt.KeepVersion, int.Parse(resource.Version), existingVersion));
index++;
results.Add(identifier, new DataStoreOperationOutcome(new UpsertOutcome(resource, resource.Version == InitialVersion ? SaveOutcomeType.Created : SaveOutcomeType.Updated)));
}

if (mergeWrappers.Count > 0) // Do not call DB with empty input
// Resources with input versions (keepVersion=true) might not have hasVersionToCompare set. Fix it here.
// Resources with keepVersion=true must be in separate call, and not mixed with keepVersion=false ones.
// Sort them in groups by resource id and order by version.
Copy link
Member

@brendankowitz brendankowitz Jan 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be done with LINQ to simplify? Something like:

```
mergeWrappersPlus
.Where(x => x.KeepVersion && x.ExistingVersion != 0)
.GroupBy(key => key.Wrapper.ResourceWrapper.ResourceId, items => items.OrderBy(y =>y.ResourceVersion))
```
#ByDesign

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was not able to find a clear way to apply the required logic using only LINQ, so I wrote it as an explicit loop. I hope it is obvious, and hence simple.

// In each group, find the smallest version higher than the existing one
prevResourceId = string.Empty;
var notSetInResoureGroup = false;
foreach (var mergeWrapper in mergeWrappersWithVersions.Where(x => x.KeepVersion && x.ExistingVersion != 0).OrderBy(x => x.Wrapper.ResourceWrapper.ResourceId).ThenBy(x => x.ResourceVersion))
{
if (prevResourceId != mergeWrapper.Wrapper.ResourceWrapper.ResourceId) // this should reset flag on each resource id group including first.
{
notSetInResoureGroup = true;
}

prevResourceId = mergeWrapper.Wrapper.ResourceWrapper.ResourceId;

if (notSetInResoureGroup && mergeWrapper.ResourceVersion > mergeWrapper.ExistingVersion)
{
mergeWrapper.Wrapper.HasVersionToCompare = true;
notSetInResoureGroup = false;
}
}

if (mergeWrappersWithVersions.Count > 0) // Do not call DB with empty input
{
await using (new Timer(async _ => await _sqlStoreClient.MergeResourcesPutTransactionHeartbeatAsync(transactionId, MergeResourcesTransactionHeartbeatPeriod, cancellationToken), null, TimeSpan.FromSeconds(RandomNumberGenerator.GetInt32(100) / 100.0 * MergeResourcesTransactionHeartbeatPeriod.TotalSeconds), MergeResourcesTransactionHeartbeatPeriod))
{
Expand All @@ -325,7 +347,7 @@ internal async Task<IDictionary<DataStoreOperationIdentifier, DataStoreOperation
{
try
{
await MergeResourcesWrapperAsync(transactionId, singleTransaction, mergeWrappers, enlistInTransaction, timeoutRetries, cancellationToken);
await MergeResourcesWrapperAsync(transactionId, singleTransaction, mergeWrappersWithVersions.Select(_ => _.Wrapper).ToList(), enlistInTransaction, timeoutRetries, cancellationToken);
break;
}
catch (Exception e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ internal MergeResourceWrapper(ResourceWrapper resourceWrapper, bool keepHistory,
/// <summary>
/// Flag indicating whether version in resource wrapper == (existing version in the database + 1)
/// </summary>
public bool HasVersionToCompare { get; private set; }
public bool HasVersionToCompare { get; internal set; }
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<!-- Properties used by sql task to generate full script -->
<PropertyGroup>
<LatestSchemaVersion>73</LatestSchemaVersion>
<LatestSchemaVersion>74</LatestSchemaVersion>
<GeneratedFullScriptPath>Features\Schema\Migrations\$(LatestSchemaVersion).sql</GeneratedFullScriptPath>
</PropertyGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,41 @@ public async Task GivenIncrementalLoad_MultipleInputVersionsOutOfOrderSomeNotExp
Assert.Equal(GetLastUpdated("2002"), result.Resource.Meta.LastUpdated);
}

/// <summary>
/// Imports one version of a resource (10000) in incremental-load mode, then imports three more versions of the
/// same resource whose version ids are not consecutive with it (9000, 10100, 10300), and checks that a plain read
/// returns version 10300 and that versions 9000, 10100 and 10000 can each be read by version id.
/// </summary>
[Fact]
public async Task GivenIncrementalLoad_MultipleNonSequentialInputVersions_ResourceExisting()
{
var id = Guid.NewGuid().ToString("N");

// set existing
// NOTE(review): PrepareResource's 2nd argument appears to be the version id and the 3rd a seed for last updated
// (the asserts below compare against Meta.VersionId and GetLastUpdated(...)) - confirm against the helper.
var ndJson = PrepareResource(id, "10000", "2000");
var location = (await ImportTestHelper.UploadFileAsync(ndJson, _fixture.StorageAccount)).location;
var request = CreateImportRequest(location, ImportMode.IncrementalLoad);
await ImportCheckAsync(request, null);

// set input. something before and something after existing
// None of these versions equals existing + 1 (10001): 9000 is lower, 10100 and 10300 are higher with gaps.
ndJson = PrepareResource(id, "9000", "1999");
var ndJson2 = PrepareResource(id, "10100", "2001");
var ndJson3 = PrepareResource(id, "10300", "2003");

// note order of records
// The records are uploaded as 10100, 9000, 10300, i.e. not sorted by version.
location = (await ImportTestHelper.UploadFileAsync(ndJson2 + ndJson + ndJson3, _fixture.StorageAccount)).location;
request = CreateImportRequest(location, ImportMode.IncrementalLoad);
await ImportCheckAsync(request, null);

// check current
// The highest imported version is expected to be the one returned by a non-versioned read.
var result = await _client.ReadAsync<Patient>(ResourceType.Patient, id);
Assert.Equal("10300", result.Resource.Meta.VersionId);
Assert.Equal(GetLastUpdated("2003"), result.Resource.Meta.LastUpdated);

// check history
// Each remaining version, including the previously existing 10000, must be readable by its version id.
result = await _client.VReadAsync<Patient>(ResourceType.Patient, id, "9000");
Assert.Equal(GetLastUpdated("1999"), result.Resource.Meta.LastUpdated);
result = await _client.VReadAsync<Patient>(ResourceType.Patient, id, "10100");
Assert.Equal(GetLastUpdated("2001"), result.Resource.Meta.LastUpdated);
result = await _client.VReadAsync<Patient>(ResourceType.Patient, id, "10000");
Assert.Equal(GetLastUpdated("2000"), result.Resource.Meta.LastUpdated);
}

[Fact]
public async Task GivenIncrementalLoad_MultipleInputVersions_ResourceExisting_VersionConflict()
{
Expand Down
2 changes: 2 additions & 0 deletions tools/BlobRewriter/App.config
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,7 @@
<add key="SplitBySize" value="true" />
<!-- Filter on blob names. If empty string - no filtering. -->
<add key="NameFilter" value="" />
<!-- If true, tries to replace {"resourceType": with {"meta":{"versionId":"seconds from last updated","lastUpdated":"YYYY-MM-DDThh:mm:ss.fff"},"resourceType": -->
<add key="AddMeta" value="false" />
</appSettings>
</configuration>
17 changes: 14 additions & 3 deletions tools/BlobRewriter/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public static class Program
private static readonly bool WritesEnabled = bool.Parse(ConfigurationManager.AppSettings["WritesEnabled"]);
private static readonly bool SplitBySize = bool.Parse(ConfigurationManager.AppSettings["SplitBySize"]);
private static readonly string NameFilter = ConfigurationManager.AppSettings["NameFilter"];
private static readonly bool AddMeta = bool.Parse(ConfigurationManager.AppSettings["AddMeta"]);

public static void Main()
{
Expand Down Expand Up @@ -60,7 +61,7 @@ public static void Main()
var blob = blobInt.Item2.First();

var lines = SplitBySize
? LinesPerBlob == 0 ? CopyBlob(sourceContainer, blob.Name, targetContainer, ref targetBlobs) : SplitBlobBySize(sourceContainer, blob.Name, targetContainer, ref targetBlobs)
? LinesPerBlob == 0 ? CopyBlob(sourceContainer, blob.Name, targetContainer, ref targetBlobs, blobIndex) : SplitBlobBySize(sourceContainer, blob.Name, targetContainer, ref targetBlobs)
: SplitBlobByResourceId(sourceContainer, blob.Name, targetContainer, ref targetBlobs);
Interlocked.Add(ref totalLines, lines);
Interlocked.Increment(ref sourceBlobs);
Expand Down Expand Up @@ -148,7 +149,7 @@ private static long SplitBlobBySize(BlobContainerClient sourceContainer, string
return lines;
}

private static long CopyBlob(BlobContainerClient sourceContainer, string blobName, BlobContainerClient targetContainer, ref long targetBlobs)
private static long CopyBlob(BlobContainerClient sourceContainer, string blobName, BlobContainerClient targetContainer, ref long targetBlobs, int blobIndex)
{
var lines = 0L;
using var stream = targetContainer.GetBlockBlobClient(blobName).OpenWrite(true);
Expand All @@ -158,7 +159,17 @@ private static long CopyBlob(BlobContainerClient sourceContainer, string blobNam
lines++;
if (WritesEnabled)
{
writer.WriteLine(line);
if (AddMeta)
{
var date = DateTime.UtcNow.AddMinutes(-SourceBlobs).AddMinutes(blobIndex).AddMilliseconds(lines);
var seconds = ((int)(date - DateTime.Parse("1970-01-01")).TotalSeconds).ToString();
var lineWithMeta = line.Replace("{\"resourceType\":", "{\"meta\":{\"versionId\":\"" + seconds + "\",\"lastUpdated\":\"" + date.ToString("yyyy-MM-ddTHH:mm:ss.fff") + "\"},\"resourceType\":", StringComparison.OrdinalIgnoreCase);
writer.WriteLine(lineWithMeta);
}
else
{
writer.WriteLine(line);
}
}
}

Expand Down