AkkaStream & PekkoStream usage can lead to threads blocked/waiting #627

Open
gaeljw opened this issue Nov 18, 2024 · 4 comments

Comments

@gaeljw
Member

gaeljw commented Nov 18, 2024

Context

We use Anorm with PekkoStream (previously AkkaStream) in a Play Framework application. Our usage is pretty standard, except that the queries we run through Anorm go to a database that can take several seconds (sometimes more) before it starts returning results.

In this situation, we noticed that all "default" threads can become blocked, so the Play application becomes unresponsive to any request (including a healthcheck request that is completely unrelated and just answers OK without doing anything else).

Details

Our usage

Just to highlight that we're not doing anything custom, minimized code:

val source: Source[OurDataModel, NotUsed] = PekkoStream.source(SQL("SELECT * FROM ..."), rowParser)
val jsonSource: Source[ByteString, NotUsed] = source.map(row => transformRowToJson(row))
Ok.chunked(jsonSource) // Play Results

Observations

A thread dump taken while the application is unresponsive shows threads waiting (hanging) in the preStart of the stream:

....
at app//anorm.Cursor$.apply(Cursor.scala:31)
at app//anorm.Sql$.unsafeCursor(Anorm.scala:248)
at app//anorm.PekkoStream$ResultSource$$anon$1.nextCursor(PekkoStream.scala:149)
at app//anorm.PekkoStream$ResultSource$$anon$1.preStart(PekkoStream.scala:126)
at app//org.apache.pekko.stream.impl.fusing.GraphInterpreter.init(GraphInterpreter.scala:317)
at app//org.apache.pekko.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:631)
...
at app//org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:729)
at app//org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
at app//org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
at app//org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:272)
at app//org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:233)
at app//org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:245)
at java.base@21.0.5/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
at java.base@21.0.5/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1312)
at java.base@21.0.5/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1843)
at java.base@21.0.5/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1808)
at java.base@21.0.5/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)

Relevant code in Anorm:

override def preStart(): Unit = {
  try {
    resultSet = sql.unsafeResultSet(connection)
    nextCursor()
  } catch {
    case NonFatal(cause) => failWith(cause)
  }
}

Workaround

We've been able to fix the issue on our side by explicitly wrapping the preStart code in a scala.concurrent.blocking block.

override def preStart(): Unit = {
+  blocking {
    try {
      resultSet = sql.unsafeResultSet(connection)
      nextCursor()
    } catch {
      case NonFatal(cause) => failWith(cause)
    }
+  }
}

(This requires that we "fork" this class in our own code.)

As a consequence, the thread pool of the Pekko actors can grow and still accept non-blocking work. The threads waiting on the database no longer block other requests.
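
To illustrate the effect described above outside of Pekko and Anorm, here is a minimal sketch using only the Scala standard library. It assumes ExecutionContext.global as a stand-in for a fork-join based dispatcher, and a CountDownLatch as a stand-in for a slow JDBC call; the object and method names are hypothetical. More slow calls than the pool's default parallelism are started inside blocking, and an unrelated "health" task is still expected to complete while they wait.

```scala
import java.util.concurrent.CountDownLatch
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._

// Hypothetical demo, not Anorm or Pekko code.
object BlockingKeepsPoolResponsive {
  def healthAnswered(): Boolean = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    // One more slow call than the default parallelism of the global pool.
    val slowCalls = Runtime.getRuntime.availableProcessors() + 1
    // Stand-in for a database that has not started answering yet.
    val release = new CountDownLatch(1)
    try {
      // Each task waits inside `blocking`, which signals the pool that
      // the current thread is about to block.
      val slow = (1 to slowCalls).map(_ => Future(blocking(release.await())))
      // Unrelated work submitted while the slow calls are still waiting.
      val health = Future("OK")
      val ok = Await.result(health, 10.seconds) == "OK"
      release.countDown()
      slow.foreach(f => Await.result(f, 10.seconds))
      ok
    } finally release.countDown() // never leave pool threads stuck
  }
}
```

If the blocking wrapper is removed from the slow tasks, the health task would be expected to stay queued behind them until the latch is released, which mirrors the starvation seen in the thread dump.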

Now the question is: should this be the default behavior? As Anorm and JDBC drivers are blocking by default, shouldn't preStart be robust enough that, if it blocks, it doesn't affect the Pekko actor threads? Would there be any downside to wrapping this code in blocking even if it isn't actually blocking?

If it makes sense to you, I'll be happy to open the PR :)

@gaeljw
Member Author

gaeljw commented Nov 18, 2024

Obviously, if you see a better way of fixing this, please say so :)

I'm not a big fan of blocking { ... } as it relies on the underlying thread pool supporting this capability. On the other hand, we are inside the Pekko layers and I don't see how I could provide my own ExecutionContext (and only for the preStart phase).
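
The concern about pool support can be sketched with the standard library alone. This example assumes a plain fixed-size java.util.concurrent pool wrapped as an ExecutionContext, which does not react to blocking; names are hypothetical. With both threads waiting inside blocking, the unrelated "health" task is not served until the slow calls are released.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeoutException}
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical demo, not Anorm or Pekko code.
object FixedPoolIgnoresBlocking {
  def healthStarved(): Boolean = {
    val pool = Executors.newFixedThreadPool(2)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    // Stand-in for a database that has not started answering yet.
    val release = new CountDownLatch(1)
    try {
      // Two slow calls occupy both threads; `blocking` has no effect here
      // because these threads do not install a BlockContext.
      val slow = (1 to 2).map(_ => Future(blocking(release.await())))
      val health = Future("OK")
      // The health task stays queued, so this short wait times out.
      val starved = Try(Await.result(health, 500.millis)).failed.toOption
        .exists(_.isInstanceOf[TimeoutException])
      release.countDown()
      slow.foreach(f => Await.result(f, 5.seconds))
      starved && Await.result(health, 5.seconds) == "OK"
    } finally {
      release.countDown()
      pool.shutdown()
    }
  }
}
```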

@cchantep
Member

With either Akka or Pekko, if you need the execution of blocking functions to be isolated, you need to use a separate dispatcher (not the main Play one).

@gaeljw
Member Author

gaeljw commented Nov 19, 2024

With either Akka or Pekko, if you need the execution of blocking functions to be isolated, you need to use a separate dispatcher (not the main Play one).

Sure but how could I do it in this scenario then?

I may be missing something obvious but the blocking code is out of my control. If nextCursor() blocks inside preStart, what can I do? It goes through HikariCP (optionally) and then the JDBC driver. I have no way to wrap this in a custom ExecutionContext/dispatcher.

As a client of the library (Anorm), I only give it a SQL query to execute as a stream over a Connection. Everything else is out of my control, isn't it?

@cchantep
Member

As a client of Akka or Pekko, you provide a Materializer, based on a dispatcher, which must be isolated.
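
One possible way to get such isolation without replacing the whole Materializer is a per-stream dispatcher attribute. This is a different technique from the one named above, sketched under assumptions: the dispatcher name anorm-blocking-dispatcher and its pool size are hypothetical, and a Thread.sleep inside map stands in for the blocking JDBC call. Whether the same attribute, applied to the Source returned by PekkoStream.source, moves preStart off the default dispatcher has not been confirmed in this thread.

```scala
import com.typesafe.config.ConfigFactory
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.ActorAttributes
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo; dispatcher name and sizes are assumptions.
object IsolatedDispatcherSketch {
  private val config = ConfigFactory.parseString(
    """
      |anorm-blocking-dispatcher {
      |  type = Dispatcher
      |  executor = "thread-pool-executor"
      |  thread-pool-executor { fixed-pool-size = 8 }
      |  throughput = 1
      |}
      |""".stripMargin
  ).withFallback(ConfigFactory.load())

  // Returns the name of the thread that ran the (simulated) blocking stage.
  def stageThreadName(): String = {
    implicit val system: ActorSystem = ActorSystem("demo", config)
    try {
      val blockingSource = Source.single(())
        .map { _ =>
          Thread.sleep(200) // stand-in for a slow JDBC call
          Thread.currentThread().getName
        }
        // Runs the stages above on the dedicated dispatcher.
        .withAttributes(ActorAttributes.dispatcher("anorm-blocking-dispatcher"))
      Await.result(blockingSource.runWith(Sink.head), 10.seconds)
    } finally {
      Await.ready(system.terminate(), 10.seconds)
    }
  }
}
```

In a Play application the dispatcher block would live in application.conf, and the attribute would be added to the source before passing it to Ok.chunked, since attributes travel with the graph rather than with the Materializer.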
