1
1
/*
2
- * Copyright (c) 2017, 2018 Oracle and/or its affiliates. All rights reserved.
2
+ * Copyright (c) 2017, 2019 Oracle and/or its affiliates. All rights reserved.
3
3
*
4
4
* This program and the accompanying materials are made available under the
5
5
* terms of the Eclipse Public License v. 2.0, which is available at
25
25
import java .util .function .Consumer ;
26
26
27
27
import org .glassfish .jersey .internal .LocalizationMessages ;
28
+ import org .glassfish .jersey .internal .jsr166 .SubmissionPublisherFactory ;
29
+ import org .glassfish .jersey .internal .jsr166 .SubmittableFlowPublisher ;
28
30
import org .glassfish .jersey .internal .jsr166 .Flow ;
29
- import org .glassfish .jersey .internal .jsr166 .SubmissionPublisher ;
30
-
31
31
32
32
/**
33
33
* Implementation of {@link Flow.Publisher} corresponding to reactive streams specification.
34
34
* <p>
35
- * Delegates to {@link SubmissionPublisher} repackaged from jsr166.
35
+ * Delegates to {@link SubmittableFlowPublisher} implementation either repackaged from jsr166 for jdk8, or a facade of jdk9
36
+ * {@code SubmissionPublisher}
36
37
*
37
38
* @author Adam Lindenthal (adam.lindenthal at oracle.com)
38
39
*/
39
- public class JerseyPublisher <T > implements Flow . Publisher <T > {
40
+ public class JerseyPublisher <T > implements SubmittableFlowPublisher <T > {
40
41
41
42
private static final int DEFAULT_BUFFER_CAPACITY = 256 ;
42
- private SubmissionPublisher <T > submissionPublisher = new SubmissionPublisher <>();
43
+ private SubmittableFlowPublisher <T > submissionPublisher = SubmissionPublisherFactory .createSubmissionPublisher ();
44
+
43
45
44
46
private final PublisherStrategy strategy ;
45
47
@@ -90,7 +92,7 @@ public JerseyPublisher(final Executor executor) {
90
92
*/
91
93
public JerseyPublisher (final Executor executor , final PublisherStrategy strategy ) {
92
94
this .strategy = strategy ;
93
- submissionPublisher = new SubmissionPublisher <> (executor , DEFAULT_BUFFER_CAPACITY );
95
+ submissionPublisher = SubmissionPublisherFactory . createSubmissionPublisher (executor , DEFAULT_BUFFER_CAPACITY );
94
96
}
95
97
96
98
@@ -128,7 +130,7 @@ public JerseyPublisher(final int maxBufferCapacity) {
128
130
*/
129
131
public JerseyPublisher (final Executor executor , final int maxBufferCapacity , PublisherStrategy strategy ) {
130
132
this .strategy = strategy ;
131
- submissionPublisher = new SubmissionPublisher <> (executor , maxBufferCapacity );
133
+ submissionPublisher = SubmissionPublisherFactory . createSubmissionPublisher (executor , maxBufferCapacity );
132
134
}
133
135
134
136
@ Override
@@ -147,7 +149,7 @@ public void subscribe(final Flow.Subscriber<? super T> subscriber) {
147
149
* @throws NullPointerException if data is null
148
150
* @throws java.util.concurrent.RejectedExecutionException if thrown by Executor
149
151
*/
150
- private int submit (final T data ) {
152
+ public int submit (final T data ) {
151
153
return submissionPublisher .submit (data );
152
154
}
153
155
@@ -203,7 +205,7 @@ public CompletableFuture<Void> consume(final Consumer<? super T> consumer) {
203
205
* @throws NullPointerException if item is null
204
206
* @throws RejectedExecutionException if thrown by Executor
205
207
*/
206
- private int offer (T item , BiPredicate <Flow .Subscriber <? super T >, ? super T > onDrop ) {
208
+ public int offer (T item , BiPredicate <Flow .Subscriber <? super T >, ? super T > onDrop ) {
207
209
return offer (item , 0 , TimeUnit .MILLISECONDS , onDrop );
208
210
}
209
211
@@ -252,7 +254,7 @@ private int offer(T item, BiPredicate<Flow.Subscriber<? super T>, ? super T> onD
252
254
* @throws NullPointerException if item is null
253
255
* @throws RejectedExecutionException if thrown by Executor
254
256
*/
255
- private int offer (T item ,
257
+ public int offer (T item ,
256
258
long timeout ,
257
259
TimeUnit unit ,
258
260
BiPredicate <Flow .Subscriber <? super T >, ? super T > onDrop ) {
0 commit comments