Skip to content

Commit

Permalink
[fix][broker] Fix PulsarService.getLookupServiceAddress returns wrong…
Browse files Browse the repository at this point in the history
… port if TLS is enabled (apache#21015)

(cherry picked from commit 1363777)
(cherry picked from commit 628e79d)
  • Loading branch information
Technoboy- authored and mukesh-ctds committed Mar 6, 2024
1 parent f4348dc commit 463e163
Show file tree
Hide file tree
Showing 13 changed files with 69 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -439,19 +439,13 @@ public void testTopicPoliciesWithMultiBroker() throws Exception {
String tenantName = newUniqueName("prop-xyz2");
admin.tenants().createTenant(tenantName, tenantInfo);
admin.namespaces().createNamespace(tenantName + "/ns1", Set.of("test"));
conf.setBrokerServicePort(Optional.of(1024));
conf.setBrokerServicePortTls(Optional.of(1025));
conf.setWebServicePort(Optional.of(1026));
conf.setWebServicePortTls(Optional.of(1027));
ServiceConfiguration config2 = super.getDefaultConf();
@Cleanup
PulsarTestContext pulsarTestContext2 = createAdditionalPulsarTestContext(conf);
PulsarTestContext pulsarTestContext2 = createAdditionalPulsarTestContext(config2);
PulsarService pulsar2 = pulsarTestContext2.getPulsarService();
conf.setBrokerServicePort(Optional.of(2048));
conf.setBrokerServicePortTls(Optional.of(2049));
conf.setWebServicePort(Optional.of(2050));
conf.setWebServicePortTls(Optional.of(2051));
ServiceConfiguration config3 = super.getDefaultConf();
@Cleanup
PulsarTestContext pulsarTestContext3 = createAdditionalPulsarTestContext(conf);
PulsarTestContext pulsarTestContext3 = createAdditionalPulsarTestContext(config3);
PulsarService pulsar3 = pulsarTestContext.getPulsarService();
@Cleanup
PulsarAdmin admin2 = PulsarAdmin.builder().serviceHttpUrl(pulsar2.getWebServiceAddress()).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ private void updateConfig(ServiceConfiguration conf, String advertisedAddress) {
",public_https:https://localhost:" + httpsPort);
conf.setBrokerServicePort(Optional.of(pulsarPort));
conf.setWebServicePort(Optional.of(httpPort));
conf.setWebServicePortTls(Optional.of(httpsPort));
}

@Test
Expand All @@ -101,7 +100,6 @@ public void testLookup() throws Exception {

assertEquals(new URI(ld.getBrokerUrl()).getHost(), "localhost");
assertEquals(new URI(ld.getHttpUrl()).getHost(), "localhost");
assertEquals(new URI(ld.getHttpUrlTls()).getHost(), "localhost");


// Produce data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ void setup() throws Exception {
config.setAdvertisedAddress("localhost");
config.setWebServicePort(Optional.of(0));
config.setBrokerServicePortTls(Optional.of(0));
config.setWebServicePortTls(Optional.of(0));
config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ void setup() throws Exception {
config1.setBrokerServicePort(Optional.of(0));
config1.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
config1.setBrokerServicePortTls(Optional.of(0));
config1.setWebServicePortTls(Optional.of(0));
config1.setAdvertisedAddress("localhost");
pulsar1 = new PulsarService(config1);
pulsar1.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,6 @@ void setup() throws Exception {
config1.setBrokerShutdownTimeoutMs(0L);
config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config1.setBrokerServicePort(Optional.of(0));
config1.setBrokerServicePortTls(Optional.of(0));
config1.setWebServicePortTls(Optional.of(0));
pulsar1 = new PulsarService(config1);
pulsar1.start();

Expand All @@ -197,8 +195,6 @@ void setup() throws Exception {
config2.setBrokerShutdownTimeoutMs(0L);
config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config2.setBrokerServicePort(Optional.of(0));
config2.setBrokerServicePortTls(Optional.of(0));
config2.setWebServicePortTls(Optional.of(0));
pulsar2 = new PulsarService(config2);
pulsar2.start();

Expand All @@ -213,8 +209,6 @@ void setup() throws Exception {
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
config.setBrokerServicePortTls(Optional.of(0));
config.setWebServicePortTls(Optional.of(0));
pulsar3 = new PulsarService(config);

secondaryBrokerId = pulsar2.getBrokerId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,8 @@ protected void startBroker() throws Exception {
conf.setBrokerShutdownTimeoutMs(0L);
conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
conf.setBrokerServicePort(Optional.of(0));
conf.setBrokerServicePortTls(Optional.of(0));
conf.setAdvertisedAddress("localhost");
conf.setWebServicePort(Optional.of(0));
conf.setWebServicePortTls(Optional.of(0));
serviceConfigurationList.add(conf);

PulsarTestContext.Builder testContextBuilder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1786,4 +1786,13 @@ public void testUnsubscribeNonDurableSub() throws Exception {
fail("Unsubscribe failed");
}
}

@Test
public void testGetLookupServiceAddress() throws Exception {
cleanup();
setup();
conf.setWebServicePortTls(Optional.of(8081));
assertEquals(pulsar.getLookupServiceAddress(), "localhost:8081");
resetState();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,8 @@ protected void startBroker() throws Exception {
conf.setBrokerShutdownTimeoutMs(0L);
conf.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
conf.setBrokerServicePort(Optional.of(0));
conf.setBrokerServicePortTls(Optional.of(0));
conf.setAdvertisedAddress("localhost");
conf.setWebServicePort(Optional.of(0));
conf.setWebServicePortTls(Optional.of(0));
conf.setTransactionCoordinatorEnabled(true);
conf.setBrokerDeduplicationEnabled(true);
conf.setTransactionBufferSnapshotMaxTransactionCount(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,13 @@ public class BrokerContainer extends PulsarContainer<BrokerContainer> {
public static final String NAME = "pulsar-broker";

public BrokerContainer(String clusterName, String hostName) {
super(clusterName, hostName, hostName, "bin/run-broker.sh", BROKER_PORT, BROKER_PORT_TLS,
BROKER_HTTP_PORT, BROKER_HTTPS_PORT, DEFAULT_HTTP_PATH, DEFAULT_IMAGE_NAME);
this(clusterName, hostName, false);
}

public BrokerContainer(String clusterName, String hostName, boolean enableTls) {
super(clusterName, hostName, hostName, "bin/run-broker.sh", BROKER_PORT,
enableTls ? BROKER_PORT_TLS : 0, BROKER_HTTP_PORT,
enableTls ? BROKER_HTTPS_PORT : 0, DEFAULT_HTTP_PATH, DEFAULT_IMAGE_NAME);
tailContainerLog();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,13 @@ public class ProxyContainer extends PulsarContainer<ProxyContainer> {
public static final String NAME = "pulsar-proxy";

public ProxyContainer(String clusterName, String hostName) {
super(clusterName, hostName, hostName, "bin/run-proxy.sh", BROKER_PORT, BROKER_PORT_TLS, BROKER_HTTP_PORT,
BROKER_HTTPS_PORT, DEFAULT_HTTP_PATH, DEFAULT_IMAGE_NAME);
this(clusterName, hostName, false);
}

public ProxyContainer(String clusterName, String hostName, boolean enableTls) {
super(clusterName, hostName, hostName, "bin/run-proxy.sh", BROKER_PORT,
enableTls ? BROKER_PORT_TLS : 0, BROKER_HTTP_PORT,
enableTls ? BROKER_HTTPS_PORT : 0, DEFAULT_HTTP_PATH, DEFAULT_IMAGE_NAME);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

Expand All @@ -41,6 +42,14 @@ private static String loadCertificateAuthorityFile(String name) {
return Resources.getResource("certificate-authority/" + name).getPath();
}

@Override
protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
String clusterName,
PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
specBuilder.enableTls(true);
return specBuilder;
}

@DataProvider(name = "adminUrls")
public Object[][] adminUrls() {
return new Object[][]{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.tests.integration.containers.BKContainer;
import org.apache.pulsar.tests.integration.containers.BrokerContainer;
import org.apache.pulsar.tests.integration.containers.CSContainer;
Expand Down Expand Up @@ -132,22 +133,32 @@ private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean s
this.brokerContainers = Maps.newTreeMap();
this.workerContainers = Maps.newTreeMap();

this.proxyContainer = new ProxyContainer(appendClusterName("pulsar-proxy"), ProxyContainer.NAME)
this.proxyContainer = new ProxyContainer(appendClusterName("pulsar-proxy"), ProxyContainer.NAME, spec.enableTls)
.withNetwork(network)
.withNetworkAliases(appendClusterName("pulsar-proxy"))
.withEnv("zkServers", appendClusterName(ZKContainer.NAME))
.withEnv("zookeeperServers", appendClusterName(ZKContainer.NAME))
.withEnv("configurationStoreServers", CSContainer.NAME + ":" + CS_PORT)
.withEnv("clusterName", clusterName)
.withEnv("clusterName", clusterName);
// enable mTLS
if (spec.enableTls) {
proxyContainer
.withEnv("webServicePortTls", String.valueOf(BROKER_HTTPS_PORT))
.withEnv("servicePortTls", String.valueOf(BROKER_PORT_TLS))
.withEnv("forwardAuthorizationCredentials", "true")
.withEnv("tlsRequireTrustedClientCertOnConnect", "true")
.withEnv("tlsAllowInsecureConnection", "false")
.withEnv("tlsCertificateFilePath", "/pulsar/certificate-authority/server-keys/proxy.cert.pem")
.withEnv("tlsKeyFilePath", "/pulsar/certificate-authority/server-keys/proxy.key-pk8.pem")
.withEnv("tlsTrustCertsFilePath", "/pulsar/certificate-authority/certs/ca.cert.pem");
.withEnv("tlsTrustCertsFilePath", "/pulsar/certificate-authority/certs/ca.cert.pem")
.withEnv("brokerClientAuthenticationPlugin", AuthenticationTls.class.getName())
.withEnv("brokerClientAuthenticationParameters", String.format("tlsCertFile:%s,tlsKeyFile:%s", "/pulsar/certificate-authority/client-keys/admin.cert.pem", "/pulsar/certificate-authority/client-keys/admin.key-pk8.pem"))
.withEnv("tlsEnabledWithBroker", "true")
.withEnv("brokerClientTrustCertsFilePath", "/pulsar/certificate-authority/certs/ca.cert.pem")
.withEnv("brokerClientCertificateFilePath", "/pulsar/certificate-authority/server-keys/proxy.cert.pem")
.withEnv("brokerClientKeyFilePath", "/pulsar/certificate-authority/server-keys/proxy.key-pk8.pem");

}
if (spec.proxyEnvs != null) {
spec.proxyEnvs.forEach(this.proxyContainer::withEnv);
}
Expand Down Expand Up @@ -184,7 +195,7 @@ private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean s
// create brokers
brokerContainers.putAll(
runNumContainers("broker", spec.numBrokers(), (name) -> {
BrokerContainer brokerContainer = new BrokerContainer(clusterName, appendClusterName(name))
BrokerContainer brokerContainer = new BrokerContainer(clusterName, appendClusterName(name), spec.enableTls)
.withNetwork(network)
.withNetworkAliases(appendClusterName(name))
.withEnv("zkServers", appendClusterName(ZKContainer.NAME))
Expand All @@ -195,16 +206,19 @@ private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean s
.withEnv("loadBalancerOverrideBrokerNicSpeedGbps", "1")
// used in s3 tests
.withEnv("AWS_ACCESS_KEY_ID", "accesskey").withEnv("AWS_SECRET_KEY", "secretkey")
.withEnv("maxMessageSize", "" + spec.maxMessageSize)
.withEnv("maxMessageSize", "" + spec.maxMessageSize);
if (spec.enableTls) {
// enable mTLS
.withEnv("webServicePortTls", String.valueOf(BROKER_HTTPS_PORT))
.withEnv("brokerServicePortTls", String.valueOf(BROKER_PORT_TLS))
.withEnv("authenticateOriginalAuthData", "true")
.withEnv("tlsRequireTrustedClientCertOnConnect", "true")
.withEnv("tlsAllowInsecureConnection", "false")
.withEnv("tlsCertificateFilePath", "/pulsar/certificate-authority/server-keys/broker.cert.pem")
.withEnv("tlsKeyFilePath", "/pulsar/certificate-authority/server-keys/broker.key-pk8.pem")
.withEnv("tlsTrustCertsFilePath", "/pulsar/certificate-authority/certs/ca.cert.pem");
brokerContainer
.withEnv("webServicePortTls", String.valueOf(BROKER_HTTPS_PORT))
.withEnv("brokerServicePortTls", String.valueOf(BROKER_PORT_TLS))
.withEnv("authenticateOriginalAuthData", "true")
.withEnv("tlsAllowInsecureConnection", "false")
.withEnv("tlsRequireTrustedClientCertOnConnect", "true")
.withEnv("tlsTrustCertsFilePath", "/pulsar/certificate-authority/certs/ca.cert.pem")
.withEnv("tlsCertificateFilePath", "/pulsar/certificate-authority/server-keys/broker.cert.pem")
.withEnv("tlsKeyFilePath", "/pulsar/certificate-authority/server-keys/broker.key-pk8.pem");
}
if (spec.queryLastMessage) {
brokerContainer.withEnv("bookkeeperExplicitLacIntervalInMills", "10");
brokerContainer.withEnv("bookkeeperUseV2WireProtocol", "false");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,4 +177,10 @@ public class PulsarClusterSpec {
* Additional ports to expose on bookie containers.
*/
List<Integer> bookieAdditionalPorts;

/**
* Enable TLS for connection.
*/
@Default
boolean enableTls = false;
}

0 comments on commit 463e163

Please # to comment.