Skip to content

Commit

Permalink
added Context.Watch support to FutureActorRef<T>
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaronontheweb committed Feb 14, 2025
1 parent a613bcd commit c609ce0
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 4 deletions.
15 changes: 15 additions & 0 deletions src/core/Akka.Tests/Actor/Bugfix7501Specs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,39 @@ public Bugfix7501Specs(ITestOutputHelper output) : base(output)
public async Task FutureActorRefShouldSupportDeathWatch()
{
// arrange
var customDeathWatchProbe = CreateTestProbe();
var watcher = Sys.ActorOf(act =>
{
act.Receive<string>((_, context) =>
{
// complete the Ask
context.Sender.Tell("hi");

// DeathWatch the FutureActorRef<T> BEFORE it completes
context.Watch(context.Sender);

// deliver the IActorRef of the Ask-er to TestActor
TestActor.Tell(context.Sender);
});

act.Receive<Terminated>((terminated, context) =>
{
// shut ourselves down to signal that we got our Terminated from FutureActorRef
context.Stop(context.Self);
});
});

// act
await customDeathWatchProbe.WatchAsync(watcher);
await watcher.Ask<string>("boo", RemainingOrDefault);
var futureActorRef = await ExpectMsgAsync<IActorRef>();
await WatchAsync(futureActorRef); // Ask is finished - should immediately dead-letter

// assert
await ExpectTerminatedAsync(futureActorRef);

// get the DeathWatch notification from the original actor
// this can only be received if the original actor got a Terminated message from FutureActorRef
await customDeathWatchProbe.ExpectTerminatedAsync(watcher);
}
}
47 changes: 43 additions & 4 deletions src/core/Akka/Actor/ActorRef.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,23 @@ public interface IRepointableRef : IActorRefScope
bool IsStarted { get; }
}

/// <summary>
/// INTERNAL API - didn't want static helper methods declared inside generic class
/// </summary>
internal static class FutureActorRefDeathWatchSupport
{
internal static async Task ScheduleDeathWatch(IInternalActorRef notifier, IActorRef self, Task completionTask)
{
await completionTask;
notifier.SendSystemMessage(TerminatedFor(self));
}

internal static DeathWatchNotification TerminatedFor(IActorRef self)
{
return new DeathWatchNotification(self, true, false);
}
}

/// <summary>
/// INTERNAL API.
///
Expand Down Expand Up @@ -110,9 +127,6 @@ protected override void TellInternal(object message, IActorRef sender)

switch (message)
{
case ISystemMessage msg:
handled = _result.TrySetException(new InvalidOperationException($"system message of type '{msg.GetType().Name}' is invalid for {nameof(FutureActorRef<T>)}"));
break;
case T t:
handled = _result.TrySetResult(t);
break;
Expand Down Expand Up @@ -140,7 +154,32 @@ protected override void TellInternal(object message, IActorRef sender)
if (!handled && !_result.Task.IsCanceled)
_provider.DeadLetters.Tell(message ?? default(T), this);
}


public override void SendSystemMessage(ISystemMessage message)
{
if (message is Watch watch)
{
if (_result.Task.IsCompleted)
{
watch.Watcher.SendSystemMessage(FutureActorRefDeathWatchSupport.TerminatedFor(this));
}
else
{
_ = FutureActorRefDeathWatchSupport.ScheduleDeathWatch(watch.Watcher, watch.Watchee, _result.Task);
}

}
else if (message is Unwatch unwatch)
{

}
else
{
// TODO: blow up the caller here by just throwing the exception at the callsite?
_result.TrySetException(new InvalidOperationException($"system message of type '{message.GetType().Name}' is invalid for {nameof(FutureActorRef<T>)}"));
}
}

public virtual void DeliverAsk(object message, ICanTell destination){
destination.Tell(message, this);
}
Expand Down

0 comments on commit c609ce0

Please # to comment.