From 3d171d0d905925fa5a649d2d6ea888365ab32937 Mon Sep 17 00:00:00 2001 From: Jan Supol Date: Thu, 21 May 2020 10:07:40 +0200 Subject: [PATCH] Connector to Helidon 2 Web Client Signed-off-by: Jan Supol --- connectors/helidon-connector/pom.xml | 78 +++++ .../helidon/connector/HelidonConnector.java | 249 ++++++++++++++ .../connector/HelidonConnectorProvider.java | 77 +++++ .../helidon/connector/HelidonEntity.java | 170 ++++++++++ .../helidon/connector/HelidonProperties.java | 33 ++ .../helidon/connector/HelidonStructures.java | 165 ++++++++++ .../connector/OutputStreamChannel.java | 199 +++++++++++ .../helidon/connector/package-info.java | 20 ++ .../localization.properties | 21 ++ .../jersey/helidon/connector/AsyncTest.java | 208 ++++++++++++ .../connector/BasicHelidonConnectorTest.java | 308 ++++++++++++++++++ .../connector/FollowRedirectsTest.java | 106 ++++++ .../helidon/connector/LargeDataTest.java | 158 +++++++++ .../helidon/connector/ParallelTest.java | 133 ++++++++ .../jersey/helidon/connector/TimeoutTest.java | 117 +++++++ .../connector/sse/EventOutputTest.java | 274 ++++++++++++++++ .../jersey/helidon/connector/sse/SseTest.java | 182 +++++++++++ connectors/pom.xml | 12 + 18 files changed, 2510 insertions(+) create mode 100644 connectors/helidon-connector/pom.xml create mode 100644 connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonConnector.java create mode 100644 connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonConnectorProvider.java create mode 100644 connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonEntity.java create mode 100644 connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonProperties.java create mode 100644 connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonStructures.java create mode 100644 connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/OutputStreamChannel.java create mode 100644 connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/package-info.java create mode 100644 connectors/helidon-connector/src/main/resources/org.glassfish.jersey.helidon.connector/localization.properties create mode 100644 connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/AsyncTest.java create mode 100644 connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/BasicHelidonConnectorTest.java create mode 100644 connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/FollowRedirectsTest.java create mode 100644 connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/LargeDataTest.java create mode 100644 connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/ParallelTest.java create mode 100644 connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/TimeoutTest.java create mode 100644 connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/sse/EventOutputTest.java create mode 100644 connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/sse/SseTest.java diff --git a/connectors/helidon-connector/pom.xml b/connectors/helidon-connector/pom.xml new file mode 100644 index 0000000000..10d1af6d43 --- /dev/null +++ b/connectors/helidon-connector/pom.xml @@ -0,0 +1,78 @@ + + + + + project + org.glassfish.jersey.connectors + 2.31-SNAPSHOT + + 4.0.0 + + jersey-helidon-connector + jar + jersey-connectors-helidon + + + + io.helidon.webclient + helidon-webclient + + 2.0.0-M3 + provided + + + org.glassfish.jersey.test-framework.providers + jersey-test-framework-provider-grizzly2 + ${project.version} + test + + + org.glassfish.jersey.media + jersey-media-sse + ${project.version} + test + + + + + + + com.sun.istack + istack-commons-maven-plugin + true + + + org.codehaus.mojo + build-helper-maven-plugin + true + + + org.apache.maven.plugins + maven-compiler-plugin + false + + 11 + 11 + + + + + \ No newline at end of file diff --git a/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonConnector.java b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonConnector.java new file mode 100644 index 0000000000..6e8e96823d --- /dev/null +++ b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonConnector.java @@ -0,0 +1,249 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.helidon.connector; + +import io.helidon.common.Version; +import io.helidon.webclient.WebClient; +import io.helidon.webclient.WebClientRequestBuilder; +import io.helidon.webclient.WebClientResponse; +import org.glassfish.jersey.client.ClientAsyncExecutorLiteral; +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.client.ClientProperties; +import org.glassfish.jersey.client.ClientRequest; +import org.glassfish.jersey.client.ClientResponse; +import org.glassfish.jersey.client.JerseyClient; +import org.glassfish.jersey.client.spi.AsyncConnectorCallback; +import org.glassfish.jersey.client.spi.Connector; +import org.glassfish.jersey.internal.util.PropertiesHelper; +import org.glassfish.jersey.spi.ExecutorServiceProvider; + +import javax.ws.rs.ProcessingException; +import javax.ws.rs.client.Client; +import javax.ws.rs.core.Configuration; +import javax.ws.rs.core.Response; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.security.AccessController; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; +import java.util.logging.Logger; + +/** + * A {@link Connector} that utilizes the Helidon HTTP Client to send and receive + * HTTP request and responses. + */ +class HelidonConnector implements Connector { + + private static final String helidonVersion = "Helidon/" + Version.VERSION + " (java " + AccessController + .doPrivileged(PropertiesHelper.getSystemProperty("java.runtime.version")) + ")"; + static final Logger LOGGER = Logger.getLogger(HelidonConnector.class.getName()); + + private final WebClient webClient; + + private final ExecutorServiceKeeper executorServiceKeeper; + private final HelidonEntity.HelidonEntityType entityType; + + private static final InputStream NO_CONTENT_INPUT_STREAM = new InputStream() { + @Override + public int read() throws IOException { + return -1; + } + }; + // internal implementation entity type, can be removed in the future + // settable for testing purposes + private static final String INTERNAL_ENTITY_TYPE = "jersey.config.helidon.client.entity.type"; + + HelidonConnector(final Client client, final Configuration config) { + executorServiceKeeper = new ExecutorServiceKeeper(client); + entityType = getEntityType(config); + + final WebClient.Builder webClientBuilder = WebClient.builder(); + + webClientBuilder.addReader(HelidonStructures.createInputStreamBodyReader()); + HelidonEntity.helidonWriter(entityType).ifPresent(webClientBuilder::addWriter); + + HelidonStructures.createProxy(config).ifPresent(webClientBuilder::proxy); + + HelidonStructures.helidonConfig(config).ifPresent(webClientBuilder::config); + + webClientBuilder.connectTimeout(ClientProperties.getValue(config.getProperties(), + ClientProperties.CONNECT_TIMEOUT, 10000), ChronoUnit.MILLIS); + + HelidonStructures.createSSL(client.getSslContext()).ifPresent(webClientBuilder::ssl); + + webClient = webClientBuilder.build(); + } + + @Override + public ClientResponse apply(ClientRequest request) { + try { + return applyInternal(request).toCompletableFuture().get(); + } catch (InterruptedException | ExecutionException e) { + throw new ProcessingException(e); + } + } + + @Override + public Future apply(ClientRequest request, AsyncConnectorCallback callback) { + final BiConsumer action = (r, th) -> { + if (th == null) callback.response(r); + else callback.failure(th); + }; + return applyInternal(request) + .whenCompleteAsync(action, executorServiceKeeper.getExecutorService(request)) + .toCompletableFuture(); + } + + @Override + public String getName() { + return helidonVersion; + } + + @Override + public void close() { + + } + + private CompletionStage applyInternal(ClientRequest request) { + final WebClientRequestBuilder webClientRequestBuilder = webClient.method(request.getMethod()); + webClientRequestBuilder.uri(request.getUri()); + + webClientRequestBuilder.headers(HelidonStructures.createHeaders(request.getRequestHeaders())); + + for (String propertyName : request.getConfiguration().getPropertyNames()) { + Object property = request.getConfiguration().getProperty(propertyName); + if (!propertyName.startsWith("jersey") && String.class.isInstance(property)) { + webClientRequestBuilder.property(propertyName, (String) property); + } + } + + for (String propertyName : request.getPropertyNames()) { + Object property = request.resolveProperty(propertyName, null); + if (!propertyName.startsWith("jersey") && String.class.isInstance(property)) { + webClientRequestBuilder.property(propertyName, (String) property); + } + } + + // 2.0.0-M3 + // HelidonStructures.createProxy(request).ifPresent(webClientRequestBuilder::proxy); + + webClientRequestBuilder.followRedirects(request.resolveProperty(ClientProperties.FOLLOW_REDIRECTS, true)); + webClientRequestBuilder.readTimeout(request.resolveProperty(ClientProperties.READ_TIMEOUT, 10000), ChronoUnit.MILLIS); + + CompletionStage responseStage = null; + + if (request.hasEntity()) { + responseStage = HelidonEntity.submit( + entityType, request, webClientRequestBuilder, executorServiceKeeper.getExecutorService(request) + ); + } else { + responseStage = webClientRequestBuilder.submit(); + } + + return responseStage.thenCompose((a) -> convertResponse(request, a)); + } + + private CompletionStage convertResponse(final ClientRequest requestContext, + final WebClientResponse webClientResponse) { + + final ClientResponse responseContext = new ClientResponse(new Response.StatusType() { + @Override + public int getStatusCode() { + return webClientResponse.status().code(); + } + + @Override + public Response.Status.Family getFamily() { + return Response.Status.Family.familyOf(getStatusCode()); + } + + @Override + public String getReasonPhrase() { + return webClientResponse.status().reasonPhrase(); + } + }, requestContext); + + for (Map.Entry> entry : webClientResponse.headers().toMap().entrySet()) { + for (String value : entry.getValue()) { + responseContext.getHeaders().add(entry.getKey(), value); + } + } + + responseContext.setResolvedRequestUri(webClientResponse.lastEndpointURI()); + + final CompletionStage stream = HelidonStructures.hasEntity(webClientResponse) + ? webClientResponse.content().as(InputStream.class) + : CompletableFuture.supplyAsync(() -> NO_CONTENT_INPUT_STREAM); + + return stream.thenApply((a) -> { + responseContext.setEntityStream(new FilterInputStream(a) { + private final AtomicBoolean closed = new AtomicBoolean(false); + + @Override + public void close() throws IOException { + // Avoid idempotent close in the underlying input stream + if (!closed.compareAndSet(false, true)) { + super.close(); + } + } + }); + return responseContext; + }); + } + + private static HelidonEntity.HelidonEntityType getEntityType(final Configuration config) { + final String helidonType = ClientProperties.getValue(config.getProperties(), + INTERNAL_ENTITY_TYPE, HelidonEntity.HelidonEntityType.READABLE_BYTE_CHANNEL.name()); + final HelidonEntity.HelidonEntityType entityType = HelidonEntity.HelidonEntityType.valueOf(helidonType); + +// if (entityType != HelidonEntity.HelidonEntityType.READABLE_BYTE_CHANNEL) { +// // log warning for internal feature - no localization.properties +// LOGGER.warning(INTERNAL_ENTITY_TYPE + " is " + entityType.name()); +// } + + return entityType; + } + + private static class ExecutorServiceKeeper { + private Optional executorService; + + private ExecutorServiceKeeper(Client client) { + final ClientConfig config = ((JerseyClient) client).getConfiguration(); + executorService = Optional.ofNullable(config.getExecutorService()); + } + + private ExecutorService getExecutorService(ClientRequest request) { + if (!executorService.isPresent()) { + // cache for multiple requests + executorService = Optional.ofNullable(request.getInjectionManager() + .getInstance(ExecutorServiceProvider.class, ClientAsyncExecutorLiteral.INSTANCE).getExecutorService()); + } + + return executorService.get(); + } + } +} diff --git a/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonConnectorProvider.java b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonConnectorProvider.java new file mode 100644 index 0000000000..36092f4134 --- /dev/null +++ b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonConnectorProvider.java @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.helidon.connector; + +import org.glassfish.jersey.Beta; +import org.glassfish.jersey.client.spi.Connector; +import org.glassfish.jersey.client.spi.ConnectorProvider; +import org.glassfish.jersey.internal.util.JdkVersion; + +import javax.ws.rs.ProcessingException; +import javax.ws.rs.client.Client; +import javax.ws.rs.core.Configuration; +import java.io.OutputStream; + +/** + * Provider for Helidon WebClient {@link Connector} that utilizes the Helidon HTTP Client to send and receive + * HTTP request and responses. JDK 8 is not supported by the Helidon Connector. + *

+ * The following properties are only supported at construction of this class: + *

    + *
  • {@link org.glassfish.jersey.client.ClientProperties#CONNECT_TIMEOUT}
  • + *
  • {@link org.glassfish.jersey.client.ClientProperties#FOLLOW_REDIRECTS}
  • + *
  • {@link org.glassfish.jersey.client.ClientProperties#PROXY_URI}
  • + *
  • {@link org.glassfish.jersey.client.ClientProperties#PROXY_USERNAME}
  • + *
  • {@link org.glassfish.jersey.client.ClientProperties#PROXY_PASSWORD}
  • + *
  • {@link org.glassfish.jersey.client.ClientProperties#READ_TIMEOUT}
  • + *
  • {@link HelidonProperties#CONFIG}
  • + *
+ *

+ * If a {@link org.glassfish.jersey.client.ClientResponse} is obtained and an + * entity is not read from the response then + * {@link org.glassfish.jersey.client.ClientResponse#close()} MUST be called + * after processing the response to release connection-based resources. + *

+ *

+ * Client operations are thread safe, the HTTP connection may + * be shared between different threads. + *

+ *

+ * If a response entity is obtained that is an instance of {@link java.io.Closeable} + * then the instance MUST be closed after processing the entity to release + * connection-based resources. + *

+ *

+ * This connector uses {@link org.glassfish.jersey.client.ClientProperties#OUTBOUND_CONTENT_LENGTH_BUFFER} to buffer the entity + * written for instance by {@link javax.ws.rs.core.StreamingOutput}. Should the buffer be small and + * {@link javax.ws.rs.core.StreamingOutput#write(OutputStream)} be called many times, the performance can drop. The Content-Length + * or the Content_Encoding header is set by the underlaying Helidon WebClient regardless of the + * {@link org.glassfish.jersey.client.ClientProperties#OUTBOUND_CONTENT_LENGTH_BUFFER} size, however. + *

+ * + * @since 2.31 + */ +@Beta +public class HelidonConnectorProvider implements ConnectorProvider { + @Override + public Connector getConnector(Client client, Configuration runtimeConfig) { + if (JdkVersion.getJdkVersion().getMajor() < 11) { + throw new ProcessingException(LocalizationMessages.NOT_SUPPORTED()); + } + return new HelidonConnector(client, runtimeConfig); + } +} diff --git a/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonEntity.java b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonEntity.java new file mode 100644 index 0000000000..386cb629c3 --- /dev/null +++ b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonEntity.java @@ -0,0 +1,170 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.helidon.connector; + +import io.helidon.common.GenericType; +import io.helidon.common.http.DataChunk; +import io.helidon.common.http.MediaType; +import io.helidon.common.reactive.Multi; +import io.helidon.common.reactive.OutputStreamPublisher; +import io.helidon.common.reactive.Single; +import io.helidon.media.common.ByteChannelBodyWriter; +import io.helidon.media.common.ContentWriters; +import io.helidon.media.common.MessageBodyContext; +import io.helidon.media.common.MessageBodyWriter; +import io.helidon.media.common.MessageBodyWriterContext; +import io.helidon.webclient.WebClientRequestBuilder; +import io.helidon.webclient.WebClientResponse; +import org.glassfish.jersey.client.ClientProperties; +import org.glassfish.jersey.client.ClientRequest; + +import javax.ws.rs.ProcessingException; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Flow; +import java.util.function.Function; + +/** + * A utility class that converts outbound client entity to a class understandable by Helidon. + * Based on the {@link HelidonEntityType} an entity writer is provided to be registered by Helidon client + * and an Entity is provided to be submitted by the Helidon Client. + */ +class HelidonEntity { + /** + * HelidonEnity type chosen by HelidonEntityType + */ + enum HelidonEntityType { + /** + * Simplest structure. Loads all data to the memory. + */ + BYTE_ARRAY_OUTPUT_STREAM, + /** + * Readable ByteChannel that is capable of sending data in chunks. + * Capable of caching of bytes before the data are consumed by Helidon. + */ + READABLE_BYTE_CHANNEL, + /** + * Helidon most native entity. Could be slower than {@link #READABLE_BYTE_CHANNEL}. + */ + OUTPUT_STREAM_PUBLISHER + } + + /** + * Get optional entity writer to be registered by the Helidon Client. For some default providers, + * nothing is needed to be registered. + * @param type the type of the entity class that works best for the Http Client request use case. + * @return possible writer to be registerd by the Helidon Client. + */ + static Optional> helidonWriter(HelidonEntityType type) { + switch (type) { + case BYTE_ARRAY_OUTPUT_STREAM: + return Optional.of(new OutputStreamBodyWriter()); + case OUTPUT_STREAM_PUBLISHER: + //Helidon default + return Optional.empty(); + case READABLE_BYTE_CHANNEL: + return Optional.of(ByteChannelBodyWriter.create()); + } + return Optional.empty(); + } + + /** + * Convert Jersey {@code OutputStream} to an entity based on the client request use case and submits to the provided + * {@code WebClientRequestBuilder}. + * @param type the type of the Helidon entity. + * @param requestContext Jersey {@link ClientRequest} providing the entity {@code OutputStream}. + * @param requestBuilder Helidon {@code WebClientRequestBuilder} which is used to submit the entity + * @param executorService {@link ExecutorService} that fills the entity instance for Helidon with data from Jersey + * {@code OutputStream}. + * @return Helidon Client response completion stage. + */ + static CompletionStage submit(HelidonEntityType type, + ClientRequest requestContext, + WebClientRequestBuilder requestBuilder, + ExecutorService executorService) { + CompletionStage stage = null; + if (type != null) { + final int bufferSize = requestContext.resolveProperty( + ClientProperties.OUTBOUND_CONTENT_LENGTH_BUFFER, 8192); + switch (type) { + case BYTE_ARRAY_OUTPUT_STREAM: + final ByteArrayOutputStream baos = new ByteArrayOutputStream(bufferSize); + requestContext.setStreamProvider(contentLength -> baos); + ((ProcessingRunnable) () -> requestContext.writeEntity()).run(); + stage = requestBuilder.submit(baos); + break; + case READABLE_BYTE_CHANNEL: + final OutputStreamChannel channel = new OutputStreamChannel(bufferSize); + requestContext.setStreamProvider(contentLength -> channel); + executorService.execute((ProcessingRunnable) () -> requestContext.writeEntity()); + stage = requestBuilder.submit(channel); + break; + case OUTPUT_STREAM_PUBLISHER: + final OutputStreamPublisher publisher = new OutputStreamPublisher(); + requestContext.setStreamProvider(contentLength -> publisher); + executorService.execute((ProcessingRunnable) () -> { + requestContext.writeEntity(); + publisher.close(); + }); + stage = requestBuilder.submit(Multi.from(publisher).map(DataChunk::create)); + break; + } + } + return stage; + } + + @FunctionalInterface + private interface ProcessingRunnable extends Runnable { + void runOrThrow() throws IOException; + + @Override + default void run() { + try { + runOrThrow(); + } catch (IOException e) { + throw new ProcessingException(LocalizationMessages.ERROR_WRITING_ENTITY(e.getMessage()), e); + } + } + } + + @SuppressWarnings("unchecked") + private static class OutputStreamBodyWriter implements MessageBodyWriter { + private OutputStreamBodyWriter() { + } + + @Override + public Flow.Publisher write(Single content, GenericType type, MessageBodyWriterContext context) { + context.contentType(MediaType.APPLICATION_OCTET_STREAM); + return content.flatMap(new ByteArrayOutputStreamToChunks()); + } + + @Override + public boolean accept(GenericType type, MessageBodyContext context) { + return ByteArrayOutputStream.class.isAssignableFrom(type.rawType()); + } + + private static class ByteArrayOutputStreamToChunks implements Function> { + @Override + public Flow.Publisher apply(ByteArrayOutputStream byteArrayOutputStream) { + return ContentWriters.writeBytes(byteArrayOutputStream.toByteArray(), false); + } + } + } +} diff --git a/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonProperties.java b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonProperties.java new file mode 100644 index 0000000000..62d0dbd3fe --- /dev/null +++ b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonProperties.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ +package org.glassfish.jersey.helidon.connector; + +import org.glassfish.jersey.internal.util.PropertiesClass; + +import io.helidon.config.Config; +import io.helidon.webclient.WebClient; + +/** + * Configuration options specific to the Client API that utilizes {@link HelidonConnector} + */ +@PropertiesClass +public final class HelidonProperties { + + /** + * A Helidon {@link Config} instance that is passed to {@link WebClient.Builder#config(Config)} if available + */ + public static final String CONFIG = "jersey.config.helidon.client.config"; +} diff --git a/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonStructures.java b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonStructures.java new file mode 100644 index 0000000000..fbaffec66c --- /dev/null +++ b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/HelidonStructures.java @@ -0,0 +1,165 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.helidon.connector; + +import io.helidon.common.http.Headers; +import io.helidon.common.http.Http; +import io.helidon.common.http.ReadOnlyParameters; +import io.helidon.config.Config; +import io.helidon.config.ConfigSources; +import io.helidon.media.common.InputStreamBodyReader; +import io.helidon.media.common.MessageBodyReader; +import io.helidon.webclient.Proxy; +import io.helidon.webclient.Ssl; +import io.helidon.webclient.WebClientResponse; +import io.netty.handler.codec.http.HttpHeaderValues; +import org.glassfish.jersey.client.ClientProperties; +import org.glassfish.jersey.client.ClientRequest; + +import javax.net.ssl.SSLContext; +import javax.ws.rs.ProcessingException; +import javax.ws.rs.core.Configuration; +import java.io.InputStream; +import java.net.URI; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; + +/** + * Helidon specific classes and implementations. + */ +class HelidonStructures { + + static Headers createHeaders(Map> data) { + return new ReadOnlyHeaders(data); + } + + static MessageBodyReader createInputStreamBodyReader() { + return InputStreamBodyReader.create(); + } + + static Optional helidonConfig(Configuration configuration) { + final Object helidonConfig = configuration.getProperty(HelidonProperties.CONFIG); + if (helidonConfig != null) { + if (!Config.class.isInstance(helidonConfig)) { + HelidonConnector.LOGGER.warning(LocalizationMessages.NOT_HELIDON_CONFIG(helidonConfig.getClass().getName())); + return Optional.empty(); + } else { + return Optional.of((Config) helidonConfig); + } + } + return Optional.empty(); + } + + static Optional createProxy(Configuration config) { + return ProxyBuilder.createProxy(config); + } + + static Optional createProxy(ClientRequest request) { + return ProxyBuilder.createProxy(request); + } + + static Optional createSSL(SSLContext context) { + return context == null ? Optional.empty() : Optional.of(Ssl.builder().sslContext(context).build()); + } + + static boolean hasEntity(WebClientResponse webClientResponse) { + final ReadOnlyParameters headers = webClientResponse.content().readerContext().headers(); + final Optional contentLenth = headers.first(Http.Header.CONTENT_LENGTH); + final Optional encoding = headers.first(Http.Header.TRANSFER_ENCODING); + + return ((contentLenth.isPresent() && !contentLenth.get().equals("0")) + || (encoding.isPresent() && encoding.get().equals(HttpHeaderValues.CHUNKED.toString()))); + } + + private static class ReadOnlyHeaders extends ReadOnlyParameters implements Headers { + public ReadOnlyHeaders(Map> data) { + super(data); + } + } + + private static class ProxyBuilder { + private static Optional createProxy(Configuration config) { + final Object proxyUri = config.getProperty(ClientProperties.PROXY_URI); + final String userName + = ClientProperties.getValue(config.getProperties(), ClientProperties.PROXY_USERNAME, String.class); + final String password + = ClientProperties.getValue(config.getProperties(), ClientProperties.PROXY_PASSWORD, String.class); + return createProxy(proxyUri, userName, password); + } + + private static Optional createProxy(ClientRequest clientRequest) { + final Object proxyUri = clientRequest.resolveProperty(ClientProperties.PROXY_URI, Object.class); + final String userName = clientRequest.resolveProperty(ClientProperties.PROXY_USERNAME, String.class); + final String password = clientRequest.resolveProperty(ClientProperties.PROXY_PASSWORD, String.class); + return createProxy(proxyUri, userName, password); + } + + private static Optional createProxy(Object proxyUri, String userName, String password) { + if (proxyUri != null) { + final URI u = getProxyUri(proxyUri); + final Proxy.Builder builder = Proxy.builder(); + Map proxyMap; + if (u.getScheme().toUpperCase(Locale.ROOT).equals("DIRECT")) { + proxyMap = Map.of("type", "NONE"); + //builder.type(Proxy.ProxyType.NONE); + } else { + builder.host(u.getHost()).port(u.getPort()); + switch (u.getScheme().toUpperCase(Locale.ROOT)) { + case "HTTP": + proxyMap = Map.of("type", "HTTP"); + //builder.type(Proxy.ProxyType.HTTP); + break; + case "SOCKS": + proxyMap = Map.of("type", "SOCKS_4"); + //builder.type(Proxy.ProxyType.SOCKS_4); + break; + case "SOCKS5": + proxyMap = Map.of("type", "SOCKS_5"); + //builder.type(Proxy.ProxyType.SOCKS_5); + break; + default: + HelidonConnector.LOGGER.warning(LocalizationMessages.UNSUPPORTED_PROXY_SCHEMA(u.getScheme())); + return Optional.empty(); + } + builder.config(Config.create(ConfigSources.create(proxyMap))); + } + if (userName != null) { + builder.username(userName); + + if (password != null) { + builder.password(password.toCharArray()); + } + } + return Optional.of(builder.build()); + } else { + return Optional.empty(); + } + } + + private static URI getProxyUri(final Object proxy) { + if (proxy instanceof URI) { + return (URI) proxy; + } else if (proxy instanceof String) { + return URI.create((String) proxy); + } else { + throw new ProcessingException(LocalizationMessages.WRONG_PROXY_URI_TYPE(ClientProperties.PROXY_URI)); + } + } + } +} diff --git a/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/OutputStreamChannel.java b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/OutputStreamChannel.java new file mode 100644 index 0000000000..152ac7f7b2 --- /dev/null +++ b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/OutputStreamChannel.java @@ -0,0 +1,199 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.helidon.connector; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.ReadableByteChannel; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + +class OutputStreamChannel extends OutputStream implements ReadableByteChannel { + + private ReentrantLock lock = new ReentrantLock(); + private static final ByteBuffer VOID = ByteBuffer.allocate(0); + private static final int CAPACITY = Integer.getInteger("jersey.helidon.connector.osc.capacity", 8); + private static final int WRITE_TIMEOUT = Integer.getInteger("jersey.helidon.connector.osc.read.timeout", 10000); + private static final int READ_TIMEOUT = Integer.getInteger("jersey.helidon.connector.osc.write.timeout", 10000); + private final int bufferSize; + + OutputStreamChannel(int bufferSize) { + this.bufferSize = bufferSize; + } + + private final LinkedBlockingDeque queue = new LinkedBlockingDeque<>(CAPACITY); + + private volatile boolean open = true; + private ByteBuffer remainingByteBuffer; + + @Override + public int read(ByteBuffer dst) throws IOException { + if (!open) { + throw new ClosedChannelException(); + } + + int sum = 0; + + do { + ByteBuffer top; + try { + top = poll(READ_TIMEOUT, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + open = false; + throw new ClosedByInterruptException(); + } + + if (top == null) { + return sum; + } + + if (top == VOID) { + if (sum == 0) { + open = false; + return -1; + } else { + queue.addFirst(top); + return sum; + } + } + + final int topSize = top.remaining(); + final int dstAvailable = dst.remaining(); + final int minSize = Math.min(topSize, dstAvailable); + + if (top.hasArray()) { + dst.put(top.array(), top.arrayOffset() + top.position(), minSize); + top.position(top.position() + minSize); + } else { + while (dst.hasRemaining() && top.hasRemaining()) { + dst.put(top.get()); + } + } + + sum += minSize; + + if (top.hasRemaining()) { + remainingByteBuffer = top; + } + } while (dst.hasRemaining()); + + return sum; + } + + private ByteBuffer poll(long timeout, TimeUnit unit) throws InterruptedException { + if (remainingByteBuffer != null) { + final ByteBuffer remaining = remainingByteBuffer; + remainingByteBuffer = null; + return remaining; + } else { + // do not modify head + lock.lock(); + final ByteBuffer peek = queue.poll(timeout, unit); + // can modify head + lock.unlock(); + return peek; + } + } + + @Override + public void write(int b) throws IOException { + write(new byte[]{(byte) b}, 0, 1); + } + + @Override + public void write(byte[] b) throws IOException { + super.write(b, 0, b.length); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + checkClosed(); + + if (lock.tryLock()) { + if (len < bufferSize && queue.size() > 0) { + final ByteBuffer buffer = queue.getLast(); + if (buffer != null && (buffer.capacity() - buffer.limit()) > len) { + //set for write + buffer.position(buffer.limit()); + buffer.limit(buffer.capacity()); + buffer.put(b, off, len); + //set for read + buffer.flip(); + lock.unlock(); + return; + } + } + lock.unlock(); + } + + final int maxLen = Math.max(len, bufferSize); + final byte[] bytes = new byte[maxLen]; + System.arraycopy(b, off, bytes, 0, len); + + final ByteBuffer buffer = ByteBuffer.wrap(bytes); + buffer.limit(len); + buffer.position(0); + + write(buffer); + } + + private void write(ByteBuffer buffer) throws IOException { + try { + boolean queued = queue.offer(buffer, WRITE_TIMEOUT, TimeUnit.MILLISECONDS); + if (!queued) { + throw new IOException("Buffer overflow."); + } + + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + @Override + public void close() throws IOException { + boolean offer = false; + + try { + offer = queue.offer(VOID, WRITE_TIMEOUT, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + // ignore. + } + + if (!offer) { + lock.lock(); + queue.removeLast(); + queue.add(VOID); + lock.unlock(); + } + } + + + @Override + public boolean isOpen() { + return open; + } + + private void checkClosed() throws IOException { + if (!open) { + throw new IOException("Stream already closed."); + } + } +} diff --git a/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/package-info.java b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/package-info.java new file mode 100644 index 0000000000..9dc13efeb3 --- /dev/null +++ b/connectors/helidon-connector/src/main/java/org/glassfish/jersey/helidon/connector/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +/** + * Jersey client {@link org.glassfish.jersey.client.spi.Connector connector} based on Helidon Web Client. + */ +package org.glassfish.jersey.helidon.connector; diff --git a/connectors/helidon-connector/src/main/resources/org.glassfish.jersey.helidon.connector/localization.properties b/connectors/helidon-connector/src/main/resources/org.glassfish.jersey.helidon.connector/localization.properties new file mode 100644 index 0000000000..f962cbcc2b --- /dev/null +++ b/connectors/helidon-connector/src/main/resources/org.glassfish.jersey.helidon.connector/localization.properties @@ -0,0 +1,21 @@ +# +# Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License v. 2.0, which is available at +# http://www.eclipse.org/legal/epl-2.0. +# +# This Source Code may also be made available under the following Secondary +# Licenses when the conditions for such availability set forth in the +# Eclipse Public License v. 2.0 are satisfied: GNU General Public License, +# version 2 with the GNU Classpath Exception, which is available at +# https://www.gnu.org/software/classpath/license.html. +# +# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 +# + +error.writing.entity=Error writing entity: {0}. +not.helidon.config=Given instance {0} is not Helidon config. Provided HelidonProperties.CONFIG is ignored. +not.supported=Helidon connector is not supported on JDK version < 11. +unsupported.proxy.schema=Proxy schema "{0}" not supported. +wrong.proxy.uri.type=The proxy URI ("{0}") property MUST be an instance of String or URI. \ No newline at end of file diff --git a/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/AsyncTest.java b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/AsyncTest.java new file mode 100644 index 0000000000..b6189db57a --- /dev/null +++ b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/AsyncTest.java @@ -0,0 +1,208 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.helidon.connector; + +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.logging.LoggingFeature; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.test.JerseyTest; +import org.hamcrest.Matchers; +import org.junit.Test; + +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.client.Entity; +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.container.Suspended; +import javax.ws.rs.container.TimeoutHandler; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.Response; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +/** + * Asynchronous connector test. + * + * @author Arul Dhesiaseelan (aruld at acm.org) + * @author Marek Potociar + */ +public class AsyncTest extends JerseyTest { + + private static final Logger LOGGER = Logger.getLogger(AsyncTest.class.getName()); + private static final String PATH = "async"; + + /** + * Asynchronous test resource. + */ + @Path(PATH) + public static class AsyncResource { + + /** + * Typical long-running operation duration. + */ + public static final long OPERATION_DURATION = 1000; + + /** + * Long-running asynchronous post. + * + * @param asyncResponse async response. + * @param id post request id (received as request payload). + */ + @POST + public void asyncPost(@Suspended final AsyncResponse asyncResponse, final String id) { + LOGGER.info("Long running post operation called with id " + id + " on thread " + Thread.currentThread().getName()); + new Thread(new Runnable() { + + @Override + public void run() { + final String result = veryExpensiveOperation(); + asyncResponse.resume(result); + } + + private String veryExpensiveOperation() { + // ... very expensive operation that typically finishes within 1 seconds, simulated using sleep() + try { + Thread.sleep(OPERATION_DURATION); + return "DONE-" + id; + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + return "INTERRUPTED-" + id; + } finally { + LOGGER.info("Long running post operation finished on thread " + Thread.currentThread().getName()); + } + } + }, "async-post-runner-" + id).start(); + } + + /** + * Long-running async get request that times out. + * + * @param asyncResponse async response. + */ + @GET + @Path("timeout") + public void asyncGetWithTimeout(@Suspended final AsyncResponse asyncResponse) { + LOGGER.info("Async long-running get with timeout called on thread " + Thread.currentThread().getName()); + asyncResponse.setTimeoutHandler(new TimeoutHandler() { + + @Override + public void handleTimeout(final AsyncResponse asyncResponse) { + asyncResponse.resume(Response.status(Response.Status.SERVICE_UNAVAILABLE) + .entity("Operation time out.").build()); + } + }); + asyncResponse.setTimeout(1, TimeUnit.SECONDS); + + new Thread(new Runnable() { + + @Override + public void run() { + final String result = veryExpensiveOperation(); + asyncResponse.resume(result); + } + + private String veryExpensiveOperation() { + // very expensive operation that typically finishes within 1 second but can take up to 5 seconds, + // simulated using sleep() + try { + Thread.sleep(5 * OPERATION_DURATION); + return "DONE"; + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + return "INTERRUPTED"; + } finally { + LOGGER.info("Async long-running get with timeout finished on thread " + Thread.currentThread().getName()); + } + } + }).start(); + } + + } + + @Override + protected Application configure() { + return new ResourceConfig(AsyncResource.class) + .register(new LoggingFeature(LOGGER, LoggingFeature.Verbosity.PAYLOAD_ANY)); + } + + @Override + protected void configureClient(final ClientConfig config) { + config.register(new LoggingFeature(LOGGER, LoggingFeature.Verbosity.PAYLOAD_ANY)); + config.connectorProvider(new HelidonConnectorProvider()); + } + + /** + * Test asynchronous POST. + *

+ * Send 3 async POST requests and wait to receive the responses. Check the response content and + * assert that the operation did not take more than twice as long as a single long operation duration + * (this ensures async request execution). + * + * @throws Exception in case of a test error. + */ + @Test + public void testAsyncPost() throws Exception { + final Future warmUp1 = target(PATH).request().async().post(Entity.text("100")); + final Future warmUp2 = target(PATH).request().async().post(Entity.text("200")); + final Future warmUp3 = target(PATH).request().async().post(Entity.text("300")); + + assertEquals("DONE-100", warmUp1.get().readEntity(String.class)); + assertEquals("DONE-200", warmUp2.get().readEntity(String.class)); + assertEquals("DONE-300", warmUp3.get().readEntity(String.class)); + + final long tic = System.currentTimeMillis(); + + // Submit requests asynchronously. + final Future rf1 = target(PATH).request().async().post(Entity.text("1")); + final Future rf2 = target(PATH).request().async().post(Entity.text("2")); + final Future rf3 = target(PATH).request().async().post(Entity.text("3")); + + // get() waits for the response + final String r1 = rf1.get().readEntity(String.class); + final String r2 = rf2.get().readEntity(String.class); + final String r3 = rf3.get().readEntity(String.class); + + final long toc = System.currentTimeMillis(); + + assertEquals("DONE-1", r1); + assertEquals("DONE-2", r2); + assertEquals("DONE-3", r3); + + assertThat("Async processing took too long.", toc - tic, Matchers.lessThan(3 * AsyncResource.OPERATION_DURATION)); + } + + /** + * Test accessing an operation that times out on the server. + * + * @throws Exception in case of a test error. + */ + @Test + public void testAsyncGetWithTimeout() throws Exception { + final Future responseFuture = target(PATH).path("timeout").request().async().get(); + // Request is being processed asynchronously. + final Response response = responseFuture.get(); + + // get() waits for the response + assertEquals(503, response.getStatus()); + assertEquals("Operation time out.", response.readEntity(String.class)); + } +} diff --git a/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/BasicHelidonConnectorTest.java b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/BasicHelidonConnectorTest.java new file mode 100644 index 0000000000..8b42df173f --- /dev/null +++ b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/BasicHelidonConnectorTest.java @@ -0,0 +1,308 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.helidon.connector; + +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.client.JerseyCompletionStageRxInvoker; +import org.glassfish.jersey.logging.LoggingFeature; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.test.JerseyTest; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.Invocation; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedHashMap; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +@RunWith(Parameterized.class) +public class BasicHelidonConnectorTest extends JerseyTest { + + private final String entityType; + + public BasicHelidonConnectorTest(String entityType) { + this.entityType = entityType; + } + + @Parameterized.Parameters + public static Object[] data() { + return new Object[]{"BYTE_ARRAY_OUTPUT_STREAM", "READABLE_BYTE_CHANNEL", "OUTPUT_STREAM_PUBLISHER"}; + } + + @Path("basic") + public static class BasicResource { + @Path("get") + @GET + public String get() { + return "ok"; + } + + @Path("getquery") + @GET + public String getQuery(@QueryParam("first") String first, @QueryParam("second") String second) { + return first + second; + } + + @POST + @Path("post") + public String post(String entity) { + return entity + entity; + } + + @GET + @Path("headers") + public Response headers(@Context HttpHeaders headers) { + Response.ResponseBuilder response = Response.ok("ok"); + for (Map.Entry> set : headers.getRequestHeaders().entrySet()) { + if (set.getKey().toUpperCase(Locale.ROOT).startsWith("X-TEST")) { + response.header(set.getKey(), set.getValue().iterator().next()); + } + } + return response.build(); + } + + @PUT + @Consumes("test/x-test") + @Produces("test/y-test") + @Path("produces/consumes") + public String putConsumesProduces(String content) { + return content + content; + } + } + + @Path("async") + public static class AsyncResource { + private static CountDownLatch shortLong = null; + + @GET + @Path("reset") + public void reset() { + shortLong = new CountDownLatch(1); + } + + @Path("long") + @GET + public String longGet() throws InterruptedException { + shortLong.await(10000, TimeUnit.MILLISECONDS); + return shortLong.getCount() == 0 ? "long" : "shortLong CountDownLatch has not been hit"; + } + + @Path("short") + @GET + public String shortGet() { + shortLong.countDown(); + return "short"; + } + } + + @Override + protected Application configure() { + return new ResourceConfig(BasicResource.class, AsyncResource.class) + .property(LoggingFeature.LOGGING_FEATURE_LOGGER_LEVEL_SERVER, "WARNING"); + } + + @Override + protected void configureClient(ClientConfig config) { + super.configureClient(config); + config.connectorProvider(new HelidonConnectorProvider()); + config.property("jersey.config.helidon.client.entity.type", entityType); + } + + @Test + public void testBasicGet() { + try (Response response = target("basic").path("get").request().get()) { + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals("ok", response.readEntity(String.class)); + } + } + + @Test + public void testBasicPost() { + try (Response response = target("basic").path("post").request() + .buildPost(Entity.entity("ok", MediaType.TEXT_PLAIN_TYPE)).invoke()) { + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals("okok", response.readEntity(String.class)); + } + } + + @Test + public void queryGetTest() { + try (Response response = target("basic").path("getquery") + .queryParam("first", "hello") + .queryParam("second", "world") + .request().get()) { + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals("helloworld", response.readEntity(String.class)); + } + } + + @Test + public void testHeaders() { + String[][] headers = new String[][]{{"X-TEST-ONE", "ONE"}, {"X-TEST-TWO", "TWO"}, {"X-TEST-THREE", "THREE"}}; + MultivaluedHashMap map = new MultivaluedHashMap<>(); + Arrays.stream(headers).forEach(a -> map.add(a[0], a[1])); + try (Response response = target("basic").path("headers").request().headers(map).get()) { + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals("ok", response.readEntity(String.class)); + for (int i = 0; i != headers.length; i++) { + Assert.assertTrue(response.getHeaders().containsKey(headers[i][0])); + Assert.assertEquals(headers[i][1], response.getStringHeaders().getFirst(headers[i][0])); + } + } + } + + @Test + public void testProduces() { + try (Response response = target("basic").path("produces/consumes").request("test/z-test") + .put(Entity.entity("ok", new MediaType("test", "x-test")))) { + Assert.assertEquals(406, response.getStatus()); + } + + try (Response response = target("basic").path("produces/consumes").request() + .put(Entity.entity("ok", new MediaType("test", "x-test")))) { + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals("okok", response.readEntity(String.class)); + Assert.assertEquals("test/y-test", response.getStringHeaders().getFirst(HttpHeaders.CONTENT_TYPE)); + } + } + + @Test + public void testAsyncGet() throws ExecutionException, InterruptedException { + Future futureResponse = target("basic").path("get").request().async().get(); + try (Response response = futureResponse.get()) { + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals("ok", response.readEntity(String.class)); + } + } + + @Test + public void testConsumes() { + try (Response response = target("basic").path("produces/consumes").request("test/y-test") + .put(Entity.entity("ok", new MediaType("test", "z-test")))) { + Assert.assertEquals(415, response.getStatus()); + } + + try (Response response = target("basic").path("produces/consumes").request("test/y-test") + .put(Entity.entity("ok", MediaType.WILDCARD_TYPE))) { + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals("okok", response.readEntity(String.class)); + Assert.assertEquals("test/y-test", response.getStringHeaders().getFirst(HttpHeaders.CONTENT_TYPE)); + } + } + + @Test + public void testRxGet() throws ExecutionException, InterruptedException { + CompletableFuture futureResponse = + target("basic").path("get").request().rx(JerseyCompletionStageRxInvoker.class).get(); + + try (Response response = futureResponse.get()) { + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals("ok", response.readEntity(String.class)); + } + } + + @Test + public void testInputStreamEntity() throws IOException { + try (Response response = target("basic").path("get").request().get()) { + Assert.assertEquals(200, response.getStatus()); + InputStream is = response.readEntity(InputStream.class); + Assert.assertEquals('o', is.read()); + Assert.assertEquals('k', is.read()); + is.close(); + } + } + + // -----------Async + + @Test + public void testTwoClientsAsync() throws ExecutionException, InterruptedException { + try (Response resetResponse = target("async").path("reset").request().get()) { + Assert.assertEquals(204, resetResponse.getStatus()); + } + + ClientConfig config = new ClientConfig(); + config.connectorProvider(new HelidonConnectorProvider()); + + Client longClient = ClientBuilder.newClient(config); + Invocation.Builder longRequest = longClient.target(getBaseUri()).path("async/long").request(); + + Client shortClient = ClientBuilder.newClient(config); + Invocation.Builder shortRequest = shortClient.target(getBaseUri()).path("async/short").request(); + + Future futureLongResponse = longRequest.async().get(); + Future futureShortResponse = shortRequest.async().get(); + + try (Response shortResponse = futureShortResponse.get()) { + Assert.assertEquals(200, shortResponse.getStatus()); + Assert.assertEquals("short", shortResponse.readEntity(String.class)); + } + + try (Response longResponse = futureLongResponse.get()) { + Assert.assertEquals(200, longResponse.getStatus()); + Assert.assertEquals("long", longResponse.readEntity(String.class)); + } + } + + @Test + public void testOneClientsTwoReqestsAsync() throws ExecutionException, InterruptedException { + try (Response resetResponse = target("async").path("reset").request().get()) { + Assert.assertEquals(204, resetResponse.getStatus()); + } + + Invocation.Builder longRequest = target().path("async/long").request(); + Invocation.Builder shortRequest = target().path("async/short").request(); + + Future futureLongResponse = longRequest.async().get(); + Future futureShortResponse = shortRequest.async().get(); + + try (Response shortResponse = futureShortResponse.get()) { + Assert.assertEquals(200, shortResponse.getStatus()); + Assert.assertEquals("short", shortResponse.readEntity(String.class)); + } + + try (Response longResponse = futureLongResponse.get()) { + Assert.assertEquals(200, longResponse.getStatus()); + Assert.assertEquals("long", longResponse.readEntity(String.class)); + } + } +} diff --git a/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/FollowRedirectsTest.java b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/FollowRedirectsTest.java new file mode 100644 index 0000000000..87105d666a --- /dev/null +++ b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/FollowRedirectsTest.java @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.helidon.connector; + +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.client.ClientProperties; +import org.glassfish.jersey.client.ClientResponse; +import org.glassfish.jersey.logging.LoggingFeature; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.test.JerseyTest; +import org.junit.Test; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.client.ClientRequestContext; +import javax.ws.rs.client.ClientResponseContext; +import javax.ws.rs.client.ClientResponseFilter; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.UriBuilder; +import java.io.IOException; +import java.util.logging.Logger; + +import static org.junit.Assert.assertEquals; + +/** + * Helidon connector follow redirect tests. + * + * @author Martin Matula + * @author Arul Dhesiaseelan (aruld at acm.org) + * @author Marek Potociar + */ +public class FollowRedirectsTest extends JerseyTest { + private static final Logger LOGGER = Logger.getLogger(TimeoutTest.class.getName()); + + @Path("/test") + public static class RedirectResource { + @GET + public String get() { + return "GET"; + } + + @GET + @Path("redirect") + public Response redirect() { + return Response.seeOther(UriBuilder.fromResource(RedirectResource.class).build()).build(); + } + } + + @Override + protected Application configure() { + ResourceConfig config = new ResourceConfig(RedirectResource.class); + config.register(new LoggingFeature(LOGGER, LoggingFeature.Verbosity.PAYLOAD_ANY)); + return config; + } + + @Override + protected void configureClient(ClientConfig config) { + config.connectorProvider(new HelidonConnectorProvider()); + } + + private static class RedirectTestFilter implements ClientResponseFilter { + public static final String RESOLVED_URI_HEADER = "resolved-uri"; + + @Override + public void filter(ClientRequestContext requestContext, ClientResponseContext responseContext) throws IOException { + if (responseContext instanceof ClientResponse) { + ClientResponse clientResponse = (ClientResponse) responseContext; + responseContext.getHeaders().putSingle(RESOLVED_URI_HEADER, clientResponse.getResolvedRequestUri().toString()); + } + } + } + + @Test + public void testDoFollow() { + Response r = target("test/redirect").register(RedirectTestFilter.class).request().get(); + assertEquals(200, r.getStatus()); + assertEquals("GET", r.readEntity(String.class)); + + assertEquals( + UriBuilder.fromUri(getBaseUri()).path(RedirectResource.class).build().toString(), + r.getHeaderString(RedirectTestFilter.RESOLVED_URI_HEADER)); + } + + @Test + public void testDontFollow() { + WebTarget t = target("test/redirect"); + t.property(ClientProperties.FOLLOW_REDIRECTS, false); + assertEquals(303, t.request().get().getStatus()); + } +} diff --git a/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/LargeDataTest.java b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/LargeDataTest.java new file mode 100644 index 0000000000..8a7a6907df --- /dev/null +++ b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/LargeDataTest.java @@ -0,0 +1,158 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.helidon.connector; + +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.logging.LoggingFeature; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.test.JerseyTest; +import org.junit.Assert; +import org.junit.Test; + +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.ServerErrorException; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import javax.ws.rs.core.StreamingOutput; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.logging.Logger; + +/** + * The LargeDataTest reproduces a problem when bytes of large data sent are incorrectly sent. + * As a result, the request body is different than what was sent by the client. + *

+ * In order to be able to inspect the request body, the generated data is a sequence of numbers + * delimited with new lines. Such as + *


+ *     1
+ *     2
+ *     3
+ *
+ *     ...
+ *
+ *     57234
+ *     57235
+ *     57236
+ *
+ *     ...
+ * 
+ * It is also possible to send the data to netcat: {@code nc -l 8080} and verify the problem is + * on the client side. + * + * @author Stepan Vavra + * @author Marek Potociar + */ +public class LargeDataTest extends JerseyTest { + + private static final Logger LOGGER = Logger.getLogger(LargeDataTest.class.getName()); + private static final int LONG_DATA_SIZE = 100_000; // for large set around 5GB, try e.g.: 536_870_912; + private static volatile Throwable exception; + + private static StreamingOutput longData(long sequence) { + return out -> { + long offset = 0; + while (offset < sequence) { + out.write(Long.toString(offset).getBytes()); + out.write('\n'); + offset++; + } + }; + } + + @Override + protected Application configure() { + ResourceConfig config = new ResourceConfig(HttpMethodResource.class); + config.register(new LoggingFeature(LOGGER, LoggingFeature.Verbosity.HEADERS_ONLY)); + return config; + } + + @Override + protected void configureClient(ClientConfig config) { + config.connectorProvider(new HelidonConnectorProvider()); + } + + @Test + public void postWithLargeData() throws Throwable { + long milis = System.currentTimeMillis(); + WebTarget webTarget = target("test"); + + Response response = webTarget.request().post(Entity.entity(longData(LONG_DATA_SIZE), MediaType.TEXT_PLAIN_TYPE)); + + try { + if (exception != null) { + + // the reason to throw the exception is that IntelliJ gives you an option to compare the expected with the actual + throw exception; + } + + Assert.assertEquals("Unexpected error: " + response.getStatus(), + Status.Family.SUCCESSFUL, + response.getStatusInfo().getFamily()); + } finally { + response.close(); + } + if (LONG_DATA_SIZE > 9_999) { + System.out.println("Large Data Test took " + (System.currentTimeMillis() - milis) + "milis"); + } + } + + @Path("/test") + public static class HttpMethodResource { + + @POST + public Response post(InputStream content) { + try { + longData(LONG_DATA_SIZE).write(new OutputStream() { + + private long position = 0; +// private long mbRead = 0; + + @Override + public void write(final int generated) throws IOException { + int received = content.read(); + + if (received != generated) { + throw new IOException("Bytes don't match at position " + position + + ": received=" + received + + ", generated=" + generated); + } + +// position++; +// System.out.println("position" + position); +// if (position % (1024 * 1024) == 0) { +// mbRead++; +// System.out.println("MB read: " + mbRead); +// } + } + }); + } catch (IOException e) { + exception = e; + throw new ServerErrorException(e.getMessage(), 500, e); + } + + return Response.ok().build(); + } + + } +} diff --git a/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/ParallelTest.java b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/ParallelTest.java new file mode 100644 index 0000000000..94a1d26a41 --- /dev/null +++ b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/ParallelTest.java @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.helidon.connector; + +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.test.JerseyTest; +import org.junit.Assert; +import org.junit.Test; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.Response; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests the parallel execution of multiple requests. + * + * @author Stepan Kopriva + */ +public class ParallelTest extends JerseyTest { + private static final Logger LOGGER = Logger.getLogger(ParallelTest.class.getName()); + + private static final int PARALLEL_CLIENTS = 10; + private static final String PATH = "test"; + private static final AtomicInteger receivedCounter = new AtomicInteger(0); + private static final AtomicInteger resourceCounter = new AtomicInteger(0); + private static final CyclicBarrier startBarrier = new CyclicBarrier(PARALLEL_CLIENTS + 1); + private static final CountDownLatch doneLatch = new CountDownLatch(PARALLEL_CLIENTS); + + @Path(PATH) + public static class MyResource { + + @GET + public String get() { + sleep(); + resourceCounter.addAndGet(1); + return "GET"; + } + + private void sleep() { + try { + Thread.sleep(10); + } catch (InterruptedException ex) { + Logger.getLogger(ParallelTest.class.getName()).log(Level.SEVERE, null, ex); + } + } + } + + @Override + protected Application configure() { + return new ResourceConfig(ParallelTest.MyResource.class); + } + + @Override + protected void configureClient(ClientConfig config) { + config.connectorProvider(new HelidonConnectorProvider()); + } + + @Test + public void testParallel() throws BrokenBarrierException, InterruptedException, TimeoutException { + final ScheduledExecutorService executor = Executors.newScheduledThreadPool(PARALLEL_CLIENTS); + + try { + final WebTarget target = target(); + for (int i = 1; i <= PARALLEL_CLIENTS; i++) { + final int id = i; + executor.submit(new Runnable() { + @Override + public void run() { + try { + startBarrier.await(); + Response response; + response = target.path(PATH).request().get(); + assertEquals("GET", response.readEntity(String.class)); + receivedCounter.incrementAndGet(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + LOGGER.log(Level.WARNING, "Client thread " + id + " interrupted.", ex); + } catch (BrokenBarrierException ex) { + LOGGER.log(Level.INFO, "Client thread " + id + " failed on broken barrier.", ex); + } catch (Throwable t) { + t.printStackTrace(); + LOGGER.log(Level.WARNING, "Client thread " + id + " failed on unexpected exception.", t); + } finally { + doneLatch.countDown(); + } + } + }); + } + + startBarrier.await(1, TimeUnit.SECONDS); + + assertTrue("Waiting for clients to finish has timed out.", doneLatch.await(5 * getAsyncTimeoutMultiplier(), + TimeUnit.SECONDS)); + + assertEquals("Resource counter", PARALLEL_CLIENTS, resourceCounter.get()); + + assertEquals("Received counter", PARALLEL_CLIENTS, receivedCounter.get()); + } finally { + executor.shutdownNow(); + Assert.assertTrue("Executor termination", executor.awaitTermination(5, TimeUnit.SECONDS)); + } + } +} diff --git a/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/TimeoutTest.java b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/TimeoutTest.java new file mode 100644 index 0000000000..1fe6374177 --- /dev/null +++ b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/TimeoutTest.java @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.helidon.connector; + +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.client.ClientProperties; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.test.JerseyTest; +import org.junit.Ignore; +import org.junit.Test; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.ProcessingException; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.Response; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * @author Martin Matula + */ +public class TimeoutTest extends JerseyTest { + @Path("/test") + public static class TimeoutResource { + @GET + public String get() { + return "GET"; + } + + @GET + @Path("timeout") + public String getTimeout() { + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return "GET"; + } + } + + @Override + protected Application configure() { + return new ResourceConfig(TimeoutResource.class); + } + + @Override + protected void configureClient(ClientConfig config) { + config.connectorProvider(new HelidonConnectorProvider()); + } + + @Test + public void testFast() { + Response r = target("test").request().get(); + assertEquals(200, r.getStatus()); + assertEquals("GET", r.readEntity(String.class)); + } + + @Test + public void testSlow() { + try { + target("test/timeout").property(ClientProperties.READ_TIMEOUT, 1_000).request().get(); + fail("Timeout expected."); + } catch (ProcessingException e) { + assertTimeoutException(e); + } + } + + @Ignore + // TODO - WebClient change request + public void testTimeoutInRequest() { + try { + target("test/timeout").request().property(ClientProperties.READ_TIMEOUT, 1_000).get(); + fail("Timeout expected."); + } catch (ProcessingException e) { + assertTimeoutException(e); + } + } + + private void assertTimeoutException(Exception e) { + String exceptionName = "TimeoutException"; // check netty or JDK TimeoutException + Throwable t = e.getCause(); + while (t != null) { + if (t.getClass().getSimpleName().contains(exceptionName)) { + break; + } + t = t.getCause(); + } + if (t == null) { + if (e.getCause() != null) { + if (e.getCause().getCause() != null) { + fail("Unexpected processing exception cause" + e.getCause().getCause().getMessage()); + } else { + fail("Unexpected processing exception cause" + e.getCause().getMessage()); + } + } else { + fail("Unexpected processing exception cause" + e.getMessage()); + } + } + } +} diff --git a/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/sse/EventOutputTest.java b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/sse/EventOutputTest.java new file mode 100644 index 0000000000..31ceb61e8d --- /dev/null +++ b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/sse/EventOutputTest.java @@ -0,0 +1,274 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.helidon.connector.sse; + +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.client.ClientProperties; +import org.glassfish.jersey.helidon.connector.HelidonConnectorProvider; +import org.glassfish.jersey.media.sse.EventInput; +import org.glassfish.jersey.media.sse.EventListener; +import org.glassfish.jersey.media.sse.EventOutput; +import org.glassfish.jersey.media.sse.EventSource; +import org.glassfish.jersey.media.sse.InboundEvent; +import org.glassfish.jersey.media.sse.OutboundEvent; +import org.glassfish.jersey.media.sse.SseFeature; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.test.JerseyTest; +import org.junit.Ignore; +import org.junit.Test; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Event output tests. + * + * @author Pavel Bucek + * @author Marek Potociar + */ +public class EventOutputTest extends JerseyTest { + + @Override + protected Application configure() { + return new ResourceConfig(SseTestResource.class, SseFeature.class); + } + + @Override + protected void configureClient(ClientConfig config) { + config.register(SseFeature.class); + config.connectorProvider(new HelidonConnectorProvider()); + } + + /** + * SSE Test resource. + */ + @Path("test") + @Produces(SseFeature.SERVER_SENT_EVENTS) + public static class SseTestResource { + + @GET + @Path("single") + public EventOutput getSingleEvent() { + final EventOutput output = new EventOutput(); + try { + return output; + } finally { + new Thread() { + public void run() { + try { + output.write(new OutboundEvent.Builder().data(String.class, "single").build()); + output.close(); + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } + }.start(); + } + } + + @GET + @Path("closed-single") + public EventOutput getClosedSingleEvent() throws IOException { + final EventOutput output = new EventOutput(); + output.write(new OutboundEvent.Builder().data(String.class, "closed").build()); + output.close(); + return output; + } + + @GET + @Path("closed-empty") + public EventOutput getClosedEmpty() throws IOException { + final EventOutput output = new EventOutput(); + output.close(); + return output; + } + + @GET + @Path("charset") + @Produces("text/event-stream;charset=utf-8") + public EventOutput getSseWithCharset() throws IOException { + final EventOutput output = new EventOutput(); + output.write(new OutboundEvent.Builder().data(String.class, "charset").build()); + output.close(); + return output; + } + + @GET + @Path("comments-only") + public EventOutput getCommentsOnlyStream() throws IOException { + final EventOutput output = new EventOutput(); + output.write(new OutboundEvent.Builder().comment("No comment #1").build()); + output.write(new OutboundEvent.Builder().comment("No comment #2").build()); + output.close(); + return output; + } + } + + @Test + @Ignore //2.0.0-M3 + public void testReadSseEventAsPlainString() throws Exception { + final Response r = target().path("test/single").request().get(Response.class); + assertThat(r.readEntity(String.class), containsString("single")); + } + + /** + * Reproducer for JERSEY-2912: Sending and receiving comments-only events. + * + * @throws Exception + */ + @Test + public void testReadCommentsOnlySseEvents() throws Exception { + ClientConfig clientConfig = new ClientConfig(); + clientConfig.property(ClientProperties.CONNECT_TIMEOUT, 15000); + clientConfig.property(ClientProperties.READ_TIMEOUT, 0); + clientConfig.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 8); + clientConfig.connectorProvider(new HelidonConnectorProvider()); + Client client = ClientBuilder.newBuilder().withConfig(clientConfig).build(); + + final CountDownLatch latch = new CountDownLatch(2); + final Queue eventComments = new ArrayBlockingQueue<>(2); + WebTarget single = client.target(getBaseUri()).path("test/comments-only"); + EventSource es = EventSource.target(single).build(); + es.register(new EventListener() { + @Override + public void onEvent(InboundEvent inboundEvent) { + eventComments.add(inboundEvent.getComment()); + latch.countDown(); + } + }); + + boolean latchTimedOut; + boolean closeTimedOut; + try { + es.open(); + latchTimedOut = latch.await(5 * getAsyncTimeoutMultiplier(), TimeUnit.SECONDS); + } finally { + closeTimedOut = es.close(5, TimeUnit.SECONDS); + } + + assertEquals("Unexpected event count", 2, eventComments.size()); + for (int i = 1; i <= 2; i++) { + assertEquals("Unexpected comment data on event #" + i, "No comment #" + i, eventComments.poll()); + } + assertTrue("Event latch has timed out", latchTimedOut); + assertTrue("EventSource.close() has timed out", closeTimedOut); + } + + @Test + @Ignore // 2.0.0-M3 + public void testReadFromClosedOutput() throws Exception { + /** + * Need to disable HTTP Keep-Alive to prevent this test from hanging in HttpURLConnection + * due to an attempt to read from a stale, out-of-sync connection closed by the server. + * Thus setting the "Connection: close" HTTP header on all requests. + */ + Response r; + r = target().path("test/closed-empty").request().header("Connection", "close").get(); + assertTrue(r.readEntity(String.class).isEmpty()); + + r = target().path("test/closed-single").request().header("Connection", "close").get(); + assertTrue(r.readEntity(String.class).contains("closed")); + + // + + EventInput input; + input = target().path("test/closed-single").request().header("Connection", "close").get(EventInput.class); + assertEquals("closed", input.read().readData()); + assertEquals(null, input.read()); + assertTrue(input.isClosed()); + + input = target().path("test/closed-empty").request().header("Connection", "close").get(EventInput.class); + assertEquals(null, input.read()); + assertTrue(input.isClosed()); + } + + @Test + public void testSseContentTypeWithCharset() { + /** + * Need to disable HTTP Keep-Alive to prevent this test from hanging in HttpURLConnection + * due to an attempt to read from a stale, out-of-sync connection closed by the server. + * Thus setting the "Connection: close" HTTP header on all requests. + */ + Response r; + r = target().path("test/charset").request().header("Connection", "close").get(); + assertTrue(r.getMediaType().getParameters().get("charset").equalsIgnoreCase("utf-8")); + final EventInput eventInput = r.readEntity(EventInput.class); + String eventData = eventInput.read().readData(); + assertEquals("charset", eventData); + eventInput.close(); + } + + @Test + public void testConnectorWithEventSource() throws InterruptedException { + ClientConfig clientConfig = new ClientConfig(); + clientConfig.property(ClientProperties.CONNECT_TIMEOUT, 15000); + clientConfig.property(ClientProperties.READ_TIMEOUT, 0); + clientConfig.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 8); + clientConfig.connectorProvider(new HelidonConnectorProvider()); + Client client = ClientBuilder.newBuilder().withConfig(clientConfig).build(); + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference eventData = new AtomicReference(); + final AtomicInteger counter = new AtomicInteger(0); + WebTarget single = client.target(getBaseUri()).path("test/single"); + EventSource es = EventSource.target(single).build(); + es.register(new EventListener() { + @Override + public void onEvent(InboundEvent inboundEvent) { + final int i = counter.incrementAndGet(); + if (i == 1) { + eventData.set(inboundEvent.readData()); + } + latch.countDown(); + } + }); + + boolean latchTimedOut; + boolean closeTimedOut; + try { + es.open(); + latchTimedOut = latch.await(5 * getAsyncTimeoutMultiplier(), TimeUnit.SECONDS); + } finally { + closeTimedOut = es.close(5, TimeUnit.SECONDS); + } + + // 2.0.0.-M3 assertEquals("Unexpected event count", 1, counter.get()); + assertEquals("Unexpected event data", "single", eventData.get()); + assertTrue("Event latch has timed out", latchTimedOut); + assertTrue("EventSource.close() has timed out", closeTimedOut); + } +} diff --git a/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/sse/SseTest.java b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/sse/SseTest.java new file mode 100644 index 0000000000..61a03bf641 --- /dev/null +++ b/connectors/helidon-connector/src/test/java/org/glassfish/jersey/helidon/connector/sse/SseTest.java @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0, which is available at + * http://www.eclipse.org/legal/epl-2.0. + * + * This Source Code may also be made available under the following Secondary + * Licenses when the conditions for such availability set forth in the + * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, + * version 2 with the GNU Classpath Exception, which is available at + * https://www.gnu.org/software/classpath/license.html. + * + * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 + */ + +package org.glassfish.jersey.helidon.connector.sse; + +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.helidon.connector.HelidonConnectorProvider; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.test.JerseyTest; +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.PostConstruct; +import javax.inject.Singleton; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.sse.Sse; +import javax.ws.rs.sse.SseBroadcaster; +import javax.ws.rs.sse.SseEventSink; +import javax.ws.rs.sse.SseEventSource; +import java.io.Closeable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class SseTest extends JerseyTest { + + private static String PALINDROME = "neveroddoreven"; + private static int WAIT_TIME = 5000; + + @Path("simple") + public static class SimpleSseResource { + @GET + @Produces(MediaType.SERVER_SENT_EVENTS) + public void send(@Context SseEventSink sink, @Context Sse sse) { + try (SseEventSink s = sink) { + for (int i = 0; i != 10; i++) { + s.send(sse.newEvent("A")); + } + } + } + } + + @Path("broadcast") + @Singleton + public static class BroadcasterResource { + private static final String WELCOME = "Welcome"; + + @Context + private Sse sse; + + private static SseBroadcaster sseBroadcaster; + + @PostConstruct + public void init() { + System.out.println("INIT"); + sseBroadcaster = sse.newBroadcaster(); + } + + @GET + @Path("register") + @Produces(MediaType.SERVER_SENT_EVENTS) + public void register(@Context SseEventSink sink) { + sseBroadcaster.register(sink); + sink.send(sse.newEvent(WELCOME)); + } + + @POST + @Path("broadcast") + @Consumes(MediaType.TEXT_PLAIN) + public void broadcast(String event) { + sseBroadcaster.broadcast(sse.newEvent(event)); + } + } + + @Override + protected Application configure() { + return new ResourceConfig(SimpleSseResource.class, BroadcasterResource.class); + } + + @Override + protected void configureClient(ClientConfig config) { + config.connectorProvider(new HelidonConnectorProvider()); + //config.property("jersey.config.helidon.client.entity.type", "OUTPUT_STREAM_PUBLISHER"); + } + + @Test + public void testSend() throws InterruptedException { + final StringBuilder sb = new StringBuilder(); + final CountDownLatch latch = new CountDownLatch(10); + try (SseEventSource source = SseEventSource.target(target().path("simple")).build()) { + source.register((event) -> sb.append(event.readData())); + source.register((event) -> latch.countDown()); + source.open(); + + latch.await(WAIT_TIME, TimeUnit.MILLISECONDS); + } + + Assert.assertEquals("AAAAAAAAAA", sb.toString()); + Assert.assertEquals(0, latch.getCount()); + } + + @Test + public void testBroadcast() throws InterruptedException { + final BroadcasterClient clientOne = new BroadcasterClient(target()); + final BroadcasterClient clientTwo = new BroadcasterClient(target()); + + clientOne.register(); + clientTwo.register(); + + clientOne.broadcast(); + clientTwo.broadcast(); + + clientOne.messageLatch.await(WAIT_TIME, TimeUnit.MILLISECONDS); + clientTwo.messageLatch.await(WAIT_TIME, TimeUnit.MILLISECONDS); + + Assert.assertEquals(0, clientOne.messageLatch.getCount()); + Assert.assertEquals(0, clientTwo.messageLatch.getCount()); + + Assert.assertEquals(BroadcasterResource.WELCOME + PALINDROME + PALINDROME, clientOne.message.toString()); + Assert.assertEquals(BroadcasterResource.WELCOME + PALINDROME + PALINDROME, clientTwo.message.toString()); + + clientOne.close(); + clientTwo.close(); + } + + private static class BroadcasterClient implements Closeable { + private final WebTarget target; + private final CountDownLatch messageLatch = new CountDownLatch(3); + private final SseEventSource source; + private final StringBuilder message = new StringBuilder(); + + private BroadcasterClient(WebTarget target) { + this.target = target; + source = SseEventSource.target(target.path("broadcast/register")).build(); + } + + private void register() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + source.register((event) -> message.append(event.readData())); + source.register((event) -> latch.countDown()); + source.register((event) -> messageLatch.countDown()); + source.open(); + + latch.await(WAIT_TIME, TimeUnit.MILLISECONDS); + Assert.assertEquals(0, latch.getCount()); + } + + private void broadcast() { + try (Response r = target.path("broadcast/broadcast") + .request().buildPost(Entity.entity(PALINDROME, MediaType.TEXT_PLAIN)).invoke()) { + Assert.assertEquals(204, r.getStatus()); + } + } + + @Override + public void close() { + source.close(); + } + } +} diff --git a/connectors/pom.xml b/connectors/pom.xml index 37ecbda4c4..87875d8c39 100644 --- a/connectors/pom.xml +++ b/connectors/pom.xml @@ -75,4 +75,16 @@ test + + + + HelidonConnector + + 11 + + + helidon-connector + + +