Skip to content
This repository was archived by the owner on Mar 2, 2022. It is now read-only.

Commit 3c07450

Browse files
committed
Added SFlux.name
This will also fix bug #18
1 parent 7683920 commit 3c07450

File tree

3 files changed

+29
-6
lines changed

3 files changed

+29
-6
lines changed

src/main/scala/reactor/core/scala/Scannable.scala

+10-2
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,16 @@ trait Scannable {
9191
}
9292

9393
object Scannable {
94-
def from(any: AnyRef): Scannable = new Scannable {
95-
override def jScannable: JScannable = JScannable.from(any)
94+
// def from(any: AnyRef): Scannable = new Scannable {
95+
// override def jScannable: JScannable = JScannable.from(any)
96+
// }
97+
98+
def from(any: Option[AnyRef]): Scannable = {
99+
any match {
100+
case None => JScannable.from(None.orNull)
101+
case Some(s: Scannable) => s.jScannable
102+
case _ => JScannable.from(new Object())
103+
}
96104
}
97105

98106
implicit def JScannable2Scannable(js: JScannable): Scannable = new Scannable {

src/main/scala/reactor/core/scala/publisher/SFlux.scala

+10-3
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@ import java.util.function.{BiFunction, Function, Supplier}
77
import java.util.{Collection => JCollection, List => JList, Map => JMap}
88

99
import org.reactivestreams.{Publisher, Subscriber, Subscription}
10-
import reactor.core.Disposable
10+
import reactor.core.{Disposable, Scannable => JScannable}
1111
import reactor.core.publisher.FluxSink.OverflowStrategy
1212
import reactor.core.publisher.{FluxSink, Signal, SignalType, SynchronousSink, Flux => JFlux, GroupedFlux => JGroupedFlux}
13+
import reactor.core.scala.Scannable
1314
import reactor.core.scala.publisher.PimpMyPublisher._
1415
import reactor.core.scheduler.{Scheduler, Schedulers}
1516
import reactor.util.concurrent.Queues.{SMALL_BUFFER_SIZE, XS_BUFFER_SIZE}
@@ -223,6 +224,8 @@ trait SFlux[T] extends SFluxLike[T, SFlux] with MapablePublisher[T] {
223224

224225
final def mergeWith(other: Publisher[_ <: T]): SFlux[T] = coreFlux.mergeWith(other)
225226

227+
final def name(name: String): SFlux[T] = coreFlux.name(name)
228+
226229
final def nonEmpty: SMono[Boolean] = hasElements
227230

228231
final def onErrorMap(mapper: Throwable => _ <: Throwable): SFlux[T] = coreFlux.onErrorMap(mapper)
@@ -348,6 +351,10 @@ object SFlux {
348351
new ReactiveSFlux[O](JFlux.zip[I, O](combinator, prefetch, sources: _*))
349352
}
350353

351-
private[publisher] class ReactiveSFlux[T](publisher: Publisher[T]) extends SFlux[T] {
352-
override private[publisher] def coreFlux: JFlux[T] = JFlux.from(publisher)
354+
private[publisher] class ReactiveSFlux[T](publisher: Publisher[T]) extends SFlux[T] with Scannable {
355+
override private[publisher] val coreFlux: JFlux[T] = JFlux.from(publisher)
356+
357+
override def scanUnsafe(key: JScannable.Attr[_]): Option[AnyRef] = Option(jScannable.scanUnsafe(key))
358+
359+
override val jScannable: JScannable = JScannable.from(coreFlux)
353360
}

src/test/scala/reactor/core/scala/publisher/SFluxTest.scala

+9-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ import java.util.function.Predicate
1010
import org.reactivestreams.Subscription
1111
import org.scalatest.prop.TableDrivenPropertyChecks
1212
import org.scalatest.{FreeSpec, Matchers}
13-
import reactor.core.publisher.{Flux, _}
13+
import reactor.core.publisher._
14+
import reactor.core.scala.Scannable
1415
import reactor.core.scheduler.Schedulers
1516
import reactor.test.StepVerifier
1617
import reactor.test.scheduler.VirtualTimeScheduler
@@ -1363,6 +1364,13 @@ class SFluxTest extends FreeSpec with Matchers with TableDrivenPropertyChecks {
13631364
}
13641365
}
13651366

1367+
".name should call the underlying Flux.name method" in {
1368+
val name = "one two three four"
1369+
val flux = SFlux.just(1, 2, 3, 4).name(name)
1370+
val scannable: Scannable = Scannable.from(Option(flux))
1371+
scannable.name shouldBe name
1372+
}
1373+
13661374
".nonEmpty should return true if this flux has at least one element" in {
13671375
StepVerifier.create(SFlux.just(1, 2, 3).nonEmpty)
13681376
.expectNext(true)

0 commit comments

Comments
 (0)