Skip to content
This repository was archived by the owner on Mar 2, 2022. It is now read-only.

Commit 43e90ef

Browse files
committed
Added zipWithTimeSinceSubscription, #9
1 parent 21eddc5 commit 43e90ef

File tree

2 files changed

+16
-2
lines changed

2 files changed

+16
-2
lines changed

src/main/scala/reactor/core/scala/publisher/Flux.scala

+9-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package reactor.core.scala.publisher
22

33
import java.lang.{Boolean => JBoolean, Iterable => JIterable, Long => JLong}
44
import java.util
5-
import java.util.concurrent.Callable
5+
import java.util.concurrent.{Callable, TimeUnit}
66
import java.util.function.{BiFunction, Consumer, Function, Supplier}
77
import java.util.logging.Level
88
import java.util.{Comparator, stream, List => JList}
@@ -11,7 +11,7 @@ import org.reactivestreams.{Publisher, Subscriber, Subscription}
1111
import reactor.core.Disposable
1212
import reactor.core.publisher.FluxSink.OverflowStrategy
1313
import reactor.core.publisher.{BufferOverflowStrategy, FluxSink, Signal, SignalType, SynchronousSink, Flux => JFlux, GroupedFlux => JGroupedFlux}
14-
import reactor.core.scheduler.Scheduler
14+
import reactor.core.scheduler.{Scheduler, Schedulers}
1515
import reactor.util.Logger
1616
import reactor.util.context.Context
1717
import reactor.util.function.Tuple2
@@ -4065,6 +4065,13 @@ class Flux[T] private[publisher](private[publisher] val jFlux: JFlux[T]) extends
40654065
*/
40664066
final def zipWithIterable[T2, V](iterable: Iterable[_ <: T2], zipper: (T, T2) => _ <: V) = Flux(jFlux.zipWithIterable[T2, V](iterable, zipper))
40674067

4068+
final def zipWithTimeSinceSubscribe(): Flux[(T, Long)] = {
4069+
val scheduler = Schedulers.single()
4070+
var subscriptionTime: Long = 0
4071+
doOnSubscribe(_ => subscriptionTime = scheduler.now(TimeUnit.MILLISECONDS))
4072+
.map(t => (t, scheduler.now(TimeUnit.MILLISECONDS) - subscriptionTime))
4073+
}
4074+
40684075
final def asJava(): JFlux[T] = jFlux
40694076
}
40704077

src/test/scala/reactor/core/scala/publisher/FluxTest.scala

+7
Original file line numberDiff line numberDiff line change
@@ -2120,6 +2120,13 @@ class FluxTest extends FreeSpec with Matchers with TableDrivenPropertyChecks wit
21202120
}
21212121
}
21222122

2123+
".zipWithTimeSinceSubscribe should emit tuple2 with the second element as the time taken to emit since subscription in milliseconds" in {
2124+
StepVerifier.withVirtualTime(() => Flux.just(1, 2, 3).delayElements(1 second).zipWithTimeSinceSubscribe())
2125+
.thenAwait(3 seconds)
2126+
.expectNext((1, 1000l), (2, 2000l), (3, 3000l))
2127+
.verifyComplete()
2128+
}
2129+
21232130
".asJava should convert to java" in {
21242131
val flux = Flux.just(1, 2, 3).asJava()
21252132
flux shouldBe a[reactor.core.publisher.Flux[_]]

0 commit comments

Comments
 (0)