Skip to content

Commit 3758661

Browse files
authored
Improve the toCollection and related API documentation (#1677)
JAVA-5833
1 parent 8399271 commit 3758661

File tree

13 files changed

+217
-24
lines changed

13 files changed

+217
-24
lines changed

driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/AggregateFlow.kt

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,11 @@ public class AggregateFlow<T : Any>(private val wrapped: AggregatePublisher<T>)
6262
public fun timeoutMode(timeoutMode: TimeoutMode): AggregateFlow<T> = apply { wrapped.timeoutMode(timeoutMode) }
6363

6464
/**
65-
* Aggregates documents according to the specified aggregation pipeline, which must end with a $out or $merge stage.
65+
* Aggregates documents according to the specified aggregation pipeline, which must end with an `$out` or `$merge`
66+
* stage. Calling this method is the preferred alternative to consuming this [AggregateFlow], because this method
67+
* does what is explicitly requested without executing implicit operations.
6668
*
67-
* @throws IllegalStateException if the pipeline does not end with a $out or $merge stage
69+
* @throws IllegalStateException if the pipeline does not end with an `$out` or `$merge` stage
6870
* @see [$out stage](https://www.mongodb.com/docs/manual/reference/operator/aggregation/out/)
6971
* @see [$merge stage](https://www.mongodb.com/docs/manual/reference/operator/aggregation/merge/)
7072
*/
@@ -214,5 +216,11 @@ public class AggregateFlow<T : Any>(private val wrapped: AggregatePublisher<T>)
214216
public suspend inline fun <reified R : Any> explain(verbosity: ExplainVerbosity? = null): R =
215217
explain(R::class.java, verbosity)
216218

219+
/**
220+
* Requests [AggregateFlow] to start streaming data according to the specified aggregation pipeline.
221+
* - If the aggregation pipeline ends with an `$out` or `$merge` stage, then finds all documents in the affected
222+
* namespace and emits them. You may want to use [toCollection] instead.
223+
* - Otherwise, emits no values.
224+
*/
217225
public override suspend fun collect(collector: FlowCollector<T>): Unit = wrapped.asFlow().collect(collector)
218226
}

driver-kotlin-coroutine/src/main/kotlin/com/mongodb/kotlin/client/coroutine/MapReduceFlow.kt

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ import org.bson.conversions.Bson
3333
/**
3434
* Flow implementation for map reduce operations.
3535
*
36+
* By default, the [MapReduceFlow] emits the results inline. You can write map-reduce output to a collection by using
37+
* the [collectionName] and [toCollection] methods.
38+
*
3639
* Note: Starting in MongoDB 5.0, map-reduce is deprecated, prefer Aggregation instead
3740
*
3841
* @param T The type of the result.
@@ -65,9 +68,11 @@ public class MapReduceFlow<T : Any>(private val wrapped: MapReducePublisher<T>)
6568

6669
/**
6770
* Aggregates documents to a collection according to the specified map-reduce function with the given options, which
68-
* must specify a non-inline result.
71+
* must not emit results inline. Calling this method is the preferred alternative to consuming this [MapReduceFlow],
72+
* because this method does what is explicitly requested without executing implicit operations.
6973
*
7074
* @throws IllegalStateException if a collection name to write the results to has not been specified
75+
* @see collectionName
7176
*/
7277
public suspend fun toCollection() {
7378
wrapped.toCollection().awaitFirstOrNull()
@@ -80,6 +85,7 @@ public class MapReduceFlow<T : Any>(private val wrapped: MapReducePublisher<T>)
8085
*
8186
* @param collectionName the name of the collection that you want the map-reduce operation to write its output.
8287
* @return this
88+
* @see toCollection
8389
*/
8490
public fun collectionName(collectionName: String): MapReduceFlow<T> = apply {
8591
wrapped.collectionName(collectionName)
@@ -205,5 +211,12 @@ public class MapReduceFlow<T : Any>(private val wrapped: MapReducePublisher<T>)
205211
*/
206212
public fun collation(collation: Collation?): MapReduceFlow<T> = apply { wrapped.collation(collation) }
207213

214+
/**
215+
* Requests [MapReduceFlow] to start streaming data according to the specified map-reduce function with the given
216+
* options.
217+
* - If the aggregation produces results inline, then finds all documents in the affected namespace and emits them.
218+
* You may want to use [toCollection] instead.
219+
* - Otherwise, emits no values.
220+
*/
208221
public override suspend fun collect(collector: FlowCollector<T>): Unit = wrapped.asFlow().collect(collector)
209222
}

driver-kotlin-coroutine/src/test/kotlin/com/mongodb/kotlin/client/coroutine/AggregateFlowTest.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ class AggregateFlowTest {
3939

4040
@Test
4141
fun shouldHaveTheSameMethods() {
42-
val jAggregatePublisherFunctions = AggregatePublisher::class.declaredFunctions.map { it.name }.toSet() - "first"
42+
val jAggregatePublisherFunctions =
43+
AggregatePublisher::class.declaredFunctions.map { it.name }.toSet() - "first" - "subscribe"
4344
val kAggregateFlowFunctions = AggregateFlow::class.declaredFunctions.map { it.name }.toSet() - "collect"
4445

4546
assertEquals(jAggregatePublisherFunctions, kAggregateFlowFunctions)

driver-kotlin-coroutine/src/test/kotlin/com/mongodb/kotlin/client/coroutine/MapReduceFlowTest.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ import reactor.core.publisher.Mono
3838
class MapReduceFlowTest {
3939
@Test
4040
fun shouldHaveTheSameMethods() {
41-
val jMapReducePublisherFunctions = MapReducePublisher::class.declaredFunctions.map { it.name }.toSet() - "first"
41+
val jMapReducePublisherFunctions =
42+
MapReducePublisher::class.declaredFunctions.map { it.name }.toSet() - "first" - "subscribe"
4243
val kMapReduceFlowFunctions = MapReduceFlow::class.declaredFunctions.map { it.name }.toSet() - "collect"
4344

4445
assertEquals(jMapReducePublisherFunctions, kMapReduceFlowFunctions)

driver-kotlin-sync/src/main/kotlin/com/mongodb/kotlin/client/AggregateIterable.kt

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,24 @@ public class AggregateIterable<T : Any>(private val wrapped: JAggregateIterable<
6161
}
6262

6363
/**
64-
* Aggregates documents according to the specified aggregation pipeline, which must end with a $out or $merge stage.
64+
* Aggregates documents according to the specified aggregation pipeline, which must end with an `$out` or `$merge`
65+
* stage. This method is the preferred alternative to [cursor], because this method does what is explicitly
66+
* requested without executing implicit operations.
6567
*
66-
* @throws IllegalStateException if the pipeline does not end with a $out or $merge stage
68+
* @throws IllegalStateException if the pipeline does not end with an `$out` or `$merge` stage
6769
* @see [$out stage](https://www.mongodb.com/docs/manual/reference/operator/aggregation/out/)
6870
* @see [$merge stage](https://www.mongodb.com/docs/manual/reference/operator/aggregation/merge/)
6971
*/
7072
public fun toCollection(): Unit = wrapped.toCollection()
7173

74+
/**
75+
* Aggregates documents according to the specified aggregation pipeline.
76+
* - If the aggregation pipeline ends with an `$out` or `$merge` stage, then finds all documents in the affected
77+
* namespace and returns a [MongoCursor] over them. You may want to use [toCollection] instead.
78+
* - Otherwise, returns a [MongoCursor] producing no elements.
79+
*/
80+
public override fun cursor(): MongoCursor<T> = super.cursor()
81+
7282
/**
7383
* Enables writing to temporary files. A null value indicates that it's unspecified.
7484
*

driver-kotlin-sync/src/test/kotlin/com/mongodb/kotlin/client/AggregateIterableTest.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ class AggregateIterableTest {
3737

3838
@Test
3939
fun shouldHaveTheSameMethods() {
40-
val jAggregateIterableFunctions = JAggregateIterable::class.declaredFunctions.map { it.name }.toSet()
40+
val jAggregateIterableFunctions =
41+
JAggregateIterable::class.declaredFunctions.map { it.name }.toSet() - "iterator"
4142
val kAggregateIterableFunctions = AggregateIterable::class.declaredFunctions.map { it.name }.toSet()
4243

4344
assertEquals(jAggregateIterableFunctions, kAggregateIterableFunctions)

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/AggregatePublisher.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,19 @@
1717
package com.mongodb.reactivestreams.client;
1818

1919
import com.mongodb.ExplainVerbosity;
20+
import com.mongodb.MongoNamespace;
2021
import com.mongodb.annotations.Alpha;
2122
import com.mongodb.annotations.Reason;
2223
import com.mongodb.client.cursor.TimeoutMode;
24+
import com.mongodb.client.model.Aggregates;
2325
import com.mongodb.client.model.Collation;
26+
import com.mongodb.client.model.MergeOptions;
2427
import com.mongodb.lang.Nullable;
2528
import org.bson.BsonValue;
2629
import org.bson.Document;
2730
import org.bson.conversions.Bson;
2831
import org.reactivestreams.Publisher;
32+
import org.reactivestreams.Subscriber;
2933

3034
import java.util.concurrent.TimeUnit;
3135

@@ -83,13 +87,32 @@ public interface AggregatePublisher<TResult> extends Publisher<TResult> {
8387
AggregatePublisher<TResult> bypassDocumentValidation(@Nullable Boolean bypassDocumentValidation);
8488

8589
/**
86-
* Aggregates documents according to the specified aggregation pipeline, which must end with a $out stage.
90+
* Aggregates documents according to the specified aggregation pipeline, which must end with an
91+
* {@link Aggregates#out(String, String) $out} or {@link Aggregates#merge(MongoNamespace, MergeOptions) $merge} stage.
92+
* Calling this method and then {@linkplain Publisher#subscribe(Subscriber) subscribing} to the returned {@link Publisher}
93+
* is the preferred alternative to {@linkplain #subscribe(Subscriber) subscribing} to this {@link AggregatePublisher},
94+
* because this method does what is explicitly requested without executing implicit operations.
8795
*
96+
* @throws IllegalStateException if the pipeline does not end with an {@code $out} or {@code $merge} stage
8897
* @return an empty publisher that indicates when the operation has completed
8998
* @mongodb.driver.manual aggregation/ Aggregation
9099
*/
91100
Publisher<Void> toCollection();
92101

102+
/**
103+
* Requests {@link AggregatePublisher} to start streaming data according to the specified aggregation pipeline.
104+
* <ul>
105+
* <li>
106+
* If the aggregation pipeline ends with an {@link Aggregates#out(String, String) $out} or
107+
* {@link Aggregates#merge(MongoNamespace, MergeOptions) $merge} stage,
108+
* then {@linkplain MongoCollection#find() finds all} documents in the affected namespace and produces them.
109+
* You may want to use {@link #toCollection()} instead.</li>
110+
* <li>
111+
* Otherwise, produces no elements.</li>
112+
* </ul>
113+
*/
114+
void subscribe(Subscriber<? super TResult> s);
115+
93116
/**
94117
* Sets the collation options
95118
*

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/MapReducePublisher.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,22 @@
1616

1717
package com.mongodb.reactivestreams.client;
1818

19-
2019
import com.mongodb.annotations.Alpha;
2120
import com.mongodb.annotations.Reason;
2221
import com.mongodb.client.cursor.TimeoutMode;
2322
import com.mongodb.client.model.Collation;
2423
import com.mongodb.lang.Nullable;
2524
import org.bson.conversions.Bson;
2625
import org.reactivestreams.Publisher;
26+
import org.reactivestreams.Subscriber;
2727

2828
import java.util.concurrent.TimeUnit;
2929

3030
/**
3131
* Publisher for map reduce.
32+
* <p>
33+
* By default, the {@code MapReducePublisher} produces the results inline. You can write map-reduce output to a collection by using the
34+
* {@link #collectionName(String)} and {@link #toCollection()} methods.</p>
3235
*
3336
* @param <TResult> The type of the result.
3437
* @since 1.0
@@ -44,6 +47,7 @@ public interface MapReducePublisher<TResult> extends Publisher<TResult> {
4447
*
4548
* @param collectionName the name of the collection that you want the map-reduce operation to write its output.
4649
* @return this
50+
* @see #toCollection()
4751
*/
4852
MapReducePublisher<TResult> collectionName(String collectionName);
4953

@@ -152,14 +156,30 @@ public interface MapReducePublisher<TResult> extends Publisher<TResult> {
152156
MapReducePublisher<TResult> bypassDocumentValidation(@Nullable Boolean bypassDocumentValidation);
153157

154158
/**
155-
* Aggregates documents to a collection according to the specified map-reduce function with the given options, which must specify a
156-
* non-inline result.
159+
* Aggregates documents to a collection according to the specified map-reduce function with the given options, which must not produce
160+
* results inline. Calling this method and then {@linkplain Publisher#subscribe(Subscriber) subscribing} to the returned
161+
* {@link Publisher} is the preferred alternative to {@linkplain #subscribe(Subscriber) subscribing} to this {@link MapReducePublisher},
162+
* because this method does what is explicitly requested without executing implicit operations.
157163
*
158164
* @return an empty publisher that indicates when the operation has completed
165+
* @throws IllegalStateException if a {@linkplain #collectionName(String) collection name} to write the results to has not been specified
166+
* @see #collectionName(String)
159167
* @mongodb.driver.manual aggregation/ Aggregation
160168
*/
161169
Publisher<Void> toCollection();
162170

171+
/**
172+
* Requests {@link MapReducePublisher} to start streaming data according to the specified map-reduce function with the given options.
173+
* <ul>
174+
* <li>
175+
* If the aggregation produces results inline, then {@linkplain MongoCollection#find() finds all} documents in the
176+
* affected namespace and produces them. You may want to use {@link #toCollection()} instead.</li>
177+
* <li>
178+
* Otherwise, produces no elements.</li>
179+
* </ul>
180+
*/
181+
void subscribe(Subscriber<? super TResult> s);
182+
163183
/**
164184
* Sets the collation options
165185
*

driver-scala/src/main/scala/org/mongodb/scala/AggregateObservable.scala

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import org.mongodb.scala.bson.BsonValue
2525
import org.mongodb.scala.bson.DefaultHelper.DefaultsTo
2626
import org.mongodb.scala.bson.conversions.Bson
2727
import org.mongodb.scala.model.Collation
28+
import org.reactivestreams.Subscriber
2829

2930
import scala.concurrent.duration.Duration
3031
import scala.reflect.ClassTag
@@ -192,9 +193,14 @@ case class AggregateObservable[TResult](private val wrapped: AggregatePublisher[
192193
}
193194

194195
/**
195-
* Aggregates documents according to the specified aggregation pipeline, which must end with a `\$out` stage.
196+
* Aggregates documents according to the specified aggregation pipeline, which must end with an `\$out` or `\$merge` stage.
197+
* Calling this method and then `subscribing` to the returned [[SingleObservable]]
198+
* is the preferred alternative to subscribing to this [[AggregateObservable]],
199+
* because this method does what is explicitly requested without executing implicit operations.
196200
*
197201
* [[https://www.mongodb.com/docs/manual/aggregation/ Aggregation]]
202+
*
203+
* @throws java.lang.IllegalStateException if the pipeline does not end with an `\$out` or `\$merge` stage
198204
* @return an Observable that indicates when the operation has completed.
199205
*/
200206
def toCollection(): SingleObservable[Unit] = wrapped.toCollection()
@@ -257,5 +263,23 @@ case class AggregateObservable[TResult](private val wrapped: AggregatePublisher[
257263
)(implicit e: ExplainResult DefaultsTo Document, ct: ClassTag[ExplainResult]): SingleObservable[ExplainResult] =
258264
wrapped.explain[ExplainResult](ct, verbosity)
259265

266+
/**
267+
* Requests [[AggregateObservable]] to start streaming data according to the specified aggregation pipeline.
268+
*
269+
* - If the aggregation pipeline ends with an `\$out` or `\$merge` stage,
270+
* then finds all documents in the affected namespace and produces them.
271+
* You may want to use [[toCollection]] instead.
272+
* - Otherwise, produces no elements.
273+
*/
260274
override def subscribe(observer: Observer[_ >: TResult]): Unit = wrapped.subscribe(observer)
275+
276+
/**
277+
* Requests [[AggregateObservable]] to start streaming data according to the specified aggregation pipeline.
278+
*
279+
* - If the aggregation pipeline ends with an `\$out` or `\$merge` stage,
280+
* then finds all documents in the affected namespace and produces them.
281+
* You may want to use [[toCollection]] instead.
282+
* - Otherwise, produces no elements.
283+
*/
284+
override def subscribe(observer: Subscriber[_ >: TResult]): Unit = wrapped.subscribe(observer)
261285
}

driver-scala/src/main/scala/org/mongodb/scala/MapReduceObservable.scala

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,16 @@ import com.mongodb.client.model.MapReduceAction
2323
import com.mongodb.reactivestreams.client.MapReducePublisher
2424
import org.mongodb.scala.bson.conversions.Bson
2525
import org.mongodb.scala.model.Collation
26+
import org.reactivestreams.Subscriber
2627

2728
import scala.concurrent.duration.Duration
2829

2930
/**
3031
* Observable for map reduce.
3132
*
33+
* By default, the [[MapReduceObservable]] produces the results inline. You can write map-reduce output to a collection by using the
34+
* [[collectionName]] and [[toCollection]] methods.
35+
*
3236
* @define docsRef https://www.mongodb.com/docs/manual/reference
3337
*
3438
* @tparam TResult The type of the result.
@@ -44,6 +48,7 @@ case class MapReduceObservable[TResult](wrapped: MapReducePublisher[TResult]) ex
4448
*
4549
* @param collectionName the name of the collection that you want the map-reduce operation to write its output.
4650
* @return this
51+
* @see [[toCollection]]
4752
*/
4853
def collectionName(collectionName: String): MapReduceObservable[TResult] = {
4954
wrapped.collectionName(collectionName)
@@ -214,11 +219,15 @@ case class MapReduceObservable[TResult](wrapped: MapReducePublisher[TResult]) ex
214219
}
215220

216221
/**
217-
* Aggregates documents to a collection according to the specified map-reduce function with the given options, which must specify a
218-
* non-inline result.
222+
* Aggregates documents to a collection according to the specified map-reduce function with the given options, which must not produce
223+
* results inline. Calling this method and then subscribing to the returned [[SingleObservable]] is the preferred alternative to
224+
* subscribing to this [[MapReduceObservable]],
225+
* because this method does what is explicitly requested without executing implicit operations.
219226
*
220227
* @return an Observable that indicates when the operation has completed
221228
* [[https://www.mongodb.com/docs/manual/aggregation/ Aggregation]]
229+
* @throws java.lang.IllegalStateException if a collection name to write the results to has not been specified
230+
* @see [[collectionName]]
222231
*/
223232
def toCollection(): SingleObservable[Unit] = wrapped.toCollection()
224233

@@ -246,5 +255,21 @@ case class MapReduceObservable[TResult](wrapped: MapReducePublisher[TResult]) ex
246255
*/
247256
def first(): SingleObservable[TResult] = wrapped.first()
248257

258+
/**
259+
* Requests [[MapReduceObservable]] to start streaming data according to the specified map-reduce function with the given options.
260+
*
261+
* - If the aggregation produces results inline, then finds all documents in the
262+
* affected namespace and produces them. You may want to use [[toCollection]] instead.
263+
* - Otherwise, produces no elements.
264+
*/
249265
override def subscribe(observer: Observer[_ >: TResult]): Unit = wrapped.subscribe(observer)
266+
267+
/**
268+
* Requests [[MapReduceObservable]] to start streaming data according to the specified map-reduce function with the given options.
269+
*
270+
* - If the aggregation produces results inline, then finds all documents in the
271+
* affected namespace and produces them. You may want to use [[toCollection]] instead.
272+
* - Otherwise, produces no elements.
273+
*/
274+
override def subscribe(observer: Subscriber[_ >: TResult]): Unit = wrapped.subscribe(observer)
250275
}

0 commit comments

Comments
 (0)