Skip to content

Commit

Permalink
Fix incorrect path variable (#77)
Browse files Browse the repository at this point in the history
  • Loading branch information
s-vitaliy authored Jul 11, 2024
1 parent edc50b9 commit 05f54c8
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 30 deletions.
36 changes: 10 additions & 26 deletions src/Sources/BlobStorage/BlobStorageSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Streams.Stage;
using Arcane.Framework.Contracts;
using Arcane.Framework.Sources.Base;
using Snd.Sdk.Storage.Base;

namespace Arcane.Framework.Sources.BlobStorage;
Expand All @@ -17,20 +15,18 @@ namespace Arcane.Framework.Sources.BlobStorage;
/// The source enumerates a cloud blob storage content (e.g. S3 or Azure Blob Container) and emits list of objects
/// that exist in the storage.
/// </summary>
public class BlobStorageSource : GraphStage<SourceShape<string>>, ITaggedSource
public class BlobStorageSource : GraphStage<SourceShape<string>>
{
private readonly string prefix;
private readonly string path;
private readonly IBlobStorageListService blobStorageService;
private readonly TimeSpan changeCaptureInterval;
private readonly string blobContainer;

private BlobStorageSource(string blobContainer, string prefix, IBlobStorageListService blobStorageService,
private BlobStorageSource(string path, IBlobStorageListService blobStorageService,
TimeSpan changeCaptureInterval)
{
this.prefix = prefix;
this.path = path;
this.blobStorageService = blobStorageService;
this.changeCaptureInterval = changeCaptureInterval;
this.blobContainer = blobContainer;
this.Shape = new SourceShape<string>(this.Out);
}

Expand All @@ -51,39 +47,27 @@ protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes)
return new SourceLogic(this);
}

/// <inheritdoc cref="ITaggedSource.GetDefaultTags"/>
public SourceTags GetDefaultTags()
{
return new SourceTags
{
SourceEntity = this.blobContainer,
SourceLocation = this.prefix
};
}

/// <summary>
/// Creates a <see cref="Source"/> for a cloud blob storage container.
/// </summary>
/// <param name="blobContainer">Container name (Blob storage container, S3 bucket etc...)</param>
/// <param name="prefix">Filter objects by prefix</param>
/// <param name="path">Filter objects by prefix</param>
/// <param name="blobStorageService">Blob storage service instance</param>
/// <param name="changeCaptureInterval">How often check for storage updates</param>
/// <returns>BlobStorageSource instance</returns>
[ExcludeFromCodeCoverage(Justification = "Factory method")]
public static BlobStorageSource Create(
string blobContainer,
string prefix,
string path,
IBlobStorageListService blobStorageService,
TimeSpan changeCaptureInterval)
{
return new BlobStorageSource(blobContainer, prefix, blobStorageService, changeCaptureInterval);
return new BlobStorageSource(path, blobStorageService, changeCaptureInterval);
}

private class SourceLogic : TimerGraphStageLogic
{
private const string TimerKey = nameof(SourceLogic);

private readonly string prefix;
private readonly string path;
private readonly IBlobStorageListService blobStorageService;
private readonly TimeSpan changeCaptureInterval;
private readonly LocalOnlyDecider decider;
Expand All @@ -94,7 +78,7 @@ private class SourceLogic : TimerGraphStageLogic
public SourceLogic(BlobStorageSource source) : base(source.Shape)
{
this.source = source;
this.prefix = source.prefix;
this.path = source.path;
this.blobStorageService = source.blobStorageService;
this.changeCaptureInterval = source.changeCaptureInterval;
this.blobs = Enumerable.Empty<string>();
Expand Down Expand Up @@ -150,7 +134,7 @@ private void OnPull()

private void GetBlobs()
{
this.blobs = this.blobStorageService.ListBlobsAsEnumerable(this.prefix).Select(s => s.Name).ToList();
this.blobs = this.blobStorageService.ListBlobsAsEnumerable(this.path).Select(s => s.Name).ToList();
}
}
}
6 changes: 2 additions & 4 deletions test/Sources/BlobStorageSourceTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ public async Task TestCanStreamBlobStorageObjectNames()
this.mockBlobStorageService
.Setup(s => s.ListBlobsAsEnumerable(It.IsAny<string>()))
.Returns(new[] { new StoredBlob { Name = "key/value/item.csv", LastModified = DateTimeOffset.UtcNow } });
var blobStorageSource = BlobStorageSource.Create("container",
"",
var blobStorageSource = BlobStorageSource.Create("s3a://bucket/prefix",
this.mockBlobStorageService.Object,
TimeSpan.FromMinutes(1));
var source = Source.FromGraph(blobStorageSource);
Expand Down Expand Up @@ -64,8 +63,7 @@ public async Task TestDoesNotStopOnEmpty()
callCount++;
return Array.Empty<StoredBlob>();
});
var blobStorageSource = BlobStorageSource.Create("container",
"",
var blobStorageSource = BlobStorageSource.Create("s3a://bucket/prefix",
this.mockBlobStorageService.Object,
TimeSpan.FromSeconds(1));
var source = Source.FromGraph(blobStorageSource);
Expand Down

0 comments on commit 05f54c8

Please # to comment.