-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathFutureAsyncDoFn.scala
43 lines (33 loc) · 1.41 KB
/
FutureAsyncDoFn.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package example
import java.util.function
import com.spotify.scio.transforms.BaseAsyncDoFn
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.collection.JavaConverters._
/**
 * A [[BaseAsyncDoFn]] whose asynchronous results are represented as Scala
 * [[scala.concurrent.Future]]s.
 *
 * Callbacks are scheduled on the implicit `ExecutionContext` that is in scope
 * for this file.
 *
 * @tparam InputT    first type argument, forwarded unchanged to `BaseAsyncDoFn`
 * @tparam OutputT   type of the value each future produces
 * @tparam ResourceT third type argument, forwarded unchanged to `BaseAsyncDoFn`
 */
abstract class FutureAsyncDoFn[InputT, OutputT, ResourceT]
    extends BaseAsyncDoFn[InputT, OutputT, ResourceT, Future[OutputT]] {

  /**
   * Blocks the calling thread until every future in `futures` has completed
   * (successfully or not), or until 60 seconds in total have elapsed.
   *
   * Failures of individual futures are not rethrown here; they are only
   * observed through the callbacks registered with `addCallback`.
   *
   * @param futures the futures to wait for
   * @throws java.util.concurrent.TimeoutException if a future is still pending
   *         once the 60 second budget is used up
   * @throws java.lang.InterruptedException if the waiting thread is interrupted
   */
  override def waitForFutures(
      futures: java.lang.Iterable[Future[OutputT]]): Unit = {
    // One deadline shared by all futures keeps the overall wait bounded by
    // 60 seconds, no matter how many futures are pending.
    val deadline = 60.seconds.fromNow
    // Each future is awaited on its own: `Future.sequence` fails as soon as it
    // reaches a failed element, which would let this method return while later
    // futures are still running. `Await.ready` returns for failed futures too.
    futures.asScala.foreach { pending =>
      Await.ready(pending, deadline.timeLeft)
    }
  }

  /**
   * Attaches completion callbacks to `future`.
   *
   * The returned future completes with the same value or the same error as
   * `future`, but only after the matching callback has finished, so waiting on
   * the returned future also waits for the callback. If a callback throws, the
   * returned future fails with that exception.
   *
   * NOTE(review): presumably the futures later passed to `waitForFutures` are
   * the ones returned from here - confirm against `BaseAsyncDoFn`.
   *
   * @param future    the asynchronous computation to observe
   * @param onSuccess invoked with the value when `future` succeeds
   * @param onFailure invoked with the error when `future` fails
   * @return a future that mirrors `future` and completes after the callback ran
   */
  override def addCallback(
      future: Future[OutputT],
      onSuccess: function.Function[OutputT, Void],
      onFailure: function.Function[Throwable, Void]): Future[OutputT] =
    // The two-function `transform` is used instead of `Future.onSuccess` /
    // `Future.onFailure`, which are deprecated in Scala 2.12 and gone in 2.13.
    future.transform(
      (value: OutputT) => {
        onSuccess.apply(value)
        value
      },
      (error: Throwable) => {
        onFailure.apply(error)
        error
      }
    )
}