Skip to content
This repository was archived by the owner on Mar 2, 2022. It is now read-only.

Commit b5b0fcc

Browse files
committed
Fixing #18
1 parent 409a959 commit b5b0fcc

File tree

3 files changed

+24
-11
lines changed

3 files changed

+24
-11
lines changed

src/main/scala/reactor/core/scala/Scannable.scala

+10-3
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import scala.language.implicitConversions
1010
* Created by winarto on 17/6/17.
1111
*/
1212
trait Scannable {
13-
def jScannable: JScannable
13+
private[scala] def jScannable: JScannable
1414

1515
def actuals(): Stream[_ <: Scannable] = jScannable.actuals().iterator().asScala.map(js => js: Scannable).toStream
1616

@@ -91,8 +91,15 @@ trait Scannable {
9191
}
9292

9393
object Scannable {
94-
def from(any: AnyRef): Scannable = new Scannable {
95-
override def jScannable: JScannable = JScannable.from(any)
94+
def from(any: Option[AnyRef]): Scannable = new Scannable {
95+
override def jScannable: JScannable = {
96+
any match {
97+
case None => JScannable.from(None.orNull)
98+
case Some(s: Scannable) => JScannable.from(s.jScannable)
99+
case Some(js: JScannable) => JScannable.from(js)
100+
case Some(other) => JScannable.from(other)
101+
}
102+
}
96103
}
97104

98105
implicit def JScannable2Scannable(js: JScannable): Scannable = new Scannable {

src/main/scala/reactor/core/scala/publisher/Flux.scala

+8-3
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@ import java.util.logging.Level
88
import java.util.{Comparator, stream, List => JList}
99

1010
import org.reactivestreams.{Publisher, Subscriber, Subscription}
11-
import reactor.core.Disposable
11+
import reactor.core
12+
import reactor.core.{Disposable, Scannable => JScannable}
1213
import reactor.core.publisher.FluxSink.OverflowStrategy
1314
import reactor.core.publisher.{BufferOverflowStrategy, FluxSink, Signal, SignalType, SynchronousSink, Flux => JFlux, GroupedFlux => JGroupedFlux}
15+
import reactor.core.scala.Scannable
1416
import reactor.core.scheduler.{Scheduler, Schedulers}
1517
import reactor.util.Logger
1618
import reactor.util.context.Context
@@ -43,7 +45,10 @@ import scala.concurrent.duration.Duration
4345
* @see [[Mono]]
4446
*/
4547
class Flux[T] private[publisher](private[publisher] val jFlux: JFlux[T])
46-
extends Publisher[T] with MapablePublisher[T] with OnErrorReturn[T] with FluxLike[T] with Filter [T] {
48+
extends Publisher[T] with MapablePublisher[T] with OnErrorReturn[T] with FluxLike[T] with Filter [T] with Scannable {
49+
50+
override def jScannable: JScannable = JScannable.from(jFlux)
51+
4752
override def subscribe(s: Subscriber[_ >: T]): Unit = jFlux.subscribe(s)
4853

4954
/**
@@ -2104,7 +2109,7 @@ class Flux[T] private[publisher](private[publisher] val jFlux: JFlux[T])
21042109
* @param name a name for the sequence
21052110
* @return the same sequence, but bearing a name
21062111
*/
2107-
final def name(name: String) = Flux(jFlux.name(name))
2112+
final def name(name: String): Flux[T] = Flux(jFlux.name(name))
21082113

21092114
/**
21102115
* Emit only the first item emitted by this [[Flux]].

src/test/scala/reactor/core/scala/publisher/FluxTest.scala

+6-5
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import org.reactivestreams.{Publisher, Subscription}
1313
import org.scalatest.prop.TableDrivenPropertyChecks
1414
import org.scalatest.{FreeSpec, Matchers}
1515
import reactor.core.publisher.{Flux => JFlux, _}
16+
import reactor.core.scala.Scannable
1617
import reactor.core.scheduler.Schedulers
1718
import reactor.test.StepVerifier
1819
import reactor.test.scheduler.VirtualTimeScheduler
@@ -1574,10 +1575,10 @@ class FluxTest extends FreeSpec with Matchers with TableDrivenPropertyChecks wit
15741575
}
15751576

15761577
".name should call the underlying Flux.name method" in {
1577-
val jFlux = spy(JFlux.just(1, 2, 3))
1578-
val flux = Flux(jFlux)
1579-
flux.name("flux-integer")
1580-
verify(jFlux).name("flux-integer")
1578+
val name = "flux integer"
1579+
val flux = Flux.just(1, 2, 3, 4).name(name)
1580+
val scannable: Scannable = Scannable.from(Option(flux))
1581+
scannable.name shouldBe name
15811582
}
15821583

15831584
".ofType should filter the value emitted by this flux according to the class" in {
@@ -1810,7 +1811,7 @@ class FluxTest extends FreeSpec with Matchers with TableDrivenPropertyChecks wit
18101811
.verifyComplete()
18111812
}
18121813
"with initial value should scan with provided initial value" in {
1813-
val flux = Flux.just(1, 2, 3, 4).scan[Int](2, { (a, b) => a * b })
1814+
val flux = Flux.just[Int](1, 2, 3, 4).scan[Int](2, { (a: Int, b: Int) => a * b })
18141815
StepVerifier.create(flux)
18151816
.expectNext(2, 2, 4, 12, 48)
18161817
.verifyComplete()

0 commit comments

Comments
 (0)