diff --git a/tests/Aspire.Hosting.Tests/Eventing/DistributedApplicationBuilderEventingTests.cs b/tests/Aspire.Hosting.Tests/Eventing/DistributedApplicationBuilderEventingTests.cs index c45fc9cdae..9e0fc14fa3 100644 --- a/tests/Aspire.Hosting.Tests/Eventing/DistributedApplicationBuilderEventingTests.cs +++ b/tests/Aspire.Hosting.Tests/Eventing/DistributedApplicationBuilderEventingTests.cs @@ -11,6 +11,157 @@ namespace Aspire.Hosting.Tests.Eventing; public class DistributedApplicationBuilderEventingTests { + [Fact] + public async Task EventsCanBePublishedBlockSequential() + { + using var builder = TestDistributedApplicationBuilder.Create(); + + var hitCount = 0; + var blockAssertionTcs = new TaskCompletionSource(); + var blockFirstSubscriptionTcs = new TaskCompletionSource(); + + builder.Eventing.Subscribe(async (@event, ct) => + { + blockAssertionTcs.SetResult(); + hitCount++; + await blockFirstSubscriptionTcs.Task; + }); + + builder.Eventing.Subscribe((@event, ct) => + { + hitCount++; + return Task.CompletedTask; + }); + + var pendingPublish = builder.Eventing.PublishAsync(new DummyEvent(), EventDispatchBehavior.BlockingSequential); + + await blockAssertionTcs.Task; + Assert.Equal(1, hitCount); + blockFirstSubscriptionTcs.SetResult(); + await pendingPublish; + Assert.Equal(2, hitCount); + } + + [Fact] + public async Task EventsCanBePublishedBlockConcurrent() + { + using var builder = TestDistributedApplicationBuilder.Create(); + + var hitCount = 0; + var blockAssertionSub1 = new TaskCompletionSource(); + var blockAssertionSub2 = new TaskCompletionSource(); + var blockSubscriptionCompletion = new TaskCompletionSource(); + + builder.Eventing.Subscribe(async (@event, ct) => + { + hitCount++; + blockAssertionSub1.SetResult(); + await blockSubscriptionCompletion.Task; + }); + + builder.Eventing.Subscribe(async (@event, ct) => + { + hitCount++; + blockAssertionSub2.SetResult(); + await blockSubscriptionCompletion.Task; + }); + + var pendingPublish = builder.Eventing.PublishAsync(new DummyEvent(), EventDispatchBehavior.BlockingConcurrent); + + await Task.WhenAll(blockAssertionSub1.Task, blockAssertionSub2.Task); + Assert.Equal(2, hitCount); + blockSubscriptionCompletion.SetResult(); + await pendingPublish; + } + + [Fact] + public async Task EventsCanBePublishedNonBlockingConcurrent() + { + using var builder = TestDistributedApplicationBuilder.Create(); + + var hitCount = 0; + var blockAssertionSub1 = new TaskCompletionSource(); + var blockAssertionSub2 = new TaskCompletionSource(); + var blockSubscriptionExecution = new TaskCompletionSource(); + + builder.Eventing.Subscribe(async (@event, ct) => + { + await blockSubscriptionExecution.Task; + hitCount++; + blockAssertionSub1.SetResult(); + }); + + builder.Eventing.Subscribe(async (@event, ct) => + { + await blockSubscriptionExecution.Task; + hitCount++; + blockAssertionSub2.SetResult(); + }); + + var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); // Should be way more than we need! + await builder.Eventing.PublishAsync(new DummyEvent(), EventDispatchBehavior.NonBlockingConcurrent, timeoutCts.Token); + + blockSubscriptionExecution.SetResult(); + await Task.WhenAll(blockAssertionSub1.Task, blockAssertionSub2.Task); + Assert.Equal(2, hitCount); + } + + [Fact] + public async Task EventsCanBePublishedNonBlockingSequential() + { + using var builder = TestDistributedApplicationBuilder.Create(); + + var hitCount = 0; + var blockEventSub1 = new TaskCompletionSource(); + var blockEventSub2 = new TaskCompletionSource(); + var blockAssert1 = new TaskCompletionSource(); + var blockAssert2 = new TaskCompletionSource(); + var blockAssert3 = new TaskCompletionSource(); + + builder.Eventing.Subscribe(async (@event, ct) => + { + blockAssert1.SetResult(); + await blockEventSub1.Task; + hitCount++; + blockAssert2.SetResult(); + await blockEventSub2.Task; + }); + + builder.Eventing.Subscribe((@event, ct) => + { + hitCount++; + blockAssert3.SetResult(); + return Task.CompletedTask; + }); + + var timeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); // Should be way more than we need! + await builder.Eventing.PublishAsync(new DummyEvent(), EventDispatchBehavior.NonBlockingSequential, timeoutCts.Token); + + // Make sure that we are zero when we enter + // the first handler. + await blockAssert1.Task; + Assert.Equal(0, hitCount); + + // Give the second handler a chance to run, + // it shouldn't and hit count should + // still be zero. + await Task.Delay(1000); + Assert.Equal(0, hitCount); + + // After we unblock the first sub + // we update the hit count and verify + // that it has moved to 1. + blockEventSub1.SetResult(); + await blockAssert2.Task; + Assert.Equal(1, hitCount); + blockEventSub2.SetResult(); + + // Now block until the second handler has + // run and make sure it has incremented. + await blockAssert3.Task; + Assert.Equal(2, hitCount); + } + [Fact] public void CanResolveIDistributedApplicationEventingFromDI() { @@ -115,4 +266,8 @@ public async Task LifeycleHookAnalogousEventsFire() Assert.True(allFired); await app.StopAsync(); } + + public class DummyEvent : IDistributedApplicationEvent + { + } }