Skip to content

Commit

Permalink
Introduced LifecycleExtensions overloads to set observation schedulers
Browse files Browse the repository at this point in the history
  • Loading branch information
LachlanMcKee committed Feb 8, 2023
1 parent 97b3a7c commit f6fddd4
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 4 deletions.
22 changes: 22 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,28 @@

#### Additions

([#179](https://github.com/badoo/MVICore/pull/179)):
Introduced the ability to specify observation scheduler within the `Binder` class (see the `observeOn` DSL below), as well as the `observeOn` infix operator for `Connection` class (related to `Binder`).

An example of how you might use this is as follows:

```kotlin
testLifecycleOwner.lifecycle.createDestroy {
observeOn(mainScheduler) {
bind(events to uiConsumer1)
bind(events to uiConsumer2)
}
observeOn(backgroundScheduler) {
bind(events to backgroundConsumer1)
bind(events to backgroundConsumer2)
}
bind(events to uiConsumer3 observeOn mainScheduler)
bind(events to backgroundConsumer3 observeOn backgroundScheduler)
}
```

See more details in [advanced binder](../binder/binder-advanced/#setting-connections-observation-scheduler) section.

([#177](https://github.com/badoo/MVICore/pull/177)):
Updated AndroidX appcompat to 1.4.1 and lifecycle to 2.5.1. Also updated Compile and Target SDK to API 33.

Expand Down
39 changes: 37 additions & 2 deletions binder/src/main/java/com/badoo/binder/Binder.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import com.badoo.binder.middleware.wrapWithMiddleware
import io.reactivex.Observable
import io.reactivex.Observable.wrap
import io.reactivex.ObservableSource
import io.reactivex.Scheduler
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.disposables.Disposable
import io.reactivex.functions.Consumer
import io.reactivex.rxkotlin.plusAssign

class Binder(
private val lifecycle: Lifecycle? = null
private val lifecycle: Lifecycle? = null,
) : Disposable {
private val disposables = CompositeDisposable()
private val connections = mutableListOf<Pair<Connection<*, *>, Middleware<*, *>?>>()
Expand Down Expand Up @@ -85,12 +86,14 @@ class Binder(
middleware.onBind(connection)

this
.optionalObserveOn(connection.scheduler)
.doOnNext { middleware.onElement(connection, it) }
.doFinally { middleware.onComplete(connection) }
.subscribe(middleware)

} else {
subscribe(connection.to)
optionalObserveOn(connection.scheduler)
.subscribe(connection.to)
}
}

Expand Down Expand Up @@ -142,4 +145,36 @@ class Binder(
fun clear() {
disposables.clear()
}

private fun <T> Observable<T>.optionalObserveOn(scheduler: Scheduler?) =
if (scheduler != null) {
observeOn(scheduler)
} else {
this
}

fun observeOn(scheduler: Scheduler, f: BinderSchedulerWrapper.() -> Unit) {
BinderSchedulerWrapper(this, scheduler).apply(f)
}

class BinderSchedulerWrapper(private val binder: Binder, private val scheduler: Scheduler?) {
fun <T : Any> bind(connection: Pair<ObservableSource<out T>, Consumer<in T>>) {
binder.bind(
Connection(
from = connection.first,
to = connection.second,
connector = null,
scheduler = scheduler
)
)
}

fun <Out : Any, In : Any> bind(connection: Connection<Out, In>) {
if (scheduler != null) {
binder.bind(connection.observeOn(scheduler))
} else {
binder.bind(connection)
}
}
}
}
16 changes: 15 additions & 1 deletion binder/src/main/java/com/badoo/binder/Connection.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package com.badoo.binder
import com.badoo.binder.connector.Connector
import com.badoo.binder.connector.NotNullConnector
import io.reactivex.ObservableSource
import io.reactivex.Scheduler
import io.reactivex.functions.Consumer

data class Connection<Out, In>(
val from: ObservableSource<out Out>? = null,
val to: Consumer<in In>,
val connector: Connector<Out, In>? = null,
val name: String? = null
val name: String? = null,
val scheduler: Scheduler? = null,
) {
companion object {
private const val ANONYMOUS: String = "anonymous"
Expand Down Expand Up @@ -39,7 +41,19 @@ infix fun <T> Pair<ObservableSource<out T>, Consumer<in T>>.named(name: String):
name = name
)

infix fun <T> Pair<ObservableSource<out T>, Consumer<in T>>.observeOn(scheduler: Scheduler): Connection<T, T> =
Connection(
from = first,
to = second,
scheduler = scheduler
)

infix fun <Out, In> Connection<Out, In>.named(name: String) =
copy(
name = name
)

infix fun <Out, In> Connection<Out, In>.observeOn(scheduler: Scheduler) =
copy(
scheduler = scheduler
)
19 changes: 18 additions & 1 deletion documentation/binder/binder-advanced.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ binder.bind(output to input using OutputToInput)

## Naming connections

And you can optionally give names to any connection:
You can optionally give names to any connection:
```kotlin
binder.bind(input to output named "MyConnection")
// or
Expand All @@ -49,3 +49,20 @@ Naming a connection signals that it's important to you. This will make more sens
- You'll see connections with their respective names in the time-travel debug menu
- You'll see connection names in logs if you use LoggingMiddleware
- You can opt to dynamically add `Middlewares` only to named connections (if that's what you want)

## Setting connections observation scheduler

You can optionally set the observation scheduler for any connection:
```kotlin
binder.bind(input to output observeOn scheduler)
```

You can also use `Binder.observeOn` to reduce repetition:
```kotlin
binder.observeOn(scheduler) {
bind(input1 to output1)
bind(input2 to output2)
}
```

Specifying an observation scheduler ensures that the output is called on the specified scheduler.
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,30 @@ package com.badoo.mvicore.android.lifecycle
import androidx.arch.core.executor.ArchTaskExecutor
import androidx.arch.core.executor.TaskExecutor
import androidx.lifecycle.Lifecycle
import com.badoo.binder.observeOn
import io.reactivex.functions.Consumer
import io.reactivex.internal.schedulers.RxThreadFactory
import io.reactivex.plugins.RxJavaPlugins
import io.reactivex.subjects.PublishSubject
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.util.concurrent.CountDownLatch
import kotlin.test.assertEquals

class LifecycleExtensionsTest {
private val subject = PublishSubject.create<Unit>()
private val consumerTester = ConsumerTester()
private val testLifecycleOwner = TestLifecycleOwner()

private val mainScheduler = RxJavaPlugins
.createSingleScheduler(RxThreadFactory("main", Thread.NORM_PRIORITY, false))
.apply { start() }

private val backgroundScheduler = RxJavaPlugins
.createSingleScheduler(RxThreadFactory("background", Thread.NORM_PRIORITY, false))
.apply { start() }

@Before
fun setup() {
ArchTaskExecutor.getInstance()
Expand Down Expand Up @@ -225,11 +237,89 @@ class LifecycleExtensionsTest {
assertEquals(false, subject.hasObservers())
}

@Test
fun `GIVEN initial lifecycle is created AND createDestroy with observe on schedulers dsl WHEN event emitted THEN verify correct thread called`() {
val testThreadName = Thread.currentThread().name

val subject = PublishSubject.create<Unit>()
val mainThreadConsumerTester = ConsumerTester()
val backgroundThreadConsumerTester = ConsumerTester()
val unconfinedThreadConsumerTester = ConsumerTester()

val countDownLatch = CountDownLatch(3)

testLifecycleOwner.lifecycle.createDestroy {
observeOn(mainScheduler) {
bind(subject to Consumer {
mainThreadConsumerTester.accept(Unit)
countDownLatch.countDown()
})
}
observeOn(backgroundScheduler) {
bind(subject to Consumer {
backgroundThreadConsumerTester.accept(Unit)
countDownLatch.countDown()
})
}
bind(subject to Consumer {
unconfinedThreadConsumerTester.accept(Unit)
countDownLatch.countDown()
})
}
testLifecycleOwner.state = Lifecycle.State.CREATED

subject.onNext(Unit)

countDownLatch.await()

mainThreadConsumerTester.verifyThreadName("main")
backgroundThreadConsumerTester.verifyThreadName("background")
unconfinedThreadConsumerTester.verifyThreadName(testThreadName)
}

@Test
fun `GIVEN initial lifecycle is created AND createDestroy with schedulers infix WHEN event emitted THEN verify correct thread called`() {
val testThreadName = Thread.currentThread().name

val subject = PublishSubject.create<Unit>()
val mainThreadConsumerTester = ConsumerTester()
val backgroundThreadConsumerTester = ConsumerTester()
val unconfinedThreadConsumerTester = ConsumerTester()

val countDownLatch = CountDownLatch(3)

testLifecycleOwner.lifecycle.createDestroy {
bind(subject to Consumer<Unit> {
mainThreadConsumerTester.accept(Unit)
countDownLatch.countDown()
} observeOn mainScheduler)
bind(subject to Consumer<Unit> {
backgroundThreadConsumerTester.accept(Unit)
countDownLatch.countDown()
} observeOn backgroundScheduler)
bind(subject to Consumer {
unconfinedThreadConsumerTester.accept(Unit)
countDownLatch.countDown()
})
}
testLifecycleOwner.state = Lifecycle.State.CREATED

subject.onNext(Unit)

countDownLatch.await()

mainThreadConsumerTester.verifyThreadName("main")
backgroundThreadConsumerTester.verifyThreadName("background")
unconfinedThreadConsumerTester.verifyThreadName(testThreadName)
}

private class ConsumerTester : Consumer<Unit> {
private var wasCalled: Boolean = false
lateinit var threadName: String

override fun accept(t: Unit?) {
wasCalled = true
threadName = Thread.currentThread().name
}

fun verifyInvoked() {
Expand All @@ -239,5 +329,9 @@ class LifecycleExtensionsTest {
fun verifyNotInvoked() {
assertEquals(false, wasCalled)
}

fun verifyThreadName(name: String) {
assertEquals(true, threadName.startsWith(name))
}
}
}

0 comments on commit f6fddd4

Please # to comment.