From cd064377abf24ce277efc325daeac9b3e870251a Mon Sep 17 00:00:00 2001 From: Michael Edgar Date: Fri, 13 Jan 2023 15:07:41 -0500 Subject: [PATCH] [MGDSTRM-9790] Test instance upgrade while suspended Signed-off-by: Michael Edgar --- .../bf2/operator/operands/KafkaCluster.java | 4 + .../src/main/resources/application.properties | 2 + .../org/bf2/sync/ManagedKafkaAgentSync.java | 2 +- .../java/org/bf2/sync/ManagedKafkaSync.java | 2 +- .../sync/controlplane/MockControlPlane.java | 8 + .../bf2/sync/informer/InformerManager.java | 2 +- .../src/main/resources/application.properties | 2 + .../systemtest/api/sync/SyncApiClient.java | 37 +++- .../systemtest/framework/SecurityUtils.java | 6 + .../framework/TestPlanExecutionListener.java | 100 +++++++-- .../operator/FleetShardOperatorManager.java | 6 +- .../operator/KeycloakOperatorManager.java | 3 +- .../systemtest/integration/AbstractST.java | 22 ++ .../integration/ManagedKafkaST.java | 27 --- .../bf2/systemtest/integration/SmokeST.java | 44 +--- .../bf2/systemtest/integration/UpgradeST.java | 196 +++++++++++++++--- .../bf2/systemtest/unit/SuiteUnitTest.java | 18 +- .../src/main/java/org/bf2/test/TestUtils.java | 28 ++- 18 files changed, 376 insertions(+), 133 deletions(-) diff --git a/operator/src/main/java/org/bf2/operator/operands/KafkaCluster.java b/operator/src/main/java/org/bf2/operator/operands/KafkaCluster.java index 4d22ec25d..382daf703 100644 --- a/operator/src/main/java/org/bf2/operator/operands/KafkaCluster.java +++ b/operator/src/main/java/org/bf2/operator/operands/KafkaCluster.java @@ -399,6 +399,10 @@ private Kafka upgrade(ManagedKafka managedKafka, KafkaBuilder kafkaBuilder) { * @return true if suspension should be blocked, otherwise false */ private boolean blockSuspension(ManagedKafka managedKafka) { + if (isStrimziUpdating(managedKafka)) { + return true; + } + if (kafkaManager.hasKafkaVersionChanged(managedKafka)) { return true; } diff --git a/operator/src/main/resources/application.properties b/operator/src/main/resources/application.properties index f17447c08..83681713b 100644 --- a/operator/src/main/resources/application.properties +++ b/operator/src/main/resources/application.properties @@ -75,6 +75,8 @@ quarkus.kubernetes.labels.app=kas-fleetshard-operator # deactivate CRD checking from Java Operator SDK quarkus.operator-sdk.crd.validate=false quarkus.operator-sdk.disable-rbac-generation=true +# increasing retries from the default max-attempts of 5 +quarkus.operator-sdk.controllers."managedkafkacontroller".retry.max-attempts=20 quarkus.arc.test.disable-application-lifecycle-observers=true diff --git a/sync/src/main/java/org/bf2/sync/ManagedKafkaAgentSync.java b/sync/src/main/java/org/bf2/sync/ManagedKafkaAgentSync.java index 53377c906..35ba92ace 100644 --- a/sync/src/main/java/org/bf2/sync/ManagedKafkaAgentSync.java +++ b/sync/src/main/java/org/bf2/sync/ManagedKafkaAgentSync.java @@ -32,7 +32,7 @@ public class ManagedKafkaAgentSync { @Timed(value = "sync.poll", extraTags = {"resource", "ManagedKafkaAgent"}, description = "The time spent processing polling calls") @Counted(value = "sync.poll", extraTags = {"resource", "ManagedKafkaAgent"}, description = "The number of polling calls") - @Scheduled(every = "{poll.interval}", concurrentExecution = ConcurrentExecution.SKIP) + @Scheduled(every = "{poll.interval}", delayed = "{poll.delay}", concurrentExecution = ConcurrentExecution.SKIP) void loop() { ManagedKafkaAgent managedKafkaAgent = controlPlane.getManagedKafkaAgent(); Objects.requireNonNull(managedKafkaAgent); diff --git a/sync/src/main/java/org/bf2/sync/ManagedKafkaSync.java b/sync/src/main/java/org/bf2/sync/ManagedKafkaSync.java index 0b6c9d461..eedcef60b 100644 --- a/sync/src/main/java/org/bf2/sync/ManagedKafkaSync.java +++ b/sync/src/main/java/org/bf2/sync/ManagedKafkaSync.java @@ -369,7 +369,7 @@ void checkCreationTimestamp(ManagedKafka existing, Duration duration) { } } - @Scheduled(every = "{poll.interval}", concurrentExecution = ConcurrentExecution.SKIP) + @Scheduled(every = "{poll.interval}", delayed = "{poll.delay}", concurrentExecution = ConcurrentExecution.SKIP) void pollKafkaClusters() { log.debug("Polling for control plane managed kafkas"); // TODO: this is based upon a full poll - eventually this could be diff --git a/sync/src/main/java/org/bf2/sync/controlplane/MockControlPlane.java b/sync/src/main/java/org/bf2/sync/controlplane/MockControlPlane.java index edf24fb90..8e224e390 100644 --- a/sync/src/main/java/org/bf2/sync/controlplane/MockControlPlane.java +++ b/sync/src/main/java/org/bf2/sync/controlplane/MockControlPlane.java @@ -179,6 +179,14 @@ public void deleteCluster(@PathParam("clusterid") String clusterId) { markForDeletion(clusterId); } + @DELETE + @Path("/{id}/kafkas") + @Produces(MediaType.APPLICATION_JSON) + public void deleteAllClusters() { + log.infof("control plane:: received request to delete all ManagedKafkas"); + this.kafkas.keySet().forEach(this::markForDeletion); + } + @PUT @Path("/{id}") @Produces(MediaType.APPLICATION_JSON) diff --git a/sync/src/main/java/org/bf2/sync/informer/InformerManager.java b/sync/src/main/java/org/bf2/sync/informer/InformerManager.java index 6eefedc03..70f53f6ad 100644 --- a/sync/src/main/java/org/bf2/sync/informer/InformerManager.java +++ b/sync/src/main/java/org/bf2/sync/informer/InformerManager.java @@ -47,7 +47,7 @@ protected void onStart() { CustomResourceEventHandler.of(controlPlane::updateKafkaClusterStatus)); // for the Agent - managedAgentInformer = resourceInformerFactory.create(ManagedKafkaAgent.class, client.resources(ManagedKafkaAgent.class).inAnyNamespace(), + managedAgentInformer = resourceInformerFactory.create(ManagedKafkaAgent.class, client.resources(ManagedKafkaAgent.class).inNamespace(client.getNamespace()), CustomResourceEventHandler.of(controlPlane::updateAgentStatus)); secretInformer = resourceInformerFactory.create(Secret.class, client.secrets().inAnyNamespace().withLabels(OperandUtils.getMasterSecretLabel()), diff --git a/sync/src/main/resources/application.properties b/sync/src/main/resources/application.properties index da208b69f..ce9e42608 100644 --- a/sync/src/main/resources/application.properties +++ b/sync/src/main/resources/application.properties @@ -12,6 +12,7 @@ secret.name=addon-kas-fleetshard-operator-parameters sso.enabled=false sso.filter.enabled=true secret.enabled=true +poll.delay=${poll-delay:0s} quarkus.log.console.format=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%c{3.}] (%t) %x %s%e%n quarkus.kubernetes.ports.http.host-port=8080 @@ -28,6 +29,7 @@ quarkus.kubernetes.ports.http.host-port=8080 %test.secret.enabled=false %test.sync.mock-control-plane.simulate=false %test.quarkus.log.category."org.bf2".level=DEBUG +%test.poll.delay=15s %test.poll.interval=5s # control plane properties diff --git a/systemtest/src/main/java/org/bf2/systemtest/api/sync/SyncApiClient.java b/systemtest/src/main/java/org/bf2/systemtest/api/sync/SyncApiClient.java index 7bd6a6133..fa1f05f57 100644 --- a/systemtest/src/main/java/org/bf2/systemtest/api/sync/SyncApiClient.java +++ b/systemtest/src/main/java/org/bf2/systemtest/api/sync/SyncApiClient.java @@ -18,6 +18,7 @@ import java.time.Duration; import java.util.List; import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -59,6 +60,19 @@ public static HttpResponse deleteManagedKafka(String id, String endpoint return retry(() -> client.send(request, HttpResponse.BodyHandlers.ofString())); } + public static HttpResponse deleteManagedKafkas(String endpoint) throws Exception { + LOGGER.info("Deleting all managed kafkas"); + HttpClient client = HttpClient.newHttpClient(); + URI uri = URI.create(endpoint + BASE_PATH + "pepa/kafkas"); + HttpRequest request = HttpRequest.newBuilder() + .uri(uri) + .DELETE() + .timeout(Duration.ofMinutes(2)) + .build(); + LOGGER.info("Sending DELETE request to {} with port {} and path {}", uri.getHost(), uri.getPort(), uri.getPath()); + return retry(() -> client.send(request, HttpResponse.BodyHandlers.ofString())); + } + public static HttpResponse getManagedKafkaAgentStatus(String endpoint) throws Exception { LOGGER.info("Get managed kafka agent status"); return getRequest("pepa/status", endpoint); @@ -116,26 +130,41 @@ private static Stream getSortedAvailableStrimziVersions(String endpoint) } public static Stream getSortedAvailableStrimziVersions(Supplier statusSupplier) { + AtomicReference agentStatus = new AtomicReference<>(); + TestUtils.waitFor("Strimzi version is reported", 1_000, 60_000, () -> { try { - return statusSupplier.get().getStrimzi().size() > 0; + ManagedKafkaAgentStatus status = statusSupplier.get(); + agentStatus.set(status); + return !status.getStrimzi().isEmpty(); } catch (Exception e) { return false; } }); - return sortedStrimziVersion(statusSupplier.get().getStrimzi().stream().map(StrimziVersionStatus::getVersion)); + return sortedStrimziVersion(agentStatus.get().getStrimzi().stream().map(StrimziVersionStatus::getVersion)); } public static Stream getKafkaVersions(Supplier statusSupplier, String strimziVersion) { + AtomicReference agentStatus = new AtomicReference<>(); + TestUtils.waitFor("Strimzi version is reported", 1_000, 60_000, () -> { try { - return statusSupplier.get().getStrimzi().size() > 0; + ManagedKafkaAgentStatus status = statusSupplier.get(); + agentStatus.set(status); + return !status.getStrimzi().isEmpty(); } catch (Exception e) { return false; } }); - return statusSupplier.get().getStrimzi().stream().filter(item -> item.getVersion().equals(strimziVersion)).findFirst().get().getKafkaVersions().stream(); + + return agentStatus.get() + .getStrimzi() + .stream() + .filter(item -> item.getVersion().equals(strimziVersion)) + .findFirst() + .map(selectedVersion -> selectedVersion.getKafkaVersions().stream()) + .orElseGet(Stream::empty); } public static String getLatestAvailableKafkaVersion(Supplier statusSupplier, String strimziVersion) { diff --git a/systemtest/src/main/java/org/bf2/systemtest/framework/SecurityUtils.java b/systemtest/src/main/java/org/bf2/systemtest/framework/SecurityUtils.java index 536bcc1c2..d20144507 100644 --- a/systemtest/src/main/java/org/bf2/systemtest/framework/SecurityUtils.java +++ b/systemtest/src/main/java/org/bf2/systemtest/framework/SecurityUtils.java @@ -6,6 +6,8 @@ import org.bouncycastle.asn1.x509.AuthorityKeyIdentifier; import org.bouncycastle.asn1.x509.BasicConstraints; import org.bouncycastle.asn1.x509.Extension; +import org.bouncycastle.asn1.x509.GeneralName; +import org.bouncycastle.asn1.x509.GeneralNames; import org.bouncycastle.asn1.x509.SubjectKeyIdentifier; import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo; import org.bouncycastle.cert.CertIOException; @@ -126,6 +128,10 @@ private static X509Certificate generate(final KeyPair keyPair, certificateBuilder.addExtension(Extension.authorityKeyIdentifier, false, authority); certificateBuilder.addExtension(Extension.basicConstraints, true, new BasicConstraints(true)); + // Add SAN (required by some clients) + GeneralNames subjectAltName = new GeneralNames(new GeneralName(GeneralName.dNSName, "*." + domain)); + certificateBuilder.addExtension(Extension.subjectAlternativeName, false, subjectAltName); + final ContentSigner contentSigner = new JcaContentSignerBuilder(hashAlgorithm).build(keyPair.getPrivate()); return new JcaX509CertificateConverter() diff --git a/systemtest/src/main/java/org/bf2/systemtest/framework/TestPlanExecutionListener.java b/systemtest/src/main/java/org/bf2/systemtest/framework/TestPlanExecutionListener.java index 17aaaf857..70ef813bd 100644 --- a/systemtest/src/main/java/org/bf2/systemtest/framework/TestPlanExecutionListener.java +++ b/systemtest/src/main/java/org/bf2/systemtest/framework/TestPlanExecutionListener.java @@ -2,7 +2,11 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.bf2.systemtest.operator.FleetShardOperatorManager; +import org.bf2.systemtest.operator.KeycloakOperatorManager; +import org.bf2.systemtest.operator.StrimziOperatorManager; import org.bf2.test.Environment; +import org.bf2.test.k8s.KubeClient; import org.junit.platform.launcher.TestExecutionListener; import org.junit.platform.launcher.TestIdentifier; import org.junit.platform.launcher.TestPlan; @@ -10,17 +14,36 @@ import java.io.IOException; import java.nio.file.Files; import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; public class TestPlanExecutionListener implements TestExecutionListener { + private static final Logger LOGGER = LogManager.getLogger(TestPlanExecutionListener.class); + private static final String DIVIDER = "======================================================================="; + + private StrimziOperatorManager strimziOperatorManager; + private KubeClient kube; + private boolean systemTestsSelected; + @Override public void testPlanExecutionStarted(TestPlan testPlan) { - LOGGER.info("======================================================================="); - LOGGER.info("======================================================================="); + LOGGER.info(DIVIDER); + LOGGER.info(DIVIDER); LOGGER.info(" Test run started"); - LOGGER.info("======================================================================="); - LOGGER.info("======================================================================="); - printSelectedTestClasses(testPlan); + LOGGER.info(DIVIDER); + LOGGER.info(DIVIDER); + + List testClasses = getSelectedTestClassNames(testPlan); + printSelectedTestClasses(testClasses); + systemTestsSelected = testClasses.isEmpty() || testClasses.stream().anyMatch(className -> className.endsWith("ST")); + + if (systemTestsSelected) { + deployComponents(); + } + try { Files.createDirectories(Environment.LOG_DIR); } catch (IOException e) { @@ -29,20 +52,65 @@ public void testPlanExecutionStarted(TestPlan testPlan) { } } + @Override public void testPlanExecutionFinished(TestPlan testPlan) { - LOGGER.info("======================================================================="); - LOGGER.info("======================================================================="); - LOGGER.info(" Test run finished"); - LOGGER.info("======================================================================="); - LOGGER.info("======================================================================="); + try { + if (systemTestsSelected) { + removeComponents(); + } + } finally { + LOGGER.info(DIVIDER); + LOGGER.info(DIVIDER); + LOGGER.info(" Test run finished"); + LOGGER.info(DIVIDER); + LOGGER.info(DIVIDER); + } } - private void printSelectedTestClasses(TestPlan plan) { - LOGGER.info("Following testclasses are selected for run:"); - Arrays.asList(plan.getChildren(plan.getRoots() + private List getSelectedTestClassNames(TestPlan plan) { + List selectedTests = Arrays.asList(plan.getChildren(plan.getRoots() .toArray(new TestIdentifier[0])[0]) - .toArray(new TestIdentifier[0])).forEach(testIdentifier -> LOGGER.info("-> {}", testIdentifier.getLegacyReportingName())); - LOGGER.info("======================================================================="); - LOGGER.info("======================================================================="); + .toArray(new TestIdentifier[0])); + + if (selectedTests.isEmpty()) { + return Collections.emptyList(); + } else { + return selectedTests.stream().map(TestIdentifier::getLegacyReportingName).collect(Collectors.toList()); + } + } + + private void printSelectedTestClasses(List testClasses) { + if (testClasses.isEmpty()) { + LOGGER.info("All test classes are selected for run"); + } else { + LOGGER.info("Following test classes are selected for run:"); + testClasses.forEach(testIdentifier -> LOGGER.info("-> {}", testIdentifier)); + } + + LOGGER.info(DIVIDER); + LOGGER.info(DIVIDER); + } + + private void deployComponents() { + strimziOperatorManager = new StrimziOperatorManager(SystemTestEnvironment.STRIMZI_VERSION); + kube = KubeClient.getInstance(); + + try { + CompletableFuture.allOf( + KeycloakOperatorManager.installKeycloak(kube), + strimziOperatorManager.installStrimzi(kube), + FleetShardOperatorManager.deployFleetShardOperator(kube), + FleetShardOperatorManager.deployFleetShardSync(kube)).join(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void removeComponents() { + CompletableFuture.allOf( + KeycloakOperatorManager.uninstallKeycloak(kube), + FleetShardOperatorManager.deleteFleetShard(kube), + strimziOperatorManager.uninstallStrimziClusterWideResources(kube)) + .join(); } } diff --git a/systemtest/src/main/java/org/bf2/systemtest/operator/FleetShardOperatorManager.java b/systemtest/src/main/java/org/bf2/systemtest/operator/FleetShardOperatorManager.java index 6679f1909..4b945c403 100644 --- a/systemtest/src/main/java/org/bf2/systemtest/operator/FleetShardOperatorManager.java +++ b/systemtest/src/main/java/org/bf2/systemtest/operator/FleetShardOperatorManager.java @@ -92,7 +92,8 @@ public static CompletableFuture deployFleetShardOperator(KubeClient kubeCl LOGGER.info("Operator is deployed"); return TestUtils.asyncWaitFor("Operator ready", 1_000, INSTALL_TIMEOUT_MS, - () -> isOperatorInstalled(kubeClient) && agentStatusHasStrimziVersions(kubeClient)); + () -> isOperatorInstalled(kubeClient) && agentStatusHasStrimziVersions(kubeClient)) + .thenRun(() -> LOGGER.info("kas-fleetshard operator is ready")); } public static CompletableFuture deployFleetShardSync(KubeClient kubeClient) throws Exception { @@ -103,7 +104,8 @@ public static CompletableFuture deployFleetShardSync(KubeClient kubeClient LOGGER.info("Installing {}", SYNC_NAME); kubeClient.apply(OPERATOR_NS, SystemTestEnvironment.YAML_SYNC_BUNDLE_PATH); LOGGER.info("Sync is deployed"); - return TestUtils.asyncWaitFor("Sync ready", 1_000, INSTALL_TIMEOUT_MS, () -> isSyncInstalled(kubeClient)); + return TestUtils.asyncWaitFor("Sync ready", 1_000, INSTALL_TIMEOUT_MS, () -> isSyncInstalled(kubeClient)) + .thenRun(() -> LOGGER.info("kas-fleetshard sync is ready")); } static void deployPullSecrets(KubeClient kubeClient) throws Exception { diff --git a/systemtest/src/main/java/org/bf2/systemtest/operator/KeycloakOperatorManager.java b/systemtest/src/main/java/org/bf2/systemtest/operator/KeycloakOperatorManager.java index 1b679e4cf..432035c26 100644 --- a/systemtest/src/main/java/org/bf2/systemtest/operator/KeycloakOperatorManager.java +++ b/systemtest/src/main/java/org/bf2/systemtest/operator/KeycloakOperatorManager.java @@ -80,7 +80,8 @@ public static CompletableFuture installKeycloak(KubeClient kubeClient) thr return TestUtils.asyncWaitFor("Keycloak instance ready", 1_000, 600_000, () -> TestUtils.isPodReady(KubeClient.getInstance().client().pods().inNamespace(OPERATOR_NS) .list().getItems().stream().filter(pod -> - pod.getMetadata().getName().contains("keycloak-0")).findFirst().orElse(null))); + pod.getMetadata().getName().contains("keycloak-0")).findFirst().orElse(null))) + .thenRun(() -> LOGGER.info("Keycloak instance is ready")); } else { LOGGER.info("Keycloak is not installed suite will use values from env vars for oauth"); return CompletableFuture.completedFuture(null); diff --git a/systemtest/src/test/java/org/bf2/systemtest/integration/AbstractST.java b/systemtest/src/test/java/org/bf2/systemtest/integration/AbstractST.java index 376bcab0a..67c62648c 100644 --- a/systemtest/src/test/java/org/bf2/systemtest/integration/AbstractST.java +++ b/systemtest/src/test/java/org/bf2/systemtest/integration/AbstractST.java @@ -1,12 +1,20 @@ package org.bf2.systemtest.integration; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.bf2.systemtest.api.sync.SyncApiClient; import org.bf2.systemtest.framework.ExtensionContextParameterResolver; import org.bf2.systemtest.framework.IndicativeSentences; +import org.bf2.systemtest.framework.KeycloakInstance; import org.bf2.systemtest.framework.ResourceManager; +import org.bf2.systemtest.framework.SystemTestEnvironment; import org.bf2.systemtest.framework.TestCallbackListener; import org.bf2.systemtest.framework.TestExceptionCallbackListener; +import org.bf2.systemtest.operator.FleetShardOperatorManager; +import org.bf2.systemtest.operator.KeycloakOperatorManager; import org.bf2.test.k8s.KubeClient; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayNameGeneration; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.extension.ExtendWith; @@ -23,12 +31,26 @@ @DisplayNameGeneration(IndicativeSentences.class) @TestInstance(TestInstance.Lifecycle.PER_CLASS) public abstract class AbstractST { + + private static final Logger LOGGER = LogManager.getLogger(AbstractST.class); + protected KubeClient kube; protected ResourceManager resourceManager; + protected String syncEndpoint; + protected KeycloakInstance keycloak; @BeforeAll void init() { kube = KubeClient.getInstance(); resourceManager = ResourceManager.getInstance(); + syncEndpoint = FleetShardOperatorManager.createEndpoint(kube); + LOGGER.info("Endpoint address {}", syncEndpoint); + keycloak = SystemTestEnvironment.INSTALL_KEYCLOAK ? new KeycloakInstance(KeycloakOperatorManager.OPERATOR_NS) : null; + } + + @BeforeEach + void setup() throws Exception { + // Remove any Kafkas left by previous tests + SyncApiClient.deleteManagedKafkas(syncEndpoint); } } diff --git a/systemtest/src/test/java/org/bf2/systemtest/integration/ManagedKafkaST.java b/systemtest/src/test/java/org/bf2/systemtest/integration/ManagedKafkaST.java index 819f8cb4b..7ee404ba2 100644 --- a/systemtest/src/test/java/org/bf2/systemtest/integration/ManagedKafkaST.java +++ b/systemtest/src/test/java/org/bf2/systemtest/integration/ManagedKafkaST.java @@ -10,21 +10,14 @@ import org.bf2.operator.resources.v1alpha1.ManagedKafkaStatus; import org.bf2.systemtest.api.sync.SyncApiClient; import org.bf2.systemtest.framework.AssertUtils; -import org.bf2.systemtest.framework.KeycloakInstance; import org.bf2.systemtest.framework.SequentialTest; -import org.bf2.systemtest.framework.SystemTestEnvironment; import org.bf2.systemtest.framework.resource.ManagedKafkaResourceType; -import org.bf2.systemtest.operator.FleetShardOperatorManager; -import org.bf2.systemtest.operator.KeycloakOperatorManager; -import org.bf2.systemtest.operator.StrimziOperatorManager; import org.bf2.test.TestUtils; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.extension.ExtensionContext; import java.net.HttpURLConnection; import java.net.http.HttpResponse; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -33,33 +26,13 @@ public class ManagedKafkaST extends AbstractST { private static final Logger LOGGER = LogManager.getLogger(ManagedKafkaST.class); - private String syncEndpoint; - private final StrimziOperatorManager strimziOperatorManager = new StrimziOperatorManager(SystemTestEnvironment.STRIMZI_VERSION); - private KeycloakInstance keycloak; private String latestStrimziVersion; private String latestKafkaVersion; @BeforeAll void deploy() throws Exception { - CompletableFuture.allOf( - KeycloakOperatorManager.installKeycloak(kube), - strimziOperatorManager.installStrimzi(kube), - FleetShardOperatorManager.deployFleetShardOperator(kube), - FleetShardOperatorManager.deployFleetShardSync(kube)).join(); - - keycloak = SystemTestEnvironment.INSTALL_KEYCLOAK ? new KeycloakInstance(KeycloakOperatorManager.OPERATOR_NS) : null; - syncEndpoint = FleetShardOperatorManager.createEndpoint(kube); latestStrimziVersion = SyncApiClient.getLatestStrimziVersion(syncEndpoint); latestKafkaVersion = SyncApiClient.getLatestKafkaVersion(syncEndpoint, latestStrimziVersion); - LOGGER.info("Endpoint address {}", syncEndpoint); - } - - @AfterAll - void clean() { - CompletableFuture.allOf( - KeycloakOperatorManager.uninstallKeycloak(kube), - FleetShardOperatorManager.deleteFleetShard(kube), - strimziOperatorManager.uninstallStrimziClusterWideResources(kube)).join(); } @SequentialTest diff --git a/systemtest/src/test/java/org/bf2/systemtest/integration/SmokeST.java b/systemtest/src/test/java/org/bf2/systemtest/integration/SmokeST.java index a01b03d74..35df66907 100644 --- a/systemtest/src/test/java/org/bf2/systemtest/integration/SmokeST.java +++ b/systemtest/src/test/java/org/bf2/systemtest/integration/SmokeST.java @@ -10,16 +10,10 @@ import org.bf2.operator.resources.v1alpha1.ManagedKafkaStatus; import org.bf2.systemtest.api.sync.SyncApiClient; import org.bf2.systemtest.framework.AssertUtils; -import org.bf2.systemtest.framework.KeycloakInstance; import org.bf2.systemtest.framework.SequentialTest; -import org.bf2.systemtest.framework.SystemTestEnvironment; import org.bf2.systemtest.framework.TestTags; import org.bf2.systemtest.framework.resource.ManagedKafkaResourceType; -import org.bf2.systemtest.operator.FleetShardOperatorManager; -import org.bf2.systemtest.operator.KeycloakOperatorManager; -import org.bf2.systemtest.operator.StrimziOperatorManager; import org.bf2.test.TestUtils; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.extension.ExtensionContext; @@ -27,56 +21,20 @@ import java.net.HttpURLConnection; import java.net.http.HttpResponse; import java.util.Objects; -import java.util.concurrent.CompletableFuture; import static org.junit.jupiter.api.Assertions.assertEquals; @Tag(TestTags.SMOKE) public class SmokeST extends AbstractST { private static final Logger LOGGER = LogManager.getLogger(SmokeST.class); - private String syncEndpoint; - private final StrimziOperatorManager strimziOperatorManager = new StrimziOperatorManager(SystemTestEnvironment.STRIMZI_VERSION); - private KeycloakInstance keycloak; + private String latestStrimziVersion; private String latestKafkaVersion; @BeforeAll void deploy() throws Exception { - CompletableFuture.allOf( - KeycloakOperatorManager.installKeycloak(kube), - strimziOperatorManager.installStrimzi(kube), - FleetShardOperatorManager.deployFleetShardOperator(kube), - FleetShardOperatorManager.deployFleetShardSync(kube)).join(); - - keycloak = SystemTestEnvironment.INSTALL_KEYCLOAK ? new KeycloakInstance(KeycloakOperatorManager.OPERATOR_NS) : null; - syncEndpoint = FleetShardOperatorManager.createEndpoint(kube); latestStrimziVersion = SyncApiClient.getLatestStrimziVersion(syncEndpoint); latestKafkaVersion = SyncApiClient.getLatestKafkaVersion(syncEndpoint, latestStrimziVersion); - LOGGER.info("Endpoint address {}", syncEndpoint); - } - - @AfterAll - void clean() { - var fleetshardFuture = FleetShardOperatorManager.deleteFleetShard(kube); - var keycloakFuture = KeycloakOperatorManager.uninstallKeycloak(kube); - var strimziFuture = strimziOperatorManager.uninstallStrimziClusterWideResources(kube); - - fleetshardFuture.join(); - - checkUninstall(keycloakFuture, "Keycloak"); - checkUninstall(strimziFuture, "Strimzi"); - } - - private void checkUninstall(CompletableFuture pendingUninstall, String name) { - if (!pendingUninstall.isDone()) { - LOGGER.warn("{} did not finish uninstalling", name); - return; - } - try { - pendingUninstall.getNow(null); - } catch (Exception e) { - LOGGER.error("Error uninstalling", e); - } } @SequentialTest diff --git a/systemtest/src/test/java/org/bf2/systemtest/integration/UpgradeST.java b/systemtest/src/test/java/org/bf2/systemtest/integration/UpgradeST.java index 28da95e63..1dd4b84e6 100644 --- a/systemtest/src/test/java/org/bf2/systemtest/integration/UpgradeST.java +++ b/systemtest/src/test/java/org/bf2/systemtest/integration/UpgradeST.java @@ -1,19 +1,22 @@ package org.bf2.systemtest.integration; import io.fabric8.kubernetes.api.model.NamespaceBuilder; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.client.utils.Serialization; +import io.strimzi.api.kafka.model.Kafka; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.bf2.operator.ManagedKafkaKeys; import org.bf2.operator.resources.v1alpha1.ManagedKafka; +import org.bf2.operator.resources.v1alpha1.ManagedKafkaAgentStatus; import org.bf2.operator.resources.v1alpha1.ManagedKafkaCondition; +import org.bf2.operator.resources.v1alpha1.ManagedKafkaStatus; import org.bf2.systemtest.api.sync.SyncApiClient; import org.bf2.systemtest.framework.AssertUtils; -import org.bf2.systemtest.framework.KeycloakInstance; import org.bf2.systemtest.framework.SequentialTest; import org.bf2.systemtest.framework.SystemTestEnvironment; import org.bf2.systemtest.framework.TestTags; import org.bf2.systemtest.framework.resource.ManagedKafkaResourceType; -import org.bf2.systemtest.operator.FleetShardOperatorManager; -import org.bf2.systemtest.operator.KeycloakOperatorManager; import org.bf2.systemtest.operator.StrimziOperatorManager; import org.bf2.test.TestUtils; import org.junit.jupiter.api.AfterAll; @@ -23,8 +26,10 @@ import java.net.HttpURLConnection; import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.List; +import java.util.Map; import java.util.Objects; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -32,37 +37,21 @@ @Tag(TestTags.UPGRADE) public class UpgradeST extends AbstractST { private static final Logger LOGGER = LogManager.getLogger(UpgradeST.class); - private String syncEndpoint; private StrimziOperatorManager strimziOperatorManagerOld; - private StrimziOperatorManager strimziOperatorManagerNew; - private KeycloakInstance keycloak; private String latestStrimziVersion; @BeforeAll void deploy() throws Exception { strimziOperatorManagerOld = new StrimziOperatorManager( StrimziOperatorManager.getPreviousUpstreamStrimziVersion(SystemTestEnvironment.STRIMZI_VERSION)); - strimziOperatorManagerNew = new StrimziOperatorManager(SystemTestEnvironment.STRIMZI_VERSION); - CompletableFuture.allOf( - KeycloakOperatorManager.installKeycloak(kube), - strimziOperatorManagerNew.installStrimzi(kube), - strimziOperatorManagerOld.installStrimzi(kube), - FleetShardOperatorManager.deployFleetShardOperator(kube), - FleetShardOperatorManager.deployFleetShardSync(kube)).join(); - - keycloak = SystemTestEnvironment.INSTALL_KEYCLOAK ? new KeycloakInstance(KeycloakOperatorManager.OPERATOR_NS) : null; - syncEndpoint = FleetShardOperatorManager.createEndpoint(kube); + + strimziOperatorManagerOld.installStrimzi(kube).join(); latestStrimziVersion = SyncApiClient.getLatestStrimziVersion(syncEndpoint); - LOGGER.info("Endpoint address {}", syncEndpoint); } @AfterAll void clean() { - CompletableFuture.allOf( - KeycloakOperatorManager.uninstallKeycloak(kube), - FleetShardOperatorManager.deleteFleetShard(kube), - strimziOperatorManagerOld.uninstallStrimziClusterWideResources(kube), - strimziOperatorManagerNew.uninstallStrimziClusterWideResources(kube)).join(); + strimziOperatorManagerOld.uninstallStrimziClusterWideResources(kube).join(); } @SequentialTest @@ -111,13 +100,164 @@ void testUpgradeStrimziVersion(ExtensionContext extensionContext) throws Excepti } TestUtils.waitFor("MK is upgraded", TimeUnit.SECONDS.toMillis(20), TimeUnit.MINUTES.toMillis(10), () -> { + return Objects.equals(latestStrimziVersion, actualStrimziVersion(mkAppName)); + }); + + //delete mk using api + res = SyncApiClient.deleteManagedKafka(mk.getId(), syncEndpoint); + assertEquals(HttpURLConnection.HTTP_NO_CONTENT, res.statusCode()); + + ManagedKafkaResourceType.isDeleted(mk); + + LOGGER.info("ManagedKafka {} deleted", mkAppName); + } + + @SequentialTest + void testUpgradeStrimziVersionWhileSuspended(ExtensionContext extensionContext) throws Exception { + String mkAppName = "mk-test-suspended-upgrade"; + String startVersion = SyncApiClient.getPreviousStrimziVersion(syncEndpoint); + String kafkaVersion = SyncApiClient.getLatestKafkaVersion(syncEndpoint, startVersion); + + ManagedKafka mk = ManagedKafkaResourceType.getDefault(mkAppName, mkAppName, keycloak, startVersion, kafkaVersion); + String id = mk.getId(); + + //Create mk using api + resourceManager.addResource(extensionContext, new NamespaceBuilder().withNewMetadata().withName(mkAppName).endMetadata().build()); + resourceManager.addResource(extensionContext, mk); + + HttpResponse res = SyncApiClient.createManagedKafka(mk, syncEndpoint); + assertEquals(HttpURLConnection.HTTP_NO_CONTENT, res.statusCode()); + + resourceManager.waitResourceCondition(mk, Objects::nonNull); + mk = resourceManager.waitUntilReady(mk, 300_000); + + LOGGER.info("ManagedKafka {} created", mkAppName); + + // wait for the sync to be up-to-date + TestUtils.waitFor("Managed kafka status sync", 1_000, 30_000, () -> { try { - assertEquals(latestStrimziVersion, ManagedKafkaResourceType.getOperation().inNamespace(mkAppName) - .withName(mkAppName).get().getStatus().getVersions().getStrimzi()); - return true; - } catch (AssertionError err) { - return false; + String statusBody = SyncApiClient.getManagedKafkaStatus(id, syncEndpoint).body(); + if (statusBody.isEmpty()) { + return false; + } + ManagedKafkaStatus apiStatus = Serialization.jsonMapper().readValue(statusBody, ManagedKafkaStatus.class); + return ManagedKafkaResourceType.hasConditionStatus(apiStatus, ManagedKafkaCondition.Type.Ready, + ManagedKafkaCondition.Status.True); + } catch (Exception e) { + throw new AssertionError(e); } }); + + LOGGER.info("Suspending ManagedKafka"); + mk = ManagedKafkaResourceType.getDefault(mkAppName, mkAppName, keycloak, startVersion, kafkaVersion); + mk.setId(id); + mk.getMetadata().setLabels(Map.of(ManagedKafka.SUSPENDED_INSTANCE, "true")); + resourceManager.addResource(extensionContext, mk); + res = SyncApiClient.createManagedKafka(mk, syncEndpoint); + assertEquals(HttpURLConnection.HTTP_NO_CONTENT, res.statusCode()); + + TestUtils.waitFor("ManagedKafka suspended", 1_000, Duration.ofMinutes(5).toMillis(), () -> managedKafkaSuspended(mkAppName)); + + LOGGER.info("Upgrade to {}", latestStrimziVersion); + mk = ManagedKafkaResourceType.getDefault(mkAppName, mkAppName, keycloak, latestStrimziVersion, kafkaVersion); + mk.setId(id); + mk.getMetadata().setLabels(Map.of(ManagedKafka.SUSPENDED_INSTANCE, "true")); + resourceManager.addResource(extensionContext, mk); + res = SyncApiClient.createManagedKafka(mk, syncEndpoint); + assertEquals(HttpURLConnection.HTTP_NO_CONTENT, res.statusCode()); + TestUtils.waitFor("ManagedKafka upgraded", 1_000, Duration.ofMinutes(10).toMillis(), () -> + Objects.equals(latestStrimziVersion, actualStrimziVersion(mkAppName)) + && managedKafkaSuspended(mkAppName)); + + LOGGER.info("Resuming ManagedKafka from suspended state"); + mk.getMetadata().setLabels(Map.of(ManagedKafka.SUSPENDED_INSTANCE, "false")); + res = SyncApiClient.createManagedKafka(mk, syncEndpoint); + assertEquals(HttpURLConnection.HTTP_NO_CONTENT, res.statusCode()); + mk = resourceManager.waitUntilReady(mk, 300_000); + + assertEquals(latestStrimziVersion, ManagedKafkaResourceType.getOperation().inNamespace(mkAppName) + .withName(mkAppName).get().getStatus().getVersions().getStrimzi()); + + // wait for the sync to be up-to-date + TestUtils.waitFor("Managed kafka status sync", 1_000, 30_000, () -> { + try { + String statusBody = SyncApiClient.getManagedKafkaStatus(id, syncEndpoint).body(); + if (statusBody.isEmpty()) { + return false; + } + ManagedKafkaStatus apiStatus = Serialization.jsonMapper().readValue(statusBody, ManagedKafkaStatus.class); + return ManagedKafkaResourceType.hasConditionStatus(apiStatus, ManagedKafkaCondition.Type.Ready, + ManagedKafkaCondition.Status.True); + } catch (Exception e) { + throw new AssertionError(e); + } + }); + + //Get status and compare with CR status + ManagedKafkaStatus apiStatus = Serialization.jsonMapper() + .readValue(SyncApiClient.getManagedKafkaStatus(mk.getId(), syncEndpoint).body(), ManagedKafkaStatus.class); + ManagedKafka managedKafka = ManagedKafkaResourceType.getOperation().inNamespace(mkAppName).withName(mkAppName).get(); + + AssertUtils.assertManagedKafkaStatus(managedKafka, apiStatus); + + //Get agent status + ManagedKafkaAgentStatus managedKafkaAgentStatus = Serialization.jsonMapper() + .readValue(SyncApiClient.getManagedKafkaAgentStatus(syncEndpoint).body(), ManagedKafkaAgentStatus.class); + + AssertUtils.assertManagedKafkaAgentStatus(managedKafkaAgentStatus); + + //Check if managed kafka deployed all components + AssertUtils.assertManagedKafka(mk); + + //delete mk using api + res = SyncApiClient.deleteManagedKafka(mk.getId(), syncEndpoint); + assertEquals(HttpURLConnection.HTTP_NO_CONTENT, res.statusCode()); + + ManagedKafkaResourceType.isDeleted(mk); + + LOGGER.info("ManagedKafka {} deleted", mkAppName); + } + + String actualStrimziVersion(String mkAppName) { + return ManagedKafkaResourceType.getOperation() + .inNamespace(mkAppName) + .withName(mkAppName) + .get() + .getStatus() + .getVersions() + .getStrimzi(); + } + + boolean managedKafkaSuspended(String mkAppName) { + ManagedKafka remote = ManagedKafkaResourceType.getOperation() + .inNamespace(mkAppName) + .withName(mkAppName) + .get(); + + var readyCondition = ManagedKafkaResourceType.getCondition(remote.getStatus(), ManagedKafkaCondition.Type.Ready); + + if (readyCondition.map(c -> !"False".equals(c.getStatus()) || !"Suspended".equals(c.getReason())).orElse(true)) { + return false; + } + + List pods = kube.client().pods().inNamespace(mkAppName).list().getItems(); + + if (!pods.isEmpty()) { + return false; + } + + String pauseReason = kube.client().resources(Kafka.class) + .inNamespace(mkAppName) + .withName(mkAppName) + .get() + .getMetadata() + .getAnnotations() + .get(ManagedKafkaKeys.Annotations.STRIMZI_PAUSE_REASON); + + if (pauseReason != null && !pauseReason.isBlank()) { + return false; + } + + return true; } } diff --git a/systemtest/src/test/java/org/bf2/systemtest/unit/SuiteUnitTest.java b/systemtest/src/test/java/org/bf2/systemtest/unit/SuiteUnitTest.java index a4e66b4a2..ad882f0f7 100644 --- a/systemtest/src/test/java/org/bf2/systemtest/unit/SuiteUnitTest.java +++ b/systemtest/src/test/java/org/bf2/systemtest/unit/SuiteUnitTest.java @@ -14,16 +14,23 @@ import io.quarkus.test.kubernetes.client.KubernetesTestServer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.bf2.systemtest.framework.ExtensionContextParameterResolver; +import org.bf2.systemtest.framework.IndicativeSentences; import org.bf2.systemtest.framework.KeycloakInstance; import org.bf2.systemtest.framework.ParallelTest; import org.bf2.systemtest.framework.SecurityUtils; import org.bf2.systemtest.framework.SequentialTest; -import org.bf2.systemtest.integration.AbstractST; +import org.bf2.systemtest.framework.TestCallbackListener; +import org.bf2.systemtest.framework.TestExceptionCallbackListener; import org.bf2.test.executor.Exec; import org.bf2.test.executor.ExecBuilder; import org.bf2.test.executor.ExecResult; +import org.bf2.test.k8s.KubeClient; import org.bf2.test.k8s.KubeClusterException; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayNameGeneration; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.extension.ExtendWith; import java.nio.charset.StandardCharsets; import java.util.Base64; @@ -42,9 +49,16 @@ */ @QuarkusTest @QuarkusTestResource(KubernetesServerTestResource.class) -public class SuiteUnitTest extends AbstractST { +@ExtendWith(TestCallbackListener.class) +@ExtendWith(ExtensionContextParameterResolver.class) +@ExtendWith(TestExceptionCallbackListener.class) +@DisplayNameGeneration(IndicativeSentences.class) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class SuiteUnitTest { + private static final Logger LOGGER = LogManager.getLogger(SuiteUnitTest.class); private static final String TEST_NS = "default"; + private final KubeClient kube = KubeClient.getInstance(); @KubernetesTestServer KubernetesServer mockServer; diff --git a/test/src/main/java/org/bf2/test/TestUtils.java b/test/src/main/java/org/bf2/test/TestUtils.java index ddf84762a..c7908befa 100644 --- a/test/src/main/java/org/bf2/test/TestUtils.java +++ b/test/src/main/java/org/bf2/test/TestUtils.java @@ -3,6 +3,7 @@ import io.fabric8.kubernetes.api.model.ContainerStatus; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.dsl.Resource; import io.fabric8.kubernetes.client.internal.readiness.Readiness; @@ -165,16 +166,29 @@ public static Function replacer(final Map pod.getMetadata().getName().contains("kube-apiserver-")) + .forEach(pod -> client.resource(pod).withGracePeriod(1000).delete()); + } catch (KubernetesClientException kce) { + // Expected + LOGGER.info("KubernetesClientException: {}", kce.getMessage()); } - KubeClient.getInstance().client().pods().inNamespace(apiNamespace).list().getItems().stream().filter(pod -> - pod.getMetadata().getName().contains("kube-apiserver-")).forEach(pod -> - KubeClient.getInstance().client().pods().inNamespace(apiNamespace).withName(pod.getMetadata().getName()).withGracePeriod(1000).delete()); } } }