Skip to content

Commit

Permalink
Avoid "stream not found" log message when it is expected
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeyzimarev committed Mar 3, 2025
1 parent e4769af commit 4cfc7b0
Show file tree
Hide file tree
Showing 23 changed files with 150 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ public class TracedEventReader(IEventReader reader) : BaseTracer, IEventReader {

IEventReader Inner { get; } = reader;

public Task<StreamEvent[]> ReadEvents(StreamName stream, StreamReadPosition start, int count, CancellationToken cancellationToken)
=> Trace(stream, Operations.ReadEvents, () => Inner.ReadEvents(stream, start, count, cancellationToken));
public Task<StreamEvent[]> ReadEvents(StreamName stream, StreamReadPosition start, int count, bool failIfNotFound, CancellationToken cancellationToken)
=> Trace(stream, Operations.ReadEvents, () => Inner.ReadEvents(stream, start, count, failIfNotFound, cancellationToken));

public Task<StreamEvent[]> ReadEventsBackwards(StreamName stream, StreamReadPosition start, int count, CancellationToken cancellationToken)
=> Trace(stream, Operations.ReadEvents, () => Inner.ReadEventsBackwards(stream, start, count, cancellationToken));
public Task<StreamEvent[]> ReadEventsBackwards(StreamName stream, StreamReadPosition start, int count, bool failIfNotFound, CancellationToken cancellationToken)
=> Trace(stream, Operations.ReadEvents, () => Inner.ReadEventsBackwards(stream, start, count, failIfNotFound, cancellationToken));

// ReSharper disable once ConvertToAutoProperty
protected override string ComponentName => _componentName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Eventuous.Diagnostics.Tracing;

public class TracedEventStore(IEventStore eventStore) : BaseTracer, IEventStore {
public static IEventStore Trace(IEventStore eventStore) => new TracedEventStore(eventStore);

readonly string _componentName = eventStore.GetType().Name;

internal IEventStore Inner { get; } = eventStore;
Expand All @@ -28,11 +28,11 @@ CancellationToken cancellationToken
)
=> Writer.AppendEvents(stream, expectedVersion, events, cancellationToken);

public Task<StreamEvent[]> ReadEvents(StreamName stream, StreamReadPosition start, int count, CancellationToken cancellationToken)
=> Reader.ReadEvents(stream, start, count, cancellationToken);
public Task<StreamEvent[]> ReadEvents(StreamName stream, StreamReadPosition start, int count, bool failIfNotFound, CancellationToken cancellationToken)
=> Reader.ReadEvents(stream, start, count, failIfNotFound, cancellationToken);

public Task<StreamEvent[]> ReadEventsBackwards(StreamName stream, StreamReadPosition start, int count, CancellationToken cancellationToken)
=> Reader.ReadEventsBackwards(stream, start, count, cancellationToken);
public Task<StreamEvent[]> ReadEventsBackwards(StreamName stream, StreamReadPosition start, int count, bool failIfNotFound, CancellationToken cancellationToken)
=> Reader.ReadEventsBackwards(stream, start, count, failIfNotFound, cancellationToken);

public Task TruncateStream(
StreamName stream,
Expand Down
6 changes: 4 additions & 2 deletions src/Core/src/Eventuous.Persistence/EventStore/IEventReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,19 @@ public interface IEventReader {
/// <param name="stream">Stream name</param>
/// <param name="start">Where to start reading events</param>
/// <param name="count">How many events to read</param>
/// <param name="failIfNotFound">Throw an exception if the stream is not found</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>An array with events retrieved from the stream</returns>
Task<StreamEvent[]> ReadEvents(StreamName stream, StreamReadPosition start, int count, CancellationToken cancellationToken);
Task<StreamEvent[]> ReadEvents(StreamName stream, StreamReadPosition start, int count, bool failIfNotFound, CancellationToken cancellationToken);

/// <summary>
/// Read a number of events from a given stream, backwards (from the stream end)
/// </summary>
/// <param name="stream">Stream name</param>
/// <param name="start">Where to start reading events</param>
/// <param name="count">How many events to read</param>
/// <param name="failIfNotFound">Throw an exception if the stream is not found</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>An array with events retrieved from the stream</returns>
Task<StreamEvent[]> ReadEventsBackwards(StreamName stream, StreamReadPosition start,int count, CancellationToken cancellationToken);
Task<StreamEvent[]> ReadEventsBackwards(StreamName stream, StreamReadPosition start, int count, bool failIfNotFound, CancellationToken cancellationToken);
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public static async Task<StreamEvent[]> ReadStream(

try {
while (true) {
var events = await eventReader.ReadEvents(streamName, position, pageSize, cancellationToken).NoContext();
var events = await eventReader.ReadEvents(streamName, position, pageSize, failIfNotFound, cancellationToken).NoContext();
streamEvents.AddRange(events);

if (events.Length < pageSize) break;
Expand Down
20 changes: 10 additions & 10 deletions src/Core/src/Eventuous.Persistence/EventStore/TieredEventReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,36 +10,36 @@ namespace Eventuous;
/// <param name="hotReader">Event reader pointing to hot store</param>
/// <param name="archiveReader">Event reader pointing to archive store</param>
public class TieredEventReader(IEventReader hotReader, IEventReader archiveReader) : IEventReader {
public async Task<StreamEvent[]> ReadEvents(StreamName streamName, StreamReadPosition start, int count, CancellationToken cancellationToken) {
var hotEvents = await LoadStreamEvents(hotReader, start, count).NoContext();
public async Task<StreamEvent[]> ReadEvents(StreamName streamName, StreamReadPosition start, int count, bool failIfNotFound, CancellationToken cancellationToken) {
var hotEvents = await LoadStreamEvents(hotReader, start, count, true).NoContext();

var archivedEvents = hotEvents.Length == 0 || hotEvents[0].Position > start.Value
? await LoadStreamEvents(archiveReader, start, (int)hotEvents[0].Position).NoContext()
? await LoadStreamEvents(archiveReader, start, (int)hotEvents[0].Position, !failIfNotFound).NoContext()
: Enumerable.Empty<StreamEvent>();

return archivedEvents.Select(x => x with { FromArchive = true }).Concat(hotEvents).Distinct(Comparer).ToArray();

async Task<StreamEvent[]> LoadStreamEvents(IEventReader reader, StreamReadPosition startPosition, int localCount) {
async Task<StreamEvent[]> LoadStreamEvents(IEventReader reader, StreamReadPosition startPosition, int localCount, bool ignore) {
try {
return await reader.ReadEvents(streamName, startPosition, localCount, cancellationToken).NoContext();
return await reader.ReadEvents(streamName, startPosition, localCount, !ignore, cancellationToken).NoContext();
} catch (StreamNotFound) {
return [];
}
}
}

public async Task<StreamEvent[]> ReadEventsBackwards(StreamName streamName, StreamReadPosition start, int count, CancellationToken cancellationToken) {
var hotEvents = await LoadStreamEvents(hotReader, start, count).NoContext();
public async Task<StreamEvent[]> ReadEventsBackwards(StreamName streamName, StreamReadPosition start, int count, bool failIfNotFound, CancellationToken cancellationToken) {
var hotEvents = await LoadStreamEvents(hotReader, start, count, true).NoContext();

var archivedEvents = hotEvents.Length == 0 || hotEvents[0].Position > start.Value - count
? await LoadStreamEvents(archiveReader, new(hotEvents[0].Position - 1), count - hotEvents.Length).NoContext()
? await LoadStreamEvents(archiveReader, new(hotEvents[0].Position - 1), count - hotEvents.Length, failIfNotFound).NoContext()
: Enumerable.Empty<StreamEvent>();

return hotEvents.Concat(archivedEvents.Select(x => x with { FromArchive = true })).Distinct(Comparer).ToArray();

async Task<StreamEvent[]> LoadStreamEvents(IEventReader reader, StreamReadPosition startPosition, int localCount) {
async Task<StreamEvent[]> LoadStreamEvents(IEventReader reader, StreamReadPosition startPosition, int localCount, bool ignore) {
try {
return await reader.ReadEventsBackwards(streamName, startPosition, localCount, cancellationToken).NoContext();
return await reader.ReadEventsBackwards(streamName, startPosition, localCount, !ignore, cancellationToken).NoContext();
} catch (StreamNotFound) {
return [];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ namespace Eventuous;
public class TieredEventStore(IEventStore hotStore, IEventReader archiveReader) : IEventStore {
readonly TieredEventReader _tieredReader = new(Ensure.NotNull(hotStore), Ensure.NotNull(archiveReader));

public Task<StreamEvent[]> ReadEvents(StreamName stream, StreamReadPosition start, int count, CancellationToken cancellationToken)
=> _tieredReader.ReadEvents(stream, start, count, cancellationToken);
public Task<StreamEvent[]> ReadEvents(StreamName stream, StreamReadPosition start, int count, bool failIfNotFound, CancellationToken cancellationToken)
=> _tieredReader.ReadEvents(stream, start, count, failIfNotFound, cancellationToken);

public Task<StreamEvent[]> ReadEventsBackwards(StreamName stream, StreamReadPosition start, int count, CancellationToken cancellationToken)
=> _tieredReader.ReadEventsBackwards(stream, start, count, cancellationToken);
public Task<StreamEvent[]> ReadEventsBackwards(StreamName stream, StreamReadPosition start, int count, bool failIfNotFound, CancellationToken cancellationToken)
=> _tieredReader.ReadEventsBackwards(stream, start, count, failIfNotFound, cancellationToken);

public Task<AppendEventsResult> AppendEvents(
StreamName stream,
Expand Down
5 changes: 4 additions & 1 deletion src/Core/src/Eventuous.Persistence/ExpectedStreamVersion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ namespace Eventuous;
public readonly record struct ExpectedStreamVersion(long Value) {
public static readonly ExpectedStreamVersion NoStream = new(-1);
public static readonly ExpectedStreamVersion Any = new(-2);

public bool ExistingStream => Value >= 0;
}

public record struct StreamReadPosition {
public StreamReadPosition(long Value) {
if (Value < 0) throw new ArgumentOutOfRangeException(nameof(Value), "StreamReadPosition cannot be negative.");

this.Value = Value;
}

Expand All @@ -22,4 +25,4 @@ public StreamReadPosition(long Value) {
public readonly void Deconstruct(out long value) => value = this.Value;
}

public record struct StreamTruncatePosition(long Value);
public record struct StreamTruncatePosition(long Value);
5 changes: 2 additions & 3 deletions src/Core/src/Eventuous.Persistence/StateStore/StateStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,15 @@ public class StateStore(IEventReader eventReader, IEventSerializer? serializer =
const int PageSize = 500;

[Obsolete("Use IEventReader.LoadState<T> instead")]
public async Task<T> LoadState<T>(StreamName stream, CancellationToken cancellationToken)
where T : State<T>, new() {
public async Task<T> LoadState<T>(StreamName stream, CancellationToken cancellationToken) where T : State<T>, new() {
var state = new T();

const int pageSize = 500;

var position = StreamReadPosition.Start;

while (true) {
var events = await _eventReader.ReadEvents(stream, position, pageSize, cancellationToken).NoContext();
var events = await _eventReader.ReadEvents(stream, position, pageSize, true, cancellationToken).NoContext();

foreach (var streamEvent in events) {
Fold(streamEvent);
Expand Down
10 changes: 5 additions & 5 deletions src/Core/test/Eventuous.Tests.Persistence.Base/Store/Read.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public async Task ShouldReadOne(CancellationToken cancellationToken) {
var streamName = Helpers.GetStreamName();
await _fixture.AppendEvent(streamName, evt, ExpectedStreamVersion.NoStream);

var result = await _fixture.EventStore.ReadEvents(streamName, StreamReadPosition.Start, 100, cancellationToken);
var result = await _fixture.EventStore.ReadEvents(streamName, StreamReadPosition.Start, 100, true, cancellationToken);
await Assert.That(result.Length).IsEqualTo(1);
await Assert.That(result[0].Payload).IsEquivalentTo(evt);
}
Expand All @@ -33,7 +33,7 @@ public async Task ShouldReadMany(CancellationToken cancellationToken) {
var streamName = Helpers.GetStreamName();
await _fixture.AppendEvents(streamName, events, ExpectedStreamVersion.NoStream);

var result = await _fixture.EventStore.ReadEvents(streamName, StreamReadPosition.Start, 100, cancellationToken);
var result = await _fixture.EventStore.ReadEvents(streamName, StreamReadPosition.Start, 100, true, cancellationToken);
var actual = result.Select(x => x.Payload);
await Assert.That(actual).IsEquivalentTo(events);
}
Expand All @@ -45,7 +45,7 @@ public async Task ShouldReadTail(CancellationToken cancellationToken) {
var streamName = Helpers.GetStreamName();
await _fixture.AppendEvents(streamName, events, ExpectedStreamVersion.NoStream);

var result = await _fixture.EventStore.ReadEvents(streamName, new(10), 100, cancellationToken);
var result = await _fixture.EventStore.ReadEvents(streamName, new(10), 100, true, cancellationToken);
var expected = events.Skip(10);
var actual = result.Select(x => x.Payload!);
await Assert.That(actual).IsEquivalentTo(expected);
Expand All @@ -58,7 +58,7 @@ public async Task ShouldReadHead(CancellationToken cancellationToken) {
var streamName = Helpers.GetStreamName();
await _fixture.AppendEvents(streamName, events, ExpectedStreamVersion.NoStream);

var result = await _fixture.EventStore.ReadEvents(streamName, StreamReadPosition.Start, 10, cancellationToken);
var result = await _fixture.EventStore.ReadEvents(streamName, StreamReadPosition.Start, 10, true, cancellationToken);
var expected = events.Take(10);

IEnumerable<object> actual = result.Select(x => x.Payload)!;
Expand All @@ -73,7 +73,7 @@ public async Task ShouldReadMetadata(CancellationToken cancellationToken) {

await _fixture.AppendEvent(streamName, evt, ExpectedStreamVersion.NoStream, new() { { "Key1", "Value1" }, { "Key2", "Value2" } });

var result = await _fixture.EventStore.ReadEvents(streamName, StreamReadPosition.Start, 100, cancellationToken);
var result = await _fixture.EventStore.ReadEvents(streamName, StreamReadPosition.Start, 100, true, cancellationToken);

await Assert.That(result.Length).IsEqualTo(1);
await Assert.That(result[0].Payload).IsEquivalentTo(evt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ protected TieredStoreTestsBase(StoreFixtureBase<TContainer> storeFixture) {
}

class ArchiveStore(IEventStore original) : IEventReader, IEventWriter {
public Task<StreamEvent[]> ReadEvents(StreamName stream, StreamReadPosition start, int count, CancellationToken cancellationToken)
=> original.ReadEvents(GetArchiveStreamName(stream), start, count, cancellationToken);
public Task<StreamEvent[]> ReadEvents(StreamName stream, StreamReadPosition start, int count, bool failIfNotFound, CancellationToken cancellationToken)
=> original.ReadEvents(GetArchiveStreamName(stream), start, count, failIfNotFound, cancellationToken);

public Task<StreamEvent[]> ReadEventsBackwards(StreamName stream, StreamReadPosition start, int count, CancellationToken cancellationToken)
=> original.ReadEventsBackwards(GetArchiveStreamName(stream), start, count, cancellationToken);
public Task<StreamEvent[]> ReadEventsBackwards(StreamName stream, StreamReadPosition start, int count, bool failIfNotFound, CancellationToken cancellationToken)
=> original.ReadEventsBackwards(GetArchiveStreamName(stream), start, count, failIfNotFound, cancellationToken);

static StreamName GetArchiveStreamName(string streamName) => new($"Archive-{streamName}");

Expand Down
2 changes: 1 addition & 1 deletion src/Core/test/Eventuous.Tests/StoringEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public async Task StoreInitial(CancellationToken cancellationToken) {
result.TryGet(out var ok).Should().BeTrue();
ok!.Changes.Should().BeEquivalentTo(expected);

var evt = await EventStore.ReadEvents(StreamName.For<Booking>(cmd.BookingId), StreamReadPosition.Start, 1, CancellationToken.None);
var evt = await EventStore.ReadEvents(StreamName.For<Booking>(cmd.BookingId), StreamReadPosition.Start, 1, true, CancellationToken.None);

evt[0].Payload.Should().BeEquivalentTo(ok.Changes.First().Event);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public async Task TestOnNew(CancellationToken cancellationToken) {
result.TryGet(out var ok).Should().BeTrue();
ok!.Changes.Should().BeEquivalentTo(expected);

var evt = await EventStore.ReadEvents(GetStreamName(new(cmd.BookingId)), StreamReadPosition.Start, 1, CancellationToken.None);
var evt = await EventStore.ReadEvents(GetStreamName(new(cmd.BookingId)), StreamReadPosition.Start, 1, true, CancellationToken.None);

evt[0].Payload.Should().BeEquivalentTo(ok.Changes.First().Event);
}
Expand All @@ -51,7 +51,7 @@ public async Task TestOnExisting(CancellationToken cancellationToken) {
result.TryGet(out var ok).Should().BeTrue();
ok!.Changes.Should().BeEquivalentTo(expected);

var evt = await EventStore.ReadEvents(GetStreamName(new(cmd.BookingId)), StreamReadPosition.Start, 100, CancellationToken.None);
var evt = await EventStore.ReadEvents(GetStreamName(new(cmd.BookingId)), StreamReadPosition.Start, 100, true, CancellationToken.None);

var actual = evt.Skip(1).Select(x => x.Payload);

Expand Down
Loading

0 comments on commit 4cfc7b0

Please # to comment.