Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Broadcast & Typechecking #28

Closed
wants to merge 2 commits into from

Conversation

PetrosPapapa
Copy link
Contributor

@PetrosPapapa PetrosPapapa commented Oct 17, 2024

This PR introduces convenience classes for broadcasting to actors and type checking the responses.

Typechecking

Although having a typed response provides a certain level of robustness in the communication, it is often the case when using "ask" that we expect a specific (sub-)type of a response.

The Typechecking and TypecheckSyntax implicits offer convenience methods to do this dynamic type checking using ClassTag.

More specifically, TypecheckFOps provides methods to typecheck any effectful result F[R]. If the typecheck succeeds, we return the result safely casted to the checked type. Each of the available methods offers a different way of reporting a failed typecheck, using an Option (typecheckOpt) an alternative result in an EitherT (typecheckOr and typecheckOrF) or raise an error (typecheck).

Finally, TypecheckActorOps offers an "ask" with a response of an expected type (?>). We also provide a version of this ask where the response is first pre-processed by a given function before the type check (?>>). We have found this to be useful if you want to pre-process responses with specific semantics, e.g. responses corresponding to errors which wouldn't typecheck.

Example:

object TypecheckedAsk extends IOApp {

  sealed trait Want
  case object Beer extends Want
  case object Water extends Want

  sealed trait Response
  case class Yes(count: Int) extends Response
  case object No extends Response

  override def run(args: List[String]): IO[ExitCode] =
    ActorSystem[IO]("typechecked ask")
      .use { system =>
        for {
          actor <- system.replyingActorOf(new ReplyingActor[IO, Want, Response] {
            override def receive: ReplyingReceive[IO, Want, Response] = {
              case Beer => Yes(1).pure[IO]
              case Water => No.pure[IO]
            }
          })

          plainResponse <- actor ? Beer
          typedResponse <- actor ?>[Yes] Beer
        } yield ExitCode.Success
      }
}

Note here that plainResponse is of type Response whereas typedResponse is of type Yes and needs no further checking. If the actor were to reply No (e.g. if we used actor ?>[Yes] Water), we would get a TypecheckException at that point.

Broadcast

Broadcasting is essentially a traverse or parTraverse over a list of actors, sending the same message to all of them. The newly introduced class and syntax offers some convenience in the following ways:

  • You can use ! or ? directly to a traversable collection of actors.
  • You can map/flatMap/mapFilter each response.
  • You can typecheck each response.
  • Choose if you want to broadcast sequentially or in parallel.

What is important here is to note that all the mapping and filtering methods are applied on each response individually as soon as we get them (in parallel when using .parallel). One could achieve the same effect by traversing the resulting list of responses. That would mean a second traversal of that list with an associated O(n) cost (even if that is often small).

Example usage is taken from com.suprnation.actor.broadcast.TreenodeActor in the unit tests:

(children ? Ping)
  .mapping{ case Fail(error) => IO.raiseError(error) }
  .expecting[Pong]
  .parallel
  .map(_.combineAll.append(name))

Explanation:

  • The value children carries a list of actor refs. We can use ? to broadcast Ping to all of them.
  • The .mapping method allows us to apply a partial mapping function and handle some of the responses. In this case, we use Fail as an error response and raise a specific exception if we receive it.
  • The .expecting method specifies the type of response we are expecting (otherwise we get a TypecheckException).
  • The .parallel method executes the broadcast in parallel and yields the collection of responses.
  • The final .map uses .combineAll to compose all of the responses together.

One benefit of using these methods and particularly expecting here is that it could be the case that the Response type is not a monoid so we wouldn't be able to use combineAll directly. Here the mapping and type checking of the responses results in a collection of Pong which is a monoid, so we can use combineAll immediately. All of this is accomplished in a single, declarative chain of methods.

I tried many different approaches to setting this up and all of them felt somewhat opinionated and with different pros and cons. I ended up with this one, which is not perfect, but quite convenient at least for the purposes described here.

All feedback welcome!

@PetrosPapapa PetrosPapapa force-pushed the feature/Broadcast branch 2 times, most recently from b9942dc to 3c5ea47 Compare October 17, 2024 21:48
PetrosPapapa and others added 2 commits October 30, 2024 08:10
* fix: Propagate error to sender when asking

fixes suprnation#25

* refactor: Cleanup AskSpec and scalafmt

* refactor: ActorCell$.processMailbox use fold instead of match
@cloudmark
Copy link
Collaborator

We created a single commit to go for RC6 release.

# for free to join this conversation on GitHub. Already have an account? # to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants