Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

collect methods for ReceiveChannel. #69

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions coroutines-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1184,6 +1184,35 @@ fun main(args: Array<String>) = runBlocking<Unit> {
Done!
-->

Instead of a `for` loop you can also use convenient extension functions, such as [collectList],
[collectMap], [collectSequence] or [collectSet],
which create a collection of elements received from `ReceiveChannel`:

```kotlin
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<Int>()
launch(CommonPool) {
for (x in 1..5) channel.send(x * x)
channel.close() // we're done sending
}

val list = channel.collectList()
list.forEach { println(it) }
println("Done!")
}
```

> You can get full code [here](kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-10.kt)

<!--- TEST
1
4
9
16
25
Done!
-->

### Building channel producers

The pattern where a coroutine is producing a sequence of elements is quite common.
Expand Down Expand Up @@ -2219,6 +2248,10 @@ Channel was closed
[SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/close.html
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
[consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/consume-each.html
[collectList]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/collect-list.html
[collectMap]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/collect-map.html
[collectSequence]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/collect-sequence.html
[collectSet]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/collect-set.html
[Channel.invoke]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel/invoke.html
[actor]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/actor.html
<!--- INDEX kotlinx.coroutines.experimental.selects -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,42 @@ public suspend fun <E> BroadcastChannel<E>.consumeEach(action: suspend (E) -> Un
for (x in channel) action(x)
}
}

/**
* Collects all received elements into a [List].
*/
public suspend fun <E> ReceiveChannel<E>.collectList(): List<E> {
val list = mutableListOf<E>()

consumeEach { list += it }

return list
}

/**
* Collects all received elements into a [Map] using specified [keyExtractor] to extract key from element.
*/
public suspend fun <K, E> ReceiveChannel<E>.collectMap(keyExtractor: (E) -> K): Map<K, E> {
val map = mutableMapOf<K, E>()

consumeEach { map += keyExtractor(it) to it }

return map
}

/**
* Collects all received elements into a [Sequence].
*/
public suspend fun <E> ReceiveChannel<E>.collectSequence(): Sequence<E> = collectList().asSequence()

/**
* Collects all received elements into a [Set].
*/
public suspend fun <E> ReceiveChannel<E>.collectSet(): Set<E> {
val set = mutableSetOf<E>()

consumeEach { set += it }

return set
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2016-2017 JetBrains s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
package guide.channel.example10

import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*

fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<Int>()
launch(CommonPool) {
for (x in 1..5) channel.send(x * x)
channel.close() // we're done sending
}

val list = channel.collectList()
list.forEach { println(it) }
println("Done!")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright 2016-2017 JetBrains s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kotlinx.coroutines.experimental.channels

import kotlinx.coroutines.experimental.runBlocking
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.Test

class ChannelsTest {
@Test
fun testEmptyList() = runBlocking {
val channel = Channel<String>()
channel.close()

assertTrue(channel.collectList().isEmpty())
}

@Test
fun testCollectList() = runBlocking {
val values = listOf("A", "B", "F")
val channel = Channel<String>(values.size)
values.forEach {
channel.send(it)
}
channel.close()

assertEquals(channel.collectList(), values)
}

@Test
fun testEmptySet() = runBlocking {
val channel = Channel<String>()
channel.close()

assertTrue(channel.collectSet().isEmpty())
}

@Test
fun testCollectSet() = runBlocking {
val values = setOf("A", "B", "F")
val channel = Channel<String>(values.size)
values.forEach {
channel.send(it)
}
channel.close()

assertEquals(channel.collectSet(), values)
}

@Test
fun testEmptySequence() = runBlocking {
val channel = Channel<String>()
channel.close()

assertTrue(channel.collectSequence().count() == 0)
}

@Test
fun testCollectSequence() = runBlocking {
val values = listOf("A", "B", "F")
val channel = Channel<String>(values.size)
values.forEach {
channel.send(it)
}
channel.close()

assertEquals(channel.collectSequence().toList(), values.toList())
}

@Test
fun testEmptyMap() = runBlocking {
val channel = Channel<String>()
channel.close()

assertTrue(channel.collectMap { it }.isEmpty())
}

@Test
fun testCollectMap() = runBlocking {
val values = mapOf("A" to 1, "B" to 2, "F" to 3)
val channel = Channel<Pair<String, Int>>(values.size)
values.entries.forEach {
channel.send(it.key to it.value)
}
channel.close()

val expected = values.mapValues { (k, v) -> k to v }

assertEquals(expected, channel.collectMap { it.first })
}

}