38
38
import org .reactivestreams .Publisher ;
39
39
import org .reactivestreams .Subscriber ;
40
40
import org .reactivestreams .Subscription ;
41
- import reactor .core .publisher .DirectProcessor ;
42
41
import reactor .core .publisher .Flux ;
43
42
import reactor .core .publisher .Mono ;
44
43
59
58
import java .util .concurrent .Executor ;
60
59
import java .util .concurrent .Flow ;
61
60
import java .util .concurrent .atomic .AtomicBoolean ;
62
- import java .util .concurrent .atomic .AtomicLong ;
63
- import java .util .concurrent .locks .ReentrantLock ;
64
61
import java .util .function .Function ;
65
62
import java .util .function .Supplier ;
66
63
75
72
import static oracle .r2dbc .impl .OracleR2dbcExceptions .runJdbc ;
76
73
import static oracle .r2dbc .impl .OracleR2dbcExceptions .toR2dbcException ;
77
74
import static org .reactivestreams .FlowAdapters .toFlowPublisher ;
78
- import static org .reactivestreams .FlowAdapters .toFlowSubscriber ;
79
75
import static org .reactivestreams .FlowAdapters .toPublisher ;
76
+ import static org .reactivestreams .FlowAdapters .toSubscriber ;
80
77
81
78
/**
82
79
* <p>
@@ -917,28 +914,19 @@ public Publisher<String> publishClobRead(Clob clob)
917
914
* contents are always copied into a new byte array. In a later release,
918
915
* avoiding the copy using {@link ByteBuffer#array()} can be worth
919
916
* considering.
920
- *
921
- * @implNote The 21c {@code OracleBlob} subscriber violates Rule 2.7 of the
922
- * Reactive Streams Specification, which prohibits concurrent calls to
923
- * {@link Subscription#request(long)}. This can cause undefined behavior by
924
- * the {@code contentPublisher}. To work around this bug, this method
925
- * proxies the {@link Subscription} between the {@code contentPublisher}
926
- * and the {@code OracleBlob} subscriber. The proxy ensures that
927
- * {@code request} signals are delivered serially.
928
917
*/
929
918
@ Override
930
919
public Publisher <Void > publishBlobWrite (
931
920
Publisher <ByteBuffer > contentPublisher , Blob blob ) {
932
921
OracleBlob oracleBlob = castAsType (blob , OracleBlob .class );
933
922
934
- // TODO: Move subscriberOracleCall into adaptFlowPublisher, so that it
923
+ // TODO: Move subscriberOracle Call into adaptFlowPublisher, so that it
935
924
// avoids lock contention
936
- // This processor emits a terminal signal when all blob writing database
937
- // calls have completed
938
- DirectProcessor <Long > writeOutcomeProcessor = DirectProcessor . create ();
925
+ // This subscriber receives a terminal signal after JDBC completes the
926
+ // LOB write.
927
+ CompletionSubscriber <Long > outcomeSubscriber = new CompletionSubscriber <> ();
939
928
Flow .Subscriber <byte []> blobSubscriber = fromJdbc (() ->
940
- oracleBlob .subscriberOracle (1L ,
941
- toFlowSubscriber (writeOutcomeProcessor )));
929
+ oracleBlob .subscriberOracle (1L , outcomeSubscriber ));
942
930
943
931
// TODO: Acquire async lock before invoking onNext, release when
944
932
// writeOutcomeProcessor gets onNext with sum equal to sum of buffer
@@ -960,9 +948,10 @@ public Publisher<Void> publishBlobWrite(
960
948
slice .get (byteArray );
961
949
return byteArray ;
962
950
})
963
- .subscribe (new SerializedLobSubscriber <>(blobSubscriber ));
951
+ .subscribe (toSubscriber (blobSubscriber ));
952
+
964
953
965
- return toFlowPublisher (writeOutcomeProcessor . then ());
954
+ return toFlowPublisher (outcomeSubscriber . publish ());
966
955
});
967
956
}
968
957
@@ -974,33 +963,24 @@ public Publisher<Void> publishBlobWrite(
974
963
* {@link OracleClob#subscriberOracle(long, Flow.Subscriber)} adapted to
975
964
* conform with the R2DBC standards.
976
965
* </p>
977
- *
978
- * @implNote The 21c {@code OracleClob} subscriber violates Rule 2.7 of the
979
- * Reactive Streams Specification, which prohibits concurrent calls to
980
- * {@link Subscription#request(long)}. This can cause undefined behavior by
981
- * the {@code contentPublisher}. To work around this bug, this method
982
- * proxies the {@link Subscription} between the {@code contentPublisher}
983
- * and the {@code OracleClob} subscriber. The proxy ensures that
984
- * {@code request} signals are delivered serially.
985
966
*/
986
967
@ Override
987
968
public Publisher <Void > publishClobWrite (
988
969
Publisher <? extends CharSequence > contentPublisher , Clob clob ) {
989
970
OracleClob oracleClob = castAsType (clob , OracleClob .class );
990
971
991
- // This processor emits a terminal signal when all clob writing database
992
- // calls have completed
993
- DirectProcessor <Long > writeOutcomeProcessor = DirectProcessor . create ();
972
+ // This subscriber receives a terminal signal after JDBC completes the
973
+ // LOB write.
974
+ CompletionSubscriber <Long > outcomeSubscriber = new CompletionSubscriber <> ();
994
975
Flow .Subscriber <String > clobSubscriber = fromJdbc (() ->
995
- oracleClob .subscriberOracle (1L ,
996
- toFlowSubscriber (writeOutcomeProcessor )));
976
+ oracleClob .subscriberOracle (1L , outcomeSubscriber ));
997
977
998
978
return adaptFlowPublisher (() -> {
999
979
Flux .from (contentPublisher )
1000
980
.map (CharSequence ::toString )
1001
- .subscribe (new SerializedLobSubscriber <> (clobSubscriber ));
981
+ .subscribe (toSubscriber (clobSubscriber ));
1002
982
1003
- return toFlowPublisher (writeOutcomeProcessor . then ());
983
+ return toFlowPublisher (outcomeSubscriber . publish ());
1004
984
});
1005
985
}
1006
986
@@ -1276,134 +1256,59 @@ else if (isTypeConversionError(sqlException.getErrorCode()))
1276
1256
}
1277
1257
1278
1258
/**
1279
- * <p>
1280
- * A {@code Subscriber} that serializes {@code Subscription} method calls
1281
- * made by {@link OracleBlob} or {@link OracleClob} subscribers. The purpose
1282
- * of this class is to work around Oracle JDBC Bug #32097526, in which the
1283
- * Large Object (LOB) subscribers violate Rule 2.7 of the Reactive Streams
1284
- * 1.0.3 Specification by invoking subscription methods concurrently. This
1285
- * violation can lead to unspecified behavior from the upstream LOB content
1286
- * {@code Publisher}.
1287
- * </p><p>
1288
- * This class serves as an intermediary between a LOB content publisher
1289
- * upstream, and the LOB subscriber downstream. It presents itself as a
1290
- * subscription to the LOB subscriber so that it can regulate it's
1291
- * subscription method calls. Each subscription call is regulated by
1292
- * acquiring a mutually exclusive lock before the call is forwarded to the
1293
- * content publisher's subscription.
1294
- * </p>
1295
- *
1296
- * @implNote This class is an {@code org.reactivestreams.Subscriber} and a
1297
- * {@code java.util.concurrent.Flow.Subscription}. These APIs were chosen to
1298
- * interface with R2DBC Blob/Clob publishers upstream, and with Reactive
1299
- * Extensions downstream.
1300
- * @param <T> The type of item subscribed to
1259
+ * A subscriber that relays {@code onComplete} or {@code onError} signals
1260
+ * from an upstream publisher to downstream subscribers. This subscriber
1261
+ * ignores {@code onNext} signals from an upstream publisher. This subscriber
1262
+ * signals unbounded demand to an upstream publisher.
1263
+ * @param <T> Type of values emitted from an upstream publisher.
1301
1264
*/
1302
- private static class SerializedLobSubscriber <T >
1303
- implements org . reactivestreams . Subscriber <T >, Flow . Subscription {
1265
+ private static final class CompletionSubscriber <T >
1266
+ implements Flow . Subscriber <T > {
1304
1267
1305
- /** The downstream OracleBlob/OracleClob subscriber */
1306
- final Flow .Subscriber <T > lobSubscriber ;
1307
-
1308
- /** Guards access to the upstream content publisher's subscription */
1309
- final ReentrantLock signalLock = new ReentrantLock ();
1310
-
1311
- /** The upstream content publisher's subscription */
1312
- Subscription contentSubscription ;
1268
+ /** Future completed by {@code onSubscribe} */
1269
+ private final CompletableFuture <Flow .Subscription > subscriptionFuture =
1270
+ new CompletableFuture <>();
1313
1271
1314
1272
/**
1315
- * Constructs a new subscriber that regulates subscription calls from a
1316
- * {@code lobSubscriber}. The {@code onSubscribe} method of the {@code
1317
- * lobSubscriber} is invoked when the {@code onSubscribe} method of the
1318
- * constructed subscriber is invoked.
1273
+ * Future completed normally by {@code onComplete}, or exceptionally by
1274
+ * {@code onError}
1319
1275
*/
1320
- SerializedLobSubscriber (Flow .Subscriber <T > lobSubscriber ) {
1321
- this .lobSubscriber = lobSubscriber ;
1322
- }
1276
+ private final CompletableFuture <Void > resultFuture =
1277
+ new CompletableFuture <>();
1323
1278
1324
- /**
1325
- * {@inheritDoc}
1326
- * <p>
1327
- * Retains the {@code subscription} and presents itself as a subscription
1328
- * to the LOB subscriber. Subscription calls from the LOB subscriber are
1329
- * then serially forwarded to the {@code subscription}.
1330
- * </p>
1331
- */
1332
1279
@ Override
1333
- public void onSubscribe (Subscription subscription ) {
1334
- contentSubscription = subscription ;
1335
- lobSubscriber . onSubscribe ( this );
1280
+ public void onSubscribe (Flow . Subscription subscription ) {
1281
+ subscriptionFuture . complete ( Objects . requireNonNull ( subscription )) ;
1282
+ subscription . request ( Long . MAX_VALUE );
1336
1283
}
1337
1284
1338
- /**
1339
- * {@inheritDoc}
1340
- * <p>
1341
- * Regulates a request call from the {@code lobSubscriber} by first
1342
- * blocking until any active {@code request} or {@code cancel} call has
1343
- * completed, and then forwarding the request to the content publisher.
1344
- * </p>
1345
- */
1346
1285
@ Override
1347
- public void request (long n ) {
1348
- signalLock .lock ();
1349
- try {
1350
- contentSubscription .request (n );
1351
- }
1352
- finally {
1353
- signalLock .unlock ();
1354
- }
1355
- }
1356
-
1357
- /**
1358
- * {@inheritDoc}
1359
- * <p>
1360
- * Regulates a cancel call from the {@code lobSubscriber} by first
1361
- * blocking until any active {@code request} or {@code cancel} call has
1362
- * completed, and then forwarding the cancel to the content publisher.
1363
- * </p>
1364
- */
1365
- @ Override
1366
- public void cancel () {
1367
- signalLock .lock ();
1368
- try {
1369
- contentSubscription .cancel ();
1370
- }
1371
- finally {
1372
- signalLock .unlock ();
1373
- }
1286
+ public void onNext (T item ) {
1374
1287
}
1375
1288
1376
- /**
1377
- * {@inheritDoc}
1378
- * <p>
1379
- * Forwards the signal to the LOB subscriber without any regulation.
1380
- * </p>
1381
- */
1382
1289
@ Override
1383
- public void onNext ( T item ) {
1384
- lobSubscriber . onNext ( item );
1290
+ public void onError ( Throwable throwable ) {
1291
+ resultFuture . completeExceptionally ( Objects . requireNonNull ( throwable ) );
1385
1292
}
1386
1293
1387
- /**
1388
- * {@inheritDoc}
1389
- * <p>
1390
- * Forwards the signal to the LOB subscriber without any regulation.
1391
- * </p>
1392
- */
1393
1294
@ Override
1394
- public void onError ( Throwable throwable ) {
1395
- lobSubscriber . onError ( throwable );
1295
+ public void onComplete ( ) {
1296
+ resultFuture . complete ( null );
1396
1297
}
1397
1298
1398
1299
/**
1399
- * {@inheritDoc}
1400
- * <p>
1401
- * Forwards the signal to the LOB subscriber without any regulation.
1402
- * </p>
1300
+ * Returns a publisher that emits the same {@code onComplete} or
1301
+ * {@code onError} signal emitted to this subscriber. Cancelling a
1302
+ * subscription to the returned publisher cancels the subscription of this
1303
+ * subscriber.
1304
+ * @return A publisher that emits the terminal signal emitted to this
1305
+ * subscriber.
1403
1306
*/
1404
- @ Override
1405
- public void onComplete () {
1406
- lobSubscriber .onComplete ();
1307
+ Publisher <Void > publish () {
1308
+ return Mono .fromCompletionStage (resultFuture )
1309
+ .doOnCancel (() ->
1310
+ subscriptionFuture .thenAccept (Flow .Subscription ::cancel ));
1407
1311
}
1408
1312
}
1313
+
1409
1314
}
0 commit comments