
Only link what needs to be uploaded #48


Draft: wants to merge 3 commits into main
Changes from all commits
191 changes: 155 additions & 36 deletions src/AzurePipelines/PipelineCachingCacheClient.cs
@@ -87,6 +87,8 @@ internal sealed class PipelineCachingCacheClient : CacheClient
private readonly DedupStoreClientWithDataport _dedupClient;
private readonly DedupManifestArtifactClient _manifestClient;
private readonly Task _startupTask;
private readonly KeepUntilBlobReference _keepUntil = new KeepUntilBlobReference(DateTimeOffset.Now.AddHours(4));
private readonly HashType _hashType;

public PipelineCachingCacheClient(
Context rootContext,
@@ -154,6 +156,7 @@ string s when int.TryParse(s, out int i) => i,
_dedupHttpClient.SetRedirectTimeout(timeoutSeconds);

// https://dev.azure.com/mseng/1ES/_workitems/edit/2060777
_hashType = hasher.Info.HashType;
if (hasher.Info.HashType == HashType.Dedup1024K)
{
_dedupHttpClient.RecommendedChunkCountPerCall = 8;
@@ -220,68 +223,181 @@ protected override async Task AddNodeAsync(

// If we are async publishing, then we need to grab content from the L1 and remap it.
// If we are sync publishing, then we can point directly to it.
FileInfo[] infos;
PublishResult publishResult;
if (EnableAsyncPublishing)
{
infos = Array.Empty<FileInfo>();
// map the hash types
Dictionary<DedupIdentifier, ContentHash> dedupToHash = outputs.Values.ToDictionaryFirstKeyWins(
hash => hash.ToBlobIdentifier().ToDedupIdentifier(),
hash => hash);

// open a stream to get the length of all content
Dictionary<DedupIdentifier, long> dedupToSize = new();
Review comment (Member): Nit: initialize with a count
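A one-line sketch of that nit, sizing the dictionary to the distinct hashes already known at this point (an assumption; the extras and the manifest entry added later would grow it slightly):

Dictionary<DedupIdentifier, long> dedupToSize = new(dedupToHash.Count);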

foreach (ContentHash hash in dedupToHash.Values)
{
StreamWithLength? streamWithLength = await LocalCacheSession
.OpenStreamAsync(context, hash, cancellationToken)
.ThrowIfFailureAsync(r => r.StreamWithLength)!;
Review comment (Member): Probably need to using so it gets disposed.
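A minimal sketch of the reviewer's suggestion, assuming StreamWithLength exposes the wrapped Stream via a Stream property (only the length is needed here, so the stream can be disposed immediately):

StreamWithLength? streamWithLength = await LocalCacheSession
    .OpenStreamAsync(context, hash, cancellationToken)
    .ThrowIfFailureAsync(r => r.StreamWithLength)!;
using (Stream stream = streamWithLength.Value.Stream) // assumption: StreamWithLength exposes .Stream
{
    dedupToSize.Add(hash.ToBlobIdentifier().ToDedupIdentifier(), stream.Length);
}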
DedupIdentifier dedupId = hash.ToBlobIdentifier().ToDedupIdentifier();
dedupToSize.Add(dedupId, streamWithLength.Value.Length);
}

// 2. Link out unique content to the temp folder
// create the manifest and add extras to local cache
Manifest manifest;
{
var items = new List<ManifestItem>(outputs.Count + extras.Count);

Dictionary<ContentHash, string> tempFilesPerHash = outputs.Values.Distinct().ToDictionary(
hash => hash,
hash =>
// put extras in local cache to simplify the code below
foreach (KeyValuePair<string, FileInfo> extra in extras)
{
string tempFilePath = Path.Combine(TempFolder, Guid.NewGuid().ToString("N") + ".tmp");
tempFilePaths.Add(tempFilePath);
return tempFilePath;
});

List<ContentHashWithPath> tempFiles = tempFilesPerHash
.Select(kvp => new ContentHashWithPath(kvp.Key, kvp.Value))
.ToList();

Dictionary<string, PlaceFileResult> placeResults = await TryPlaceFilesFromCacheAsync(
context,
tempFiles,
realizationModeOverride: FileRealizationMode.Any, // hard links are fine for these
cancellationToken);
DedupNode node = await ChunkerHelper.CreateFromFileAsync(FileSystem.Instance, extra.Key, cancellationToken, configureAwait: false);
DedupIdentifier dedupId = node.GetDedupIdentifier();
dedupToSize[dedupId] = extra.Value.Length;
dedupToHash[dedupId] = node.ToContentHash(_hashType);
await LocalCacheSession.PutFileAsync(context, node.ToContentHash(_hashType), new AbsolutePath(extra.Value.FullName), FileRealizationMode.Any, cancellationToken);
items.Add(new ManifestItem(extra.Key, new DedupInfo(dedupId.ValueString, node.TransitiveContentBytes)));
}

foreach (KeyValuePair<string, ContentHash> output in outputs)
{
string relativePath = output.Key.MakePathRelativeTo(RepoRoot)!.Replace("\\", "/", StringComparison.Ordinal);
Review comment (Member): Should merge with the latest and use the helper

DedupIdentifier dedupId = output.Value.ToBlobIdentifier().ToDedupIdentifier();
items.Add(new ManifestItem(
relativePath,
new DedupInfo(dedupId.ValueString, (ulong)dedupToSize[dedupId])));
}
items.Sort((i1, i2) => StringComparer.Ordinal.Compare(i1.Path, i2.Path));
Review comment (Member): Should this be case-sensitive or insensitive?
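For comparison, the case-insensitive variant the reviewer is asking about would look like this (a behavior choice, not part of this diff):

items.Sort((i1, i2) => StringComparer.OrdinalIgnoreCase.Compare(i1.Path, i2.Path));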


foreach (PlaceFileResult placeResult in placeResults.Values)
manifest = new Manifest(items);
}

// Store the manifest in local cache to simplify the code below
using MemoryStream manifestStream = new(JsonSerializer.Serialize(manifest).GetUTF8Bytes());
Review comment (Member): I believe you can serialize directly to a stream, which should cut out the intermediate string.
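A minimal sketch of that suggestion, assuming System.Text.Json on .NET 6 or later, which offers a Stream-based Serialize overload:

// Serialize the manifest straight into the stream, skipping the intermediate string/byte[].
using MemoryStream manifestStream = new();
JsonSerializer.Serialize(manifestStream, manifest);
manifestStream.Position = 0; // rewind before handing the stream to PutStreamAsync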

PutResult manifestResult = await LocalCacheSession.PutStreamAsync(context, _hashType, manifestStream, cancellationToken);
manifestResult.ThrowIfFailure();
ContentHash manifestHash = manifestResult.ContentHash;
DedupIdentifier manifestId = manifestHash.ToBlobIdentifier().ToDedupIdentifier();
dedupToSize[manifestId] = manifestStream.Length;
dedupToHash[manifestId] = manifestHash;

// now that we have everything in the L1, we can efficiently ask the service what it already has
IDedupUploadSession uploadSession = _dedupClient.CreateUploadSession(
_keepUntil,
tracer: _azureDevopsTracer,
FileSystem.Instance);

// upload whatever (outputs, extras, and manifest) is needed
Dictionary<DedupIdentifier, CheckIfUploadNeededResult> uploadCheckResults =
await uploadSession.CheckIfUploadIsNeededAsync(dedupToSize, cancellationToken);

IEnumerable<DedupIdentifier> hashesToupload = uploadCheckResults
.Where(kvp => kvp.Value == CheckIfUploadNeededResult.UploadNeeded)
.Select(kvp => kvp.Key);
Review comment (@dfederm, Jan 26, 2024): Nit: hashesToupload -> hashesToUpload

var pageRoots = new List<DedupNode>();
// upload the files in batches of DedupNode.MaxDirectChildrenPerNode == 512
foreach (List<DedupIdentifier> hashPage in hashesToupload.GetPages(DedupNode.MaxDirectChildrenPerNode))
{
placeResult.ThrowIfFailure();
// we'll need to materialize to upload because the cache won't give us its path to the content
Dictionary<ContentHash, string> tempFilesPerHash = hashPage.ToDictionary(
hash => dedupToHash[hash],
hash =>
{
string tempFilePath = Path.Combine(TempFolder, Guid.NewGuid().ToString("N") + ".tmp");
tempFilePaths.Add(tempFilePath);
return tempFilePath;
});

// munge to a different format
List<ContentHashWithPath> tempFiles = tempFilesPerHash
.Select(kvp => new ContentHashWithPath(kvp.Key, kvp.Value))
.ToList();

// materialize the files
Dictionary<string, PlaceFileResult> placeResults = await TryPlaceFilesFromCacheAsync(
context,
tempFiles,
realizationModeOverride: FileRealizationMode.Any, // hard links are fine for these
cancellationToken);
foreach (KeyValuePair<string, PlaceFileResult> placeResult in placeResults)
{
// Everything should already be in the L1
placeResult.Value.ThrowIfFailure();
}

// compute the merkle tree
Dictionary<DedupIdentifier, string> paths = tempFilesPerHash.ToDictionary(kvp => kvp.Key.ToBlobIdentifier().ToDedupIdentifier(), kvp => kvp.Value);
var files = new List<DedupNode>(tempFilesPerHash.Count);
foreach (KeyValuePair<ContentHash, string> kvp in tempFilesPerHash)
{
// UploadAsync requires "filled" nodes.
// For single-chunk files, they are already filled as they have no children nodes.
// For multi-chunk files, we need to re-chunk them here as the LocalCAS
// only stores the hash of the top node and not the inner node tree that upload needs.
DedupIdentifier dedupId = kvp.Key.ToBlobIdentifier().ToDedupIdentifier();
if (dedupId.AlgorithmId == ChunkDedupIdentifier.ChunkAlgorithmId)
{
files.Add(new DedupNode(new ChunkInfo(0, (uint)dedupToSize[dedupId], dedupId.AlgorithmResult)));
}
else
{
DedupNode node = await ChunkFileAsync(kvp.Value, cancellationToken);
Review comment (Member): Does this re-hash the file? That seems a bit expensive :(

files.Add(node);
}
}

// create the root node and upload
var pageRootNode = new DedupNode(files);
await uploadSession.UploadAsync(pageRootNode, paths, cancellationToken);
// extras.Add(ConvertAbsolutePathToUriPath(output.Key), new FileInfo(tempFilesPerHash[output.Value]));
}

// 3. map all the relative paths to the temp files
foreach (KeyValuePair<string, ContentHash> output in outputs)
while (pageRoots.Count > 1)
Review comment (@dfederm, Jan 26, 2024): > 0? If not, might need to add a comment explaining why

{
extras.Add(ConvertAbsolutePathToUriPath(output.Key), new FileInfo(tempFilesPerHash[output.Value]));
var newPageRoots = new List<DedupNode>();
foreach (List<DedupNode> page in pageRoots.GetPages(DedupNode.MaxDirectChildrenPerNode))
{
var pageRootNode = new DedupNode(page);
newPageRoots.Add(pageRootNode);
}
pageRoots = newPageRoots;
}

DedupNode root = pageRoots.Single();
Review comment (Member): [0]


HashSet<DedupNode> proofNodes = ProofHelper.CreateProofNodes(
uploadSession.AllNodes,
uploadSession.ParentLookup,
dedupToSize.Keys);
Review comment (Member): Can you explain what "proof" is?

string[] proofNodesSerialized = proofNodes.Select(n => Convert.ToBase64String(n.Serialize())).ToArray();

publishResult = new PublishResult(manifestId, root.GetDedupIdentifier(), proofNodesSerialized, manifest.Items.Count, (long)root.TransitiveContentBytes);
}
else
{
infos = outputs.Keys.Select(f => new FileInfo(f)).ToArray();
FileInfo[] infos = outputs.Keys.Select(f => new FileInfo(f)).ToArray();
publishResult = await WithHttpRetries(
() => _manifestClient.PublishAsync(RepoRoot, infos, extras, new ArtifactPublishOptions(), manifestFileOutputPath: null, cancellationToken),
context: $"Publishing content for {fingerprint}",
cancellationToken);
Review comment (Member): Do similar things need to be done here?
}

var result = await WithHttpRetries(
() => _manifestClient.PublishAsync(RepoRoot, infos, extras, new ArtifactPublishOptions(), manifestFileOutputPath: null, cancellationToken),
context: $"Publishing content for {fingerprint}",
cancellationToken);

// double check
{
using var manifestStream = new MemoryStream(await GetBytes(context, result.ManifestId, cancellationToken));
using var manifestStream = new MemoryStream(await GetBytes(context, publishResult.ManifestId, cancellationToken));
Manifest manifest = JsonSerializer.Deserialize<Manifest>(manifestStream)!;
var manifestFiles = CreateNormalizedManifest(manifest);
var outputFiles = CreateNormalizedManifest(outputs);
ThrowIfDifferent(manifestFiles, outputFiles, $"With {nameof(EnableAsyncPublishing)}:{EnableAsyncPublishing}, Manifest `{result.ManifestId}` and Outputs don't match:");
ThrowIfDifferent(manifestFiles, outputFiles, $"With {nameof(EnableAsyncPublishing)}:{EnableAsyncPublishing}, Manifest `{publishResult.ManifestId}` and Outputs don't match:");
}

var key = ComputeKey(fingerprint, forWrite: true);
var entry = new CreatePipelineCacheArtifactContract(
new VisualStudio.Services.PipelineCache.WebApi.Fingerprint(key.Split(KeySegmentSeperator)),
result.ManifestId,
result.RootId,
result.ProofNodes,
publishResult.ManifestId,
publishResult.RootId,
publishResult.ProofNodes,
ContentFormatConstants.Files);

CreateResult createResult = await WithHttpRetries(
@@ -382,6 +498,9 @@ protected override async Task AddNodeAsync(
}
}

private static Task<DedupNode> ChunkFileAsync(string path, CancellationToken cancellationToken) =>
ChunkerHelper.CreateFromFileAsync(FileSystem.Instance, path, cancellationToken, configureAwait: false);
Review comment (Member): This is used exactly once and is 1 line. Consider inlining it.
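A sketch of what inlining at the single call site could look like, mirroring how the extras loop already calls the chunker directly:

DedupNode node = await ChunkerHelper.CreateFromFileAsync(FileSystem.Instance, kvp.Value, cancellationToken, configureAwait: false);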

private static byte GetAlgorithmId(ContentHash hash)
{
switch (hash._hashType)