From c609ce0f2ea0124da42f02c5d3d21145fcc57b59 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Fri, 14 Feb 2025 01:24:41 -0600 Subject: [PATCH] added `Context.Watch` support to `FutureActorRef` close #7501 --- src/core/Akka.Tests/Actor/Bugfix7501Specs.cs | 15 +++++++ src/core/Akka/Actor/ActorRef.cs | 47 ++++++++++++++++++-- 2 files changed, 58 insertions(+), 4 deletions(-) diff --git a/src/core/Akka.Tests/Actor/Bugfix7501Specs.cs b/src/core/Akka.Tests/Actor/Bugfix7501Specs.cs index 408b49b2570..7edff0406d6 100644 --- a/src/core/Akka.Tests/Actor/Bugfix7501Specs.cs +++ b/src/core/Akka.Tests/Actor/Bugfix7501Specs.cs @@ -26,24 +26,39 @@ public Bugfix7501Specs(ITestOutputHelper output) : base(output) public async Task FutureActorRefShouldSupportDeathWatch() { // arrange + var customDeathWatchProbe = CreateTestProbe(); var watcher = Sys.ActorOf(act => { act.Receive((_, context) => { // complete the Ask context.Sender.Tell("hi"); + + // DeathWatch the FutureActorRef BEFORE it completes + context.Watch(context.Sender); // deliver the IActorRef of the Ask-er to TestActor TestActor.Tell(context.Sender); }); + + act.Receive((terminated, context) => + { + // shut ourselves down to signal that we got our Terminated from FutureActorRef + context.Stop(context.Self); + }); }); // act + await customDeathWatchProbe.WatchAsync(watcher); await watcher.Ask("boo", RemainingOrDefault); var futureActorRef = await ExpectMsgAsync(); await WatchAsync(futureActorRef); // Ask is finished - should immediately dead-letter // assert await ExpectTerminatedAsync(futureActorRef); + + // get the DeathWatch notification from the original actor + // this can only be received if the original actor got a Terminated message from FutureActorRef + await customDeathWatchProbe.ExpectTerminatedAsync(watcher); } } \ No newline at end of file diff --git a/src/core/Akka/Actor/ActorRef.cs b/src/core/Akka/Actor/ActorRef.cs index 64455c4dcfc..f024a56ed38 100644 --- a/src/core/Akka/Actor/ActorRef.cs +++ b/src/core/Akka/Actor/ActorRef.cs @@ -65,6 +65,23 @@ public interface IRepointableRef : IActorRefScope bool IsStarted { get; } } + /// + /// INTERNAL API - didn't want static helper methods declared inside generic class + /// + internal static class FutureActorRefDeathWatchSupport + { + internal static async Task ScheduleDeathWatch(IInternalActorRef notifier, IActorRef self, Task completionTask) + { + await completionTask; + notifier.SendSystemMessage(TerminatedFor(self)); + } + + internal static DeathWatchNotification TerminatedFor(IActorRef self) + { + return new DeathWatchNotification(self, true, false); + } + } + /// /// INTERNAL API. /// @@ -110,9 +127,6 @@ protected override void TellInternal(object message, IActorRef sender) switch (message) { - case ISystemMessage msg: - handled = _result.TrySetException(new InvalidOperationException($"system message of type '{msg.GetType().Name}' is invalid for {nameof(FutureActorRef)}")); - break; case T t: handled = _result.TrySetResult(t); break; @@ -140,7 +154,32 @@ protected override void TellInternal(object message, IActorRef sender) if (!handled && !_result.Task.IsCanceled) _provider.DeadLetters.Tell(message ?? default(T), this); } - + + public override void SendSystemMessage(ISystemMessage message) + { + if (message is Watch watch) + { + if (_result.Task.IsCompleted) + { + watch.Watcher.SendSystemMessage(FutureActorRefDeathWatchSupport.TerminatedFor(this)); + } + else + { + _ = FutureActorRefDeathWatchSupport.ScheduleDeathWatch(watch.Watcher, watch.Watchee, _result.Task); + } + + } + else if (message is Unwatch unwatch) + { + + } + else + { + // TODO: blow up the caller here by just throwing the exception at the callsite? + _result.TrySetException(new InvalidOperationException($"system message of type '{message.GetType().Name}' is invalid for {nameof(FutureActorRef)}")); + } + } + public virtual void DeliverAsk(object message, ICanTell destination){ destination.Tell(message, this); }