Skip to content

Commit 2cfdd28

Browse files
Merge branch 'main' of github.com:oracle/oracle-r2dbc into 84-option-publisher
2 parents 40d69a9 + da33589 commit 2cfdd28

File tree

6 files changed

+139
-17
lines changed

6 files changed

+139
-17
lines changed

src/main/java/oracle/r2dbc/OracleR2dbcOptions.java

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,11 @@ private OracleR2dbcOptions() {}
5858
* ...
5959
* .build();
6060
* </pre>
61-
* If this option is not configured, then Oracle R2DBC will use
62-
* {@code ForkJoinPool}'s
63-
* {@linkplain ForkJoinPool#commonPool() common pool} by default.
61+
* If this option is not configured, then Oracle R2DBC will
62+
* use the {@linkplain ForkJoinPool#commonPool() common ForkJoinPool} by
63+
* default. However, if the common {@code ForkJoinPool} has a maximum pool
64+
* size that is potentially zero, then a single-threaded {@code Executor} will
65+
* be used by default.
6466
*/
6567
public static final Option<Executor> EXECUTOR;
6668

@@ -370,6 +372,30 @@ private OracleR2dbcOptions() {}
370372
*/
371373
public static final Option<CharSequence> NET_ENCRYPTION_TYPES;
372374

375+
/**
376+
* Configures the Oracle JDBC Connection used by Oracle R2DBC as specified by:
377+
* {@link OracleConnection#CONNECTION_PROPERTY_THIN_NET_AUTHENTICATION_KRB5_CC_NAME}
378+
*/
379+
public static final Option<CharSequence> KERBEROS_CREDENTIAL_CACHE_NAME;
380+
381+
/**
382+
* Configures the Oracle JDBC Connection used by Oracle R2DBC as specified by:
383+
* {@link OracleConnection#CONNECTION_PROPERTY_THIN_NET_AUTHENTICATION_KRB5_MUTUAL}
384+
*/
385+
public static final Option<CharSequence> KERBEROS_MUTUAL_AUTHENTICATION;
386+
387+
/**
388+
* Configures the Oracle JDBC Connection used by Oracle R2DBC as specified by:
389+
* {@link OracleConnection#CONNECTION_PROPERTY_THIN_NET_AUTHENTICATION_KRB_REALM}
390+
*/
391+
public static final Option<CharSequence> KERBEROS_REALM;
392+
393+
/**
394+
* Configures the Oracle JDBC Connection used by Oracle R2DBC as specified by:
395+
* {@link OracleConnection#CONNECTION_PROPERTY_THIN_NET_AUTHENTICATION_KRB_JAAS_LOGIN_MODULE}
396+
*/
397+
public static final Option<CharSequence> KERBEROS_JAAS_LOGIN_MODULE;
398+
373399

374400
/** The unmodifiable set of all extended options */
375401
private static final Set<Option<?>> OPTIONS = Set.of(
@@ -476,7 +502,15 @@ private OracleR2dbcOptions() {}
476502
NET_ENCRYPTION_LEVEL = Option.valueOf(
477503
OracleConnection.CONNECTION_PROPERTY_THIN_NET_ENCRYPTION_LEVEL),
478504
NET_ENCRYPTION_TYPES = Option.valueOf(
479-
OracleConnection.CONNECTION_PROPERTY_THIN_NET_ENCRYPTION_TYPES)
505+
OracleConnection.CONNECTION_PROPERTY_THIN_NET_ENCRYPTION_TYPES),
506+
KERBEROS_CREDENTIAL_CACHE_NAME = Option.valueOf(
507+
OracleConnection.CONNECTION_PROPERTY_THIN_NET_AUTHENTICATION_KRB5_CC_NAME),
508+
KERBEROS_MUTUAL_AUTHENTICATION = Option.valueOf(
509+
OracleConnection.CONNECTION_PROPERTY_THIN_NET_AUTHENTICATION_KRB5_MUTUAL),
510+
KERBEROS_REALM = Option.valueOf(
511+
OracleConnection.CONNECTION_PROPERTY_THIN_NET_AUTHENTICATION_KRB_REALM),
512+
KERBEROS_JAAS_LOGIN_MODULE = Option.valueOf(
513+
OracleConnection.CONNECTION_PROPERTY_THIN_NET_AUTHENTICATION_KRB_JAAS_LOGIN_MODULE)
480514
);
481515

482516
/**

src/main/java/oracle/r2dbc/impl/OracleConnectionFactoryImpl.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,20 @@
102102
*/
103103
final class OracleConnectionFactoryImpl implements ConnectionFactory {
104104

105+
/**
106+
* <p>
107+
* The default executor when {@link OracleR2dbcOptions#EXECUTOR} is not
108+
* configured. It will use the common {@code ForkJoinPool}, unless it has
109+
* a maximum pool size of 0. See:
110+
* https://github.com/oracle/oracle-r2dbc/issues/129
111+
* </p>
112+
*/
113+
private static final Executor DEFAULT_EXECUTOR =
114+
"0".equals(System.getProperty(
115+
"java.util.concurrent.ForkJoinPool.common.parallelism"))
116+
? new ForkJoinPool(1)
117+
: ForkJoinPool.commonPool();
118+
105119
/** JDBC data source that this factory uses to open connections */
106120
private final DataSource dataSource;
107121

@@ -200,7 +214,7 @@ final class OracleConnectionFactoryImpl implements ConnectionFactory {
200214

201215
Object executor = options.getValue(OracleR2dbcOptions.EXECUTOR);
202216
if (executor == null) {
203-
this.executor = ForkJoinPool.commonPool();
217+
this.executor = DEFAULT_EXECUTOR;
204218
}
205219
else if (executor instanceof Executor) {
206220
this.executor = (Executor) executor;
@@ -267,4 +281,5 @@ public Publisher<Connection> create() {
267281
public ConnectionFactoryMetadata getMetadata() {
268282
return OracleConnectionFactoryMetadataImpl.INSTANCE;
269283
}
284+
270285
}

src/main/java/oracle/r2dbc/impl/OracleResultImpl.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -925,14 +925,11 @@ void addDependent() {
925925
protected final <T extends Segment, U> Publisher<U> mapSegments(
926926
Class<T> segmentType, Function<? super T, U> segmentMapper) {
927927

928-
@SuppressWarnings("unchecked")
929-
Publisher<U> removeDependent = (Publisher<U>) dependentCounter.decrement();
928+
Publisher<Void> removeDependent = dependentCounter.decrement();
930929

931-
return Flux.concatDelayError(
930+
return Publishers.concatTerminal(
932931
mapDependentSegments(segmentType, segmentMapper),
933-
removeDependent)
934-
.doOnCancel(() ->
935-
Mono.from(removeDependent).subscribe());
932+
removeDependent);
936933
}
937934

938935
/**

src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1026,18 +1026,16 @@ private JdbcStatement(PreparedStatement preparedStatement, Object[] binds) {
10261026
*/
10271027
final Publisher<OracleResultImpl> execute() {
10281028

1029-
Mono<OracleResultImpl> deallocate =
1030-
Mono.from(deallocate()).cast(OracleResultImpl.class);
1029+
Publisher<Void> deallocate = deallocate();
10311030

1032-
return Flux.concatDelayError(
1031+
return Publishers.concatTerminal(
10331032
Mono.from(bind())
10341033
.thenMany(executeJdbc())
10351034
.map(this::getWarnings)
10361035
.onErrorResume(R2dbcException.class, r2dbcException ->
10371036
Mono.just(createErrorResult(r2dbcException)))
10381037
.doOnNext(OracleResultImpl::addDependent),
1039-
deallocate)
1040-
.doOnCancel(deallocate::subscribe);
1038+
deallocate);
10411039
}
10421040

10431041
/**
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
Copyright (c) 2020, 2022, Oracle and/or its affiliates.
3+
4+
This software is dual-licensed to you under the Universal Permissive License
5+
(UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License
6+
2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose
7+
either license.
8+
9+
Licensed under the Apache License, Version 2.0 (the "License");
10+
you may not use this file except in compliance with the License.
11+
You may obtain a copy of the License at
12+
13+
https://www.apache.org/licenses/LICENSE-2.0
14+
15+
Unless required by applicable law or agreed to in writing, software
16+
distributed under the License is distributed on an "AS IS" BASIS,
17+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
See the License for the specific language governing permissions and
19+
limitations under the License.
20+
*/
21+
22+
package oracle.r2dbc.impl;
23+
24+
import org.reactivestreams.Publisher;
25+
import reactor.core.publisher.Flux;
26+
import reactor.core.publisher.Mono;
27+
28+
/**
29+
* Factory methods that create a {@code Publisher}. These methods cover special
30+
* cases which are not already supported by Project Reactor.
31+
*/
32+
class Publishers {
33+
34+
private Publishers() {}
35+
36+
/**
37+
* A publisher that immediately emits onNext and onComplete to subscribers
38+
*/
39+
private static final Publisher<Object> COMPLETED_PUBLISHER =
40+
Mono.just(new Object());
41+
42+
/**
43+
* <p>
44+
* Returns a publisher that emits the concatenated signals of a
45+
* {@code publisher} and {@code onTerminationPublisher}. If the
46+
* {@code onTerminationPublisher} emits an error, it will suppress any error
47+
* emitted by the first {@code publisher}. If a subscription to the returned
48+
* publisher is cancelled, the {@code onTerminationPublisher} is subscribed to
49+
* but it can not emit any error through the cancelled subscription.
50+
* </p><p>
51+
* The returned publisher behaves similarly to: <pre>{@code
52+
* Flux.concatDelayError(
53+
* publisher,
54+
* onTerminationPublisher)
55+
* .doOnCancel(onTerminationPublisher::subscribe)
56+
* }</pre>
57+
* However, the code above can result in:
58+
* <pre>
59+
* reactor.core.Exceptions$StaticThrowable: Operator has been terminated
60+
* </pre>
61+
* This seems to happen when the concatDelayError publisher receives a cancel
62+
* from a downstream subscriber after it has already received onComplete from
63+
* a upstream publisher.
64+
* </p>
65+
* @param publisher First publisher which is subscribed to.
66+
* @param onTerminationPublisher Publisher which is subscribed to when the
67+
* first publisher terminates, or a subcription is cancelled.
68+
* @return The concatenated publisher.
69+
* @param <T> Type of objects emitted to onNext
70+
*/
71+
static <T> Publisher<T> concatTerminal(
72+
Publisher<T> publisher, Publisher<Void> onTerminationPublisher) {
73+
return Flux.usingWhen(
74+
COMPLETED_PUBLISHER,
75+
ignored -> publisher,
76+
ignored -> onTerminationPublisher);
77+
}
78+
}

src/test/java/oracle/r2dbc/impl/OracleStatementImplTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3222,7 +3222,7 @@ private void verifyConcurrentFetch(Connection connection) {
32223222
// Create many statements and execute them in parallel.
32233223
@SuppressWarnings({"unchecked","rawtypes"})
32243224
Publisher<Long>[] publishers =
3225-
new Publisher[Runtime.getRuntime().availableProcessors() * 4];
3225+
new Publisher[Runtime.getRuntime().availableProcessors() * 2];
32263226

32273227
for (int i = 0; i < publishers.length; i++) {
32283228

0 commit comments

Comments
 (0)