Skip to content

Commit

Permalink
Merge pull request #5316 from franz1981/4.x_unified_allocator_fix_ssl
Browse files Browse the repository at this point in the history
Enable buffer pooling settings with a SSL configurable option
  • Loading branch information
vietj authored Nov 5, 2024
2 parents a7c2ba5 + a3393ac commit ccca2b8
Show file tree
Hide file tree
Showing 8 changed files with 554 additions and 15 deletions.
20 changes: 19 additions & 1 deletion src/main/java/io/vertx/core/net/JdkSSLEngineOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,31 @@ public static synchronized boolean isAlpnAvailable() {
return jdkAlpnAvailable;
}

private boolean pooledHeapBuffers = false;

public JdkSSLEngineOptions() {
}

public JdkSSLEngineOptions(JsonObject json) {
super(json);
pooledHeapBuffers = json.getBoolean("pooledHeapBuffers", false);
}

public JdkSSLEngineOptions(JdkSSLEngineOptions that) {
super(that);
pooledHeapBuffers = that.pooledHeapBuffers;
}

/**
* Set whether to use pooled heap buffers. Default is {@code false}, but it is recommended to use pooled buffers
*/
public JdkSSLEngineOptions setPooledHeapBuffers(boolean pooledHeapBuffers) {
this.pooledHeapBuffers = pooledHeapBuffers;
return this;
}

public boolean isPooledHeapBuffers() {
return pooledHeapBuffers;
}

@Override
Expand All @@ -72,7 +88,9 @@ public JdkSSLEngineOptions setUseWorkerThread(boolean useWorkerThread) {
}

public JsonObject toJson() {
return new JsonObject();
JsonObject jsonObject = new JsonObject();
jsonObject.put("pooledHeapBuffers", pooledHeapBuffers);
return jsonObject;
}

@Override
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/io/vertx/core/net/impl/NetClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.impl.PartialPooledByteBufAllocator;
import io.vertx.core.impl.CloseFuture;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
Expand All @@ -49,7 +48,6 @@
import java.net.ConnectException;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

/**
Expand Down Expand Up @@ -297,7 +295,8 @@ private void connectInternal2(ProxyOptions proxyOptions,
Objects.requireNonNull(connectHandler, "No null connectHandler accepted");
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoop);
bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
bootstrap.option(ChannelOption.ALLOCATOR,
sslHelper.clientByteBufAllocator(sslChannelProvider.sslContextProvider()));

vertx.transport().configure(options, remoteAddress.isDomainSocket(), bootstrap);

Expand Down
36 changes: 33 additions & 3 deletions src/main/java/io/vertx/core/net/impl/SSLHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@

package io.vertx.core.net.impl;

import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.handler.ssl.OpenSsl;
import io.netty.handler.ssl.SslProvider;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.VertxException;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.buffer.impl.PartialPooledByteBufAllocator;
import io.vertx.core.http.ClientAuth;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.net.ClientOptionsBase;
Expand Down Expand Up @@ -57,6 +60,27 @@ public class SSLHelper {
CLIENT_AUTH_MAPPING.put(ClientAuth.NONE, io.netty.handler.ssl.ClientAuth.NONE);
}

ByteBufAllocator clientByteBufAllocator(SslContextProvider ctxProvider) {
if (usesJDKSSLWithPooledHeapBuffers(ctxProvider)) {
return PooledByteBufAllocator.DEFAULT;
}
return PartialPooledByteBufAllocator.INSTANCE;
}

ByteBufAllocator serverByteBufAllocator(SslContextProvider ctxProvider) {
if (!ssl || usesJDKSSLWithPooledHeapBuffers(ctxProvider)) {
return PooledByteBufAllocator.DEFAULT;
}
return PartialPooledByteBufAllocator.INSTANCE;
}

private boolean usesJDKSSLWithPooledHeapBuffers(SslContextProvider ctxProvider) {
return ssl && sslEngineOptions instanceof JdkSSLEngineOptions &&
ctxProvider.jdkSSLProvider() &&
((JdkSSLEngineOptions) sslEngineOptions).isPooledHeapBuffers();
}


/**
* Resolve the ssl engine options to use for properly running the configured options.
*/
Expand Down Expand Up @@ -149,18 +173,22 @@ private static class CachedProvider {

private class EngineConfig {

private final boolean jdkSSLProvider;
private final SSLOptions sslOptions;
private final Supplier<SslContextFactory> supplier;
private final boolean useWorkerPool;

public EngineConfig(SSLOptions sslOptions, Supplier<SslContextFactory> supplier, boolean useWorkerPool) {
public EngineConfig(boolean jdkSSLProvider, SSLOptions sslOptions, Supplier<SslContextFactory> supplier,
boolean useWorkerPool) {
this.jdkSSLProvider = jdkSSLProvider;
this.sslOptions = sslOptions;
this.supplier = supplier;
this.useWorkerPool = useWorkerPool;
}

SslContextProvider sslContextProvider() {
return new SslContextProvider(
jdkSSLProvider,
clientAuth,
endpointIdentificationAlgorithm,
applicationProtocols,
Expand Down Expand Up @@ -291,18 +319,20 @@ private Future<EngineConfig> build(SSLOptions sslOptions, ContextInternal ctx) {
}).compose(v2 -> ctx.<EngineConfig>executeBlockingInternal(p -> {
Supplier<SslContextFactory> supplier;
boolean useWorkerPool;
final boolean jdkSSLProvider;
try {
SSLEngineOptions resolvedEngineOptions = resolveEngineOptions(sslEngineOptions, useAlpn);
supplier = resolvedEngineOptions::sslContextFactory;
useWorkerPool = resolvedEngineOptions.getUseWorkerThread();
jdkSSLProvider = resolvedEngineOptions instanceof JdkSSLEngineOptions;
} catch (Exception e) {
p.fail(e);
return;
}
p.complete(new EngineConfig(sslOptions, supplier, useWorkerPool));
p.complete(new EngineConfig(jdkSSLProvider, sslOptions, supplier, useWorkerPool));
})).onComplete(promise);
} else {
sslContextFactorySupplier = Future.succeededFuture(new EngineConfig(sslOptions, () -> new DefaultSslContextFactory(SslProvider.JDK, false), SSLEngineOptions.DEFAULT_USE_WORKER_POOL));
sslContextFactorySupplier = Future.succeededFuture(new EngineConfig(true, sslOptions, () -> new DefaultSslContextFactory(SslProvider.JDK, false), SSLEngineOptions.DEFAULT_USE_WORKER_POOL));
}
return sslContextFactorySupplier;
}
Expand Down
9 changes: 8 additions & 1 deletion src/main/java/io/vertx/core/net/impl/SslContextProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
*/
public class SslContextProvider {

private final boolean jdkSSLProvider;
private final Supplier<SslContextFactory> provider;
private final Set<String> enabledProtocols;
private final List<CRL> crls;
Expand All @@ -42,7 +43,8 @@ public class SslContextProvider {
private final Function<String, KeyManagerFactory> keyManagerFactoryMapper;
private final Function<String, TrustManager[]> trustManagerMapper;

public SslContextProvider(ClientAuth clientAuth,
public SslContextProvider(boolean jdkSSLProvider,
ClientAuth clientAuth,
String endpointIdentificationAlgorithm,
List<String> applicationProtocols,
Set<String> enabledCipherSuites,
Expand All @@ -53,6 +55,7 @@ public SslContextProvider(ClientAuth clientAuth,
Function<String, TrustManager[]> trustManagerMapper,
List<CRL> crls,
Supplier<SslContextFactory> provider) {
this.jdkSSLProvider = jdkSSLProvider;
this.provider = provider;
this.clientAuth = clientAuth;
this.endpointIdentificationAlgorithm = endpointIdentificationAlgorithm;
Expand All @@ -66,6 +69,10 @@ public SslContextProvider(ClientAuth clientAuth,
this.crls = crls;
}

boolean jdkSSLProvider() {
return jdkSSLProvider;
}

public VertxSslContext createContext(boolean server,
KeyManagerFactory keyManagerFactory,
TrustManager[] trustManagers,
Expand Down
8 changes: 2 additions & 6 deletions src/main/java/io/vertx/core/net/impl/TCPServerBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -237,16 +237,12 @@ private synchronized Future<Channel> listen(SocketAddress localAddress, ContextI
// Initialize SSL before binding
sslChannelProvider = sslHelper.updateSslContext(options.getSslOptions(), true, listenContext).onComplete(ar -> {
if (ar.succeeded()) {

// Socket bind
channelBalancer.addWorker(eventLoop, worker);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(vertx.getAcceptorEventLoopGroup(), channelBalancer.workers());
if (options.isSsl()) {
bootstrap.childOption(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
} else {
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
bootstrap.childOption(ChannelOption.ALLOCATOR,
sslHelper.serverByteBufAllocator(ar.result().sslChannelProvider().sslContextProvider()));

bootstrap.childHandler(channelBalancer);
applyConnectionOptions(localAddress.isDomainSocket(), bootstrap);
Expand Down
6 changes: 5 additions & 1 deletion src/test/java/io/vertx/core/net/NetTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -526,12 +526,14 @@ public void testClientOptionsJson() {
int reconnectAttempts = TestUtils.randomPositiveInt();
long reconnectInterval = TestUtils.randomPositiveInt();
boolean useAlpn = TestUtils.randomBoolean();
boolean pooledHeapBuffers = rand.nextBoolean();
String hostnameVerificationAlgorithm = TestUtils.randomAlphaString(10);
String sslEngine;
JsonObject sslEngineOptions;
if (TestUtils.randomBoolean()) {
sslEngine = "jdkSslEngineOptions";
sslEngineOptions = new JsonObject();
sslEngineOptions = new JsonObject()
.put("pooledHeapBuffers", pooledHeapBuffers);
} else {
sslEngine = "openSslEngineOptions";
boolean sessionCacheEnabled = rand.nextBoolean();
Expand Down Expand Up @@ -599,6 +601,8 @@ public void testClientOptionsJson() {
switch (sslEngine) {
case "jdkSslEngineOptions":
assertTrue(options.getSslEngineOptions() instanceof JdkSSLEngineOptions);
JdkSSLEngineOptions jdkSSLEngineOptions = (JdkSSLEngineOptions) options.getSslEngineOptions();
assertEquals(pooledHeapBuffers, jdkSSLEngineOptions.isPooledHeapBuffers());
break;
case "openSslEngineOptions":
assertTrue(options.getSslEngineOptions() instanceof OpenSSLEngineOptions);
Expand Down
Loading

0 comments on commit ccca2b8

Please # to comment.