diff --git a/src/AzurePipelines/PipelineCachingCacheClient.cs b/src/AzurePipelines/PipelineCachingCacheClient.cs
index 0f7d226..e72fde5 100644
--- a/src/AzurePipelines/PipelineCachingCacheClient.cs
+++ b/src/AzurePipelines/PipelineCachingCacheClient.cs
@@ -87,6 +87,8 @@ internal sealed class PipelineCachingCacheClient : CacheClient
     private readonly DedupStoreClientWithDataport _dedupClient;
     private readonly DedupManifestArtifactClient _manifestClient;
     private readonly Task _startupTask;
+    private readonly KeepUntilBlobReference _keepUntil = new KeepUntilBlobReference(DateTimeOffset.Now.AddHours(4));
+    private readonly HashType _hashType;
 
     public PipelineCachingCacheClient(
         Context rootContext,
@@ -154,6 +156,7 @@ string s when int.TryParse(s, out int i) => i,
         _dedupHttpClient.SetRedirectTimeout(timeoutSeconds);
 
         // https://dev.azure.com/mseng/1ES/_workitems/edit/2060777
+        _hashType = hasher.Info.HashType;
         if (hasher.Info.HashType == HashType.Dedup1024K)
         {
             _dedupHttpClient.RecommendedChunkCountPerCall = 8;
@@ -220,68 +223,181 @@ protected override async Task AddNodeAsync(
         // If we are async publishing, then we need to grab content from the L1 and remap it.
         // If we are sync publishing, then we can point directly to it.
-        FileInfo[] infos;
+        PublishResult publishResult;
         if (EnableAsyncPublishing)
        {
-            infos = Array.Empty<FileInfo>();
+            // map the hash types
+            Dictionary<DedupIdentifier, ContentHash> dedupToHash = outputs.Values.ToDictionaryFirstKeyWins(
+                hash => hash.ToBlobIdentifier().ToDedupIdentifier(),
+                hash => hash);
+
+            // open a stream to get the length of all content
+            Dictionary<DedupIdentifier, long> dedupToSize = new();
+            foreach (ContentHash hash in dedupToHash.Values)
+            {
+                StreamWithLength? streamWithLength = await LocalCacheSession
+                    .OpenStreamAsync(context, hash, cancellationToken)
+                    .ThrowIfFailureAsync(r => r.StreamWithLength)!;
+                DedupIdentifier dedupId = hash.ToBlobIdentifier().ToDedupIdentifier();
+                dedupToSize.Add(dedupId, streamWithLength.Value.Length);
+            }
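+
+            // note: these sizes feed both the upload-needed check and the reconstruction of
+            // single-chunk DedupNodes below, so we gather them once up front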
-            // 2. Link out unique content to the temp folder
+            // create the manifest and add extras to local cache
+            Manifest manifest;
+            {
+                var items = new List<ManifestItem>(outputs.Count + extras.Count);
-            Dictionary<ContentHash, string> tempFilesPerHash = outputs.Values.Distinct().ToDictionary(
-                hash => hash,
-                hash =>
+                // put extras in local cache to simplify the code below
+                foreach (KeyValuePair<string, FileInfo> extra in extras)
                {
-                    string tempFilePath = Path.Combine(TempFolder, Guid.NewGuid().ToString("N") + ".tmp");
-                    tempFilePaths.Add(tempFilePath);
-                    return tempFilePath;
-                });
-
-            List<ContentHashWithPath> tempFiles = tempFilesPerHash
-                .Select(kvp => new ContentHashWithPath(kvp.Key, kvp.Value))
-                .ToList();
-
-            Dictionary<ContentHash, PlaceFileResult> placeResults = await TryPlaceFilesFromCacheAsync(
-                context,
-                tempFiles,
-                realizationModeOverride: FileRealizationMode.Any, // hard links are fine for these
-                cancellationToken);
+                    DedupNode node = await ChunkerHelper.CreateFromFileAsync(FileSystem.Instance, extra.Key, cancellationToken, configureAwait: false);
+                    DedupIdentifier dedupId = node.GetDedupIdentifier();
+                    dedupToSize[dedupId] = extra.Value.Length;
+                    dedupToHash[dedupId] = node.ToContentHash(_hashType);
+                    await LocalCacheSession.PutFileAsync(context, node.ToContentHash(_hashType), new AbsolutePath(extra.Value.FullName), FileRealizationMode.Any, cancellationToken);
+                    items.Add(new ManifestItem(extra.Key, new DedupInfo(dedupId.ValueString, node.TransitiveContentBytes)));
+                }
+
+                foreach (KeyValuePair<string, ContentHash> output in outputs)
+                {
+                    string relativePath = output.Key.MakePathRelativeTo(RepoRoot)!.Replace("\\", "/", StringComparison.Ordinal);
+                    DedupIdentifier dedupId = output.Value.ToBlobIdentifier().ToDedupIdentifier();
+                    items.Add(new ManifestItem(
+                        relativePath,
+                        new DedupInfo(dedupId.ValueString, (ulong)dedupToSize[dedupId])));
+                }
+                items.Sort((i1, i2) => StringComparer.Ordinal.Compare(i1.Path, i2.Path));
-            foreach (PlaceFileResult placeResult in placeResults.Values)
+                manifest = new Manifest(items);
+            }
+
+            // Store the manifest in local cache to simplify the code below
+            using MemoryStream manifestStream = new(JsonSerializer.Serialize(manifest).GetUTF8Bytes());
+            PutResult manifestResult = await LocalCacheSession.PutStreamAsync(context, _hashType, manifestStream, cancellationToken);
+            manifestResult.ThrowIfFailure();
+            ContentHash manifestHash = manifestResult.ContentHash;
+            DedupIdentifier manifestId = manifestHash.ToBlobIdentifier().ToDedupIdentifier();
+            dedupToSize[manifestId] = manifestStream.Length;
+            dedupToHash[manifestId] = manifestHash;
+
+            // now that we have everything in the L1, we can efficiently ask the service what it already has
+            IDedupUploadSession uploadSession = _dedupClient.CreateUploadSession(
+                _keepUntil,
+                tracer: _azureDevopsTracer,
+                FileSystem.Instance);
+
+            // upload whatever (outputs, extras, and manifest) is needed
+            Dictionary<DedupIdentifier, CheckIfUploadNeededResult> uploadCheckResults =
+                await uploadSession.CheckIfUploadIsNeededAsync(dedupToSize, cancellationToken);
+
+            IEnumerable<DedupIdentifier> hashesToUpload = uploadCheckResults
+                .Where(kvp => kvp.Value == CheckIfUploadNeededResult.UploadNeeded)
+                .Select(kvp => kvp.Key);
+
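+            // DedupNode.MaxDirectChildrenPerNode caps a node's fan-out, so the upload is paged:
+            // e.g. 1,000 blobs yield two pages and two page roots, which the fold-up loop
+            // below reduces to a single root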
+            var pageRoots = new List<DedupNode>();
+            // upload the files in batches of DedupNode.MaxDirectChildrenPerNode == 512
+            foreach (List<DedupIdentifier> hashPage in hashesToUpload.GetPages(DedupNode.MaxDirectChildrenPerNode))
            {
-                placeResult.ThrowIfFailure();
+                // we'll need to materialize to upload because the cache won't give us its path to the content
+                Dictionary<ContentHash, string> tempFilesPerHash = hashPage.ToDictionary(
+                    hash => dedupToHash[hash],
+                    hash =>
+                    {
+                        string tempFilePath = Path.Combine(TempFolder, Guid.NewGuid().ToString("N") + ".tmp");
+                        tempFilePaths.Add(tempFilePath);
+                        return tempFilePath;
+                    });
+
+                // munge to a different format
+                List<ContentHashWithPath> tempFiles = tempFilesPerHash
+                    .Select(kvp => new ContentHashWithPath(kvp.Key, kvp.Value))
+                    .ToList();
+
+                // materialize the files
+                Dictionary<ContentHash, PlaceFileResult> placeResults = await TryPlaceFilesFromCacheAsync(
+                    context,
+                    tempFiles,
+                    realizationModeOverride: FileRealizationMode.Any, // hard links are fine for these
+                    cancellationToken);
+                foreach (KeyValuePair<ContentHash, PlaceFileResult> placeResult in placeResults)
+                {
+                    // Everything should already be in the L1
+                    placeResult.Value.ThrowIfFailure();
+                }
+
+                // compute the merkle tree
+                Dictionary<DedupIdentifier, string> paths = tempFilesPerHash.ToDictionary(kvp => kvp.Key.ToBlobIdentifier().ToDedupIdentifier(), kvp => kvp.Value);
+                var files = new List<DedupNode>(tempFilesPerHash.Count);
+                foreach (KeyValuePair<ContentHash, string> kvp in tempFilesPerHash)
+                {
+                    // UploadAsync requires "filled" nodes.
+                    // For single-chunk files, they are already filled as they have no children nodes.
+                    // For multi-chunk files, we need to re-chunk them here as the LocalCAS
+                    // only stores the hash of the top node and not the inner node tree that upload needs.
+                    DedupIdentifier dedupId = kvp.Key.ToBlobIdentifier().ToDedupIdentifier();
+                    if (dedupId.AlgorithmId == ChunkDedupIdentifier.ChunkAlgorithmId)
+                    {
+                        files.Add(new DedupNode(new ChunkInfo(0, (uint)dedupToSize[dedupId], dedupId.AlgorithmResult)));
+                    }
+                    else
+                    {
+                        DedupNode node = await ChunkFileAsync(kvp.Value, cancellationToken);
+                        files.Add(node);
+                    }
+                }
+
+                // create the root node, upload, and remember the page root for the fold-up below
+                var pageRootNode = new DedupNode(files);
+                await uploadSession.UploadAsync(pageRootNode, paths, cancellationToken);
+                pageRoots.Add(pageRootNode);
            }
-            // 3. map all the relative paths to the temp files
-            foreach (KeyValuePair<string, ContentHash> output in outputs)
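+            // fold the page roots into parent nodes until a single root remains; each round
+            // shrinks the list by a factor of up to DedupNode.MaxDirectChildrenPerNode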
+            while (pageRoots.Count > 1)
            {
-                extras.Add(ConvertAbsolutePathToUriPath(output.Key), new FileInfo(tempFilesPerHash[output.Value]));
+                var newPageRoots = new List<DedupNode>();
+                foreach (List<DedupNode> page in pageRoots.GetPages(DedupNode.MaxDirectChildrenPerNode))
+                {
+                    var pageRootNode = new DedupNode(page);
+                    newPageRoots.Add(pageRootNode);
+                }
+                pageRoots = newPageRoots;
            }
+
+            DedupNode root = pageRoots.Single();
+
+            // proof nodes: the ancestor chains showing each referenced id is contained in the tree under root
+            HashSet<DedupNode> proofNodes = ProofHelper.CreateProofNodes(
+                uploadSession.AllNodes,
+                uploadSession.ParentLookup,
+                dedupToSize.Keys);
+
+            string[] proofNodesSerialized = proofNodes.Select(n => Convert.ToBase64String(n.Serialize())).ToArray();
+
+            publishResult = new PublishResult(manifestId, root.GetDedupIdentifier(), proofNodesSerialized, manifest.Items.Count, (long)root.TransitiveContentBytes);
        }
        else
        {
-            infos = outputs.Keys.Select(f => new FileInfo(f)).ToArray();
+            FileInfo[] infos = outputs.Keys.Select(f => new FileInfo(f)).ToArray();
+            publishResult = await WithHttpRetries(
+                () => _manifestClient.PublishAsync(RepoRoot, infos, extras, new ArtifactPublishOptions(), manifestFileOutputPath: null, cancellationToken),
+                context: $"Publishing content for {fingerprint}",
+                cancellationToken);
        }
 
-        var result = await WithHttpRetries(
-            () => _manifestClient.PublishAsync(RepoRoot, infos, extras, new ArtifactPublishOptions(), manifestFileOutputPath: null, cancellationToken),
-            context: $"Publishing content for {fingerprint}",
-            cancellationToken);
-
        // double check
        {
-            using var manifestStream = new MemoryStream(await GetBytes(context, result.ManifestId, cancellationToken));
+            using var manifestStream = new MemoryStream(await GetBytes(context, publishResult.ManifestId, cancellationToken));
            Manifest manifest = JsonSerializer.Deserialize<Manifest>(manifestStream)!;
            var manifestFiles = CreateNormalizedManifest(manifest);
            var outputFiles = CreateNormalizedManifest(outputs);
-            ThrowIfDifferent(manifestFiles, outputFiles, $"With {nameof(EnableAsyncPublishing)}:{EnableAsyncPublishing}, Manifest `{result.ManifestId}` and Outputs don't match:");
+            ThrowIfDifferent(manifestFiles, outputFiles, $"With {nameof(EnableAsyncPublishing)}:{EnableAsyncPublishing}, Manifest `{publishResult.ManifestId}` and Outputs don't match:");
        }
 
        var key = ComputeKey(fingerprint, forWrite: true);
        var entry = new CreatePipelineCacheArtifactContract(
            new VisualStudio.Services.PipelineCache.WebApi.Fingerprint(key.Split(KeySegmentSeperator)),
-            result.ManifestId,
-            result.RootId,
-            result.ProofNodes,
+            publishResult.ManifestId,
+            publishResult.RootId,
+            publishResult.ProofNodes,
            ContentFormatConstants.Files);
 
        CreateResult createResult = await WithHttpRetries(
@@ -382,6 +498,9 @@ protected override async Task AddNodeAsync(
        }
    }
 
+    private static Task<DedupNode> ChunkFileAsync(string path, CancellationToken cancellationToken) =>
+        ChunkerHelper.CreateFromFileAsync(FileSystem.Instance, path, cancellationToken, configureAwait: false);
+
    private static byte GetAlgorithmId(ContentHash hash)
    {
        switch (hash._hashType