Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MGDSTRM-9790] Test instance upgrade while suspended #854

Merged
merged 1 commit into from
Jan 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,10 @@ private Kafka upgrade(ManagedKafka managedKafka, KafkaBuilder kafkaBuilder) {
* @return true if suspension should be blocked, otherwise false
*/
private boolean blockSuspension(ManagedKafka managedKafka) {
if (isStrimziUpdating(managedKafka)) {
return true;
}

if (kafkaManager.hasKafkaVersionChanged(managedKafka)) {
return true;
}
Expand Down
2 changes: 2 additions & 0 deletions operator/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ quarkus.kubernetes.labels.app=kas-fleetshard-operator
# deactivate CRD checking from Java Operator SDK
quarkus.operator-sdk.crd.validate=false
quarkus.operator-sdk.disable-rbac-generation=true
# increasing retries from the default max-attempts of 5
quarkus.operator-sdk.controllers."managedkafkacontroller".retry.max-attempts=20

quarkus.arc.test.disable-application-lifecycle-observers=true

Expand Down
2 changes: 1 addition & 1 deletion sync/src/main/java/org/bf2/sync/ManagedKafkaAgentSync.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class ManagedKafkaAgentSync {

@Timed(value = "sync.poll", extraTags = {"resource", "ManagedKafkaAgent"}, description = "The time spent processing polling calls")
@Counted(value = "sync.poll", extraTags = {"resource", "ManagedKafkaAgent"}, description = "The number of polling calls")
@Scheduled(every = "{poll.interval}", concurrentExecution = ConcurrentExecution.SKIP)
@Scheduled(every = "{poll.interval}", delayed = "{poll.delay}", concurrentExecution = ConcurrentExecution.SKIP)
void loop() {
ManagedKafkaAgent managedKafkaAgent = controlPlane.getManagedKafkaAgent();
Objects.requireNonNull(managedKafkaAgent);
Expand Down
2 changes: 1 addition & 1 deletion sync/src/main/java/org/bf2/sync/ManagedKafkaSync.java
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ void checkCreationTimestamp(ManagedKafka existing, Duration duration) {
}
}

@Scheduled(every = "{poll.interval}", concurrentExecution = ConcurrentExecution.SKIP)
@Scheduled(every = "{poll.interval}", delayed = "{poll.delay}", concurrentExecution = ConcurrentExecution.SKIP)
void pollKafkaClusters() {
log.debug("Polling for control plane managed kafkas");
// TODO: this is based upon a full poll - eventually this could be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,14 @@ public void deleteCluster(@PathParam("clusterid") String clusterId) {
markForDeletion(clusterId);
}

@DELETE
@Path("/{id}/kafkas")
@Produces(MediaType.APPLICATION_JSON)
public void deleteAllClusters() {
    // Mock control-plane endpoint: marks every known ManagedKafka for deletion.
    // NOTE(review): the {id} path segment is not read here — the caller passes a
    // placeholder (e.g. "pepa") purely to satisfy the route template.
    log.infof("control plane:: received request to delete all ManagedKafkas");
    for (String clusterId : this.kafkas.keySet()) {
        markForDeletion(clusterId);
    }
}

@PUT
@Path("/{id}")
@Produces(MediaType.APPLICATION_JSON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ protected void onStart() {
CustomResourceEventHandler.of(controlPlane::updateKafkaClusterStatus));

// for the Agent
managedAgentInformer = resourceInformerFactory.create(ManagedKafkaAgent.class, client.resources(ManagedKafkaAgent.class).inAnyNamespace(),
managedAgentInformer = resourceInformerFactory.create(ManagedKafkaAgent.class, client.resources(ManagedKafkaAgent.class).inNamespace(client.getNamespace()),
CustomResourceEventHandler.of(controlPlane::updateAgentStatus));

secretInformer = resourceInformerFactory.create(Secret.class, client.secrets().inAnyNamespace().withLabels(OperandUtils.getMasterSecretLabel()),
Expand Down
2 changes: 2 additions & 0 deletions sync/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ secret.name=addon-kas-fleetshard-operator-parameters
sso.enabled=false
sso.filter.enabled=true
secret.enabled=true
poll.delay=${poll-delay:0s}
quarkus.log.console.format=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%c{3.}] (%t) %x %s%e%n
quarkus.kubernetes.ports.http.host-port=8080

Expand All @@ -28,6 +29,7 @@ quarkus.kubernetes.ports.http.host-port=8080
%test.secret.enabled=false
%test.sync.mock-control-plane.simulate=false
%test.quarkus.log.category."org.bf2".level=DEBUG
%test.poll.delay=15s
%test.poll.interval=5s

# control plane properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -59,6 +60,19 @@ public static HttpResponse<String> deleteManagedKafka(String id, String endpoint
return retry(() -> client.send(request, HttpResponse.BodyHandlers.ofString()));
}

/**
 * Asks the sync/mock control-plane endpoint to delete every ManagedKafka it knows about.
 *
 * @param endpoint base URL of the sync server
 * @return the HTTP response body as a String (sent with retries)
 * @throws Exception if the request cannot be completed after retrying
 */
public static HttpResponse<String> deleteManagedKafkas(String endpoint) throws Exception {
    LOGGER.info("Deleting all managed kafkas");
    URI uri = URI.create(endpoint + BASE_PATH + "pepa/kafkas");
    // Build the DELETE request up front; a generous timeout covers slow clusters.
    HttpRequest request = HttpRequest.newBuilder(uri)
            .DELETE()
            .timeout(Duration.ofMinutes(2))
            .build();
    LOGGER.info("Sending DELETE request to {} with port {} and path {}", uri.getHost(), uri.getPort(), uri.getPath());
    HttpClient client = HttpClient.newHttpClient();
    return retry(() -> client.send(request, HttpResponse.BodyHandlers.ofString()));
}

public static HttpResponse<String> getManagedKafkaAgentStatus(String endpoint) throws Exception {
LOGGER.info("Get managed kafka agent status");
return getRequest("pepa/status", endpoint);
Expand Down Expand Up @@ -116,26 +130,41 @@ private static Stream<String> getSortedAvailableStrimziVersions(String endpoint)
}

/**
 * Waits (up to 60s, polling every 1s) until the agent status reports at least one
 * Strimzi version, then returns those versions sorted.
 *
 * <p>The last successfully fetched status is cached in an {@link AtomicReference} so the
 * supplier is not invoked an extra time after the wait completes — avoiding a race where
 * a final fetch could fail or return an empty list.
 *
 * @param statusSupplier source of the current ManagedKafkaAgentStatus
 * @return sorted stream of available Strimzi version strings
 */
public static Stream<String> getSortedAvailableStrimziVersions(Supplier<ManagedKafkaAgentStatus> statusSupplier) {
    AtomicReference<ManagedKafkaAgentStatus> agentStatus = new AtomicReference<>();

    TestUtils.waitFor("Strimzi version is reported", 1_000, 60_000, () -> {
        try {
            ManagedKafkaAgentStatus status = statusSupplier.get();
            agentStatus.set(status);
            return !status.getStrimzi().isEmpty();
        } catch (Exception e) {
            // Status endpoint may not be ready yet; keep polling until the timeout.
            return false;
        }
    });

    return sortedStrimziVersion(agentStatus.get().getStrimzi().stream().map(StrimziVersionStatus::getVersion));
}

/**
 * Waits (up to 60s, polling every 1s) until the agent status reports at least one
 * Strimzi version, then returns the Kafka versions supported by the given Strimzi version.
 *
 * <p>Reuses the status captured during the wait instead of fetching again, and returns an
 * empty stream (rather than throwing) when the requested Strimzi version is not present.
 *
 * @param statusSupplier source of the current ManagedKafkaAgentStatus
 * @param strimziVersion the Strimzi version whose Kafka versions are wanted
 * @return stream of Kafka version strings; empty if the Strimzi version is unknown
 */
public static Stream<String> getKafkaVersions(Supplier<ManagedKafkaAgentStatus> statusSupplier, String strimziVersion) {
    AtomicReference<ManagedKafkaAgentStatus> agentStatus = new AtomicReference<>();

    TestUtils.waitFor("Strimzi version is reported", 1_000, 60_000, () -> {
        try {
            ManagedKafkaAgentStatus status = statusSupplier.get();
            agentStatus.set(status);
            return !status.getStrimzi().isEmpty();
        } catch (Exception e) {
            // Status endpoint may not be ready yet; keep polling until the timeout.
            return false;
        }
    });

    return agentStatus.get()
            .getStrimzi()
            .stream()
            .filter(item -> item.getVersion().equals(strimziVersion))
            .findFirst()
            .map(selectedVersion -> selectedVersion.getKafkaVersions().stream())
            .orElseGet(Stream::empty);
}

public static String getLatestAvailableKafkaVersion(Supplier<ManagedKafkaAgentStatus> statusSupplier, String strimziVersion) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import org.bouncycastle.asn1.x509.AuthorityKeyIdentifier;
import org.bouncycastle.asn1.x509.BasicConstraints;
import org.bouncycastle.asn1.x509.Extension;
import org.bouncycastle.asn1.x509.GeneralName;
import org.bouncycastle.asn1.x509.GeneralNames;
import org.bouncycastle.asn1.x509.SubjectKeyIdentifier;
import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
import org.bouncycastle.cert.CertIOException;
Expand Down Expand Up @@ -126,6 +128,10 @@ private static X509Certificate generate(final KeyPair keyPair,
certificateBuilder.addExtension(Extension.authorityKeyIdentifier, false, authority);
certificateBuilder.addExtension(Extension.basicConstraints, true, new BasicConstraints(true));

// Add SAN (required by some clients)
GeneralNames subjectAltName = new GeneralNames(new GeneralName(GeneralName.dNSName, "*." + domain));
certificateBuilder.addExtension(Extension.subjectAlternativeName, false, subjectAltName);

final ContentSigner contentSigner = new JcaContentSignerBuilder(hashAlgorithm).build(keyPair.getPrivate());

return new JcaX509CertificateConverter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,48 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bf2.systemtest.operator.FleetShardOperatorManager;
import org.bf2.systemtest.operator.KeycloakOperatorManager;
import org.bf2.systemtest.operator.StrimziOperatorManager;
import org.bf2.test.Environment;
import org.bf2.test.k8s.KubeClient;
import org.junit.platform.launcher.TestExecutionListener;
import org.junit.platform.launcher.TestIdentifier;
import org.junit.platform.launcher.TestPlan;

import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class TestPlanExecutionListener implements TestExecutionListener {

private static final Logger LOGGER = LogManager.getLogger(TestPlanExecutionListener.class);
private static final String DIVIDER = "=======================================================================";

private StrimziOperatorManager strimziOperatorManager;
private KubeClient kube;
private boolean systemTestsSelected;

@Override
public void testPlanExecutionStarted(TestPlan testPlan) {
LOGGER.info("=======================================================================");
LOGGER.info("=======================================================================");
LOGGER.info(DIVIDER);
LOGGER.info(DIVIDER);
LOGGER.info(" Test run started");
LOGGER.info("=======================================================================");
LOGGER.info("=======================================================================");
printSelectedTestClasses(testPlan);
LOGGER.info(DIVIDER);
LOGGER.info(DIVIDER);

List<String> testClasses = getSelectedTestClassNames(testPlan);
printSelectedTestClasses(testClasses);
systemTestsSelected = testClasses.isEmpty() || testClasses.stream().anyMatch(className -> className.endsWith("ST"));

if (systemTestsSelected) {
deployComponents();
}

try {
Files.createDirectories(Environment.LOG_DIR);
} catch (IOException e) {
Expand All @@ -29,20 +52,65 @@ public void testPlanExecutionStarted(TestPlan testPlan) {
}
}

@Override
public void testPlanExecutionFinished(TestPlan testPlan) {
    // Tear down shared components only when system tests (classes ending in "ST")
    // were part of this run; the banner is logged in a finally block so it appears
    // even if component removal fails.
    try {
        if (systemTestsSelected) {
            removeComponents();
        }
    } finally {
        LOGGER.info(DIVIDER);
        LOGGER.info(DIVIDER);
        LOGGER.info(" Test run finished");
        LOGGER.info(DIVIDER);
        LOGGER.info(DIVIDER);
    }
}

private void printSelectedTestClasses(TestPlan plan) {
LOGGER.info("Following testclasses are selected for run:");
Arrays.asList(plan.getChildren(plan.getRoots()
private List<String> getSelectedTestClassNames(TestPlan plan) {
List<TestIdentifier> selectedTests = Arrays.asList(plan.getChildren(plan.getRoots()
.toArray(new TestIdentifier[0])[0])
.toArray(new TestIdentifier[0])).forEach(testIdentifier -> LOGGER.info("-> {}", testIdentifier.getLegacyReportingName()));
LOGGER.info("=======================================================================");
LOGGER.info("=======================================================================");
.toArray(new TestIdentifier[0]));

if (selectedTests.isEmpty()) {
return Collections.emptyList();
} else {
return selectedTests.stream().map(TestIdentifier::getLegacyReportingName).collect(Collectors.toList());
}
}

/**
 * Logs which test classes will run; an empty list means the whole suite was selected.
 */
private void printSelectedTestClasses(List<String> testClasses) {
    if (!testClasses.isEmpty()) {
        LOGGER.info("Following test classes are selected for run:");
        for (String testClassName : testClasses) {
            LOGGER.info("-> {}", testClassName);
        }
    } else {
        LOGGER.info("All test classes are selected for run");
    }

    LOGGER.info(DIVIDER);
    LOGGER.info(DIVIDER);
}

/**
 * Installs the shared test components (Keycloak, Strimzi, fleetshard operator and sync)
 * in parallel and blocks until all installations complete.
 */
private void deployComponents() {
    strimziOperatorManager = new StrimziOperatorManager(SystemTestEnvironment.STRIMZI_VERSION);
    kube = KubeClient.getInstance();

    try {
        // Kick off all installations concurrently, then wait for every one of them.
        CompletableFuture<?>[] installations = new CompletableFuture<?>[] {
                KeycloakOperatorManager.installKeycloak(kube),
                strimziOperatorManager.installStrimzi(kube),
                FleetShardOperatorManager.deployFleetShardOperator(kube),
                FleetShardOperatorManager.deployFleetShardSync(kube)
        };
        CompletableFuture.allOf(installations).join();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

/**
 * Uninstalls the shared test components in parallel and blocks until all removals finish.
 */
private void removeComponents() {
    CompletableFuture<?> keycloakRemoval = KeycloakOperatorManager.uninstallKeycloak(kube);
    CompletableFuture<?> fleetShardRemoval = FleetShardOperatorManager.deleteFleetShard(kube);
    CompletableFuture<?> strimziRemoval = strimziOperatorManager.uninstallStrimziClusterWideResources(kube);
    CompletableFuture.allOf(keycloakRemoval, fleetShardRemoval, strimziRemoval).join();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ public static CompletableFuture<Void> deployFleetShardOperator(KubeClient kubeCl

LOGGER.info("Operator is deployed");
return TestUtils.asyncWaitFor("Operator ready", 1_000, INSTALL_TIMEOUT_MS,
() -> isOperatorInstalled(kubeClient) && agentStatusHasStrimziVersions(kubeClient));
() -> isOperatorInstalled(kubeClient) && agentStatusHasStrimziVersions(kubeClient))
.thenRun(() -> LOGGER.info("kas-fleetshard operator is ready"));
}

public static CompletableFuture<Void> deployFleetShardSync(KubeClient kubeClient) throws Exception {
Expand All @@ -103,7 +104,8 @@ public static CompletableFuture<Void> deployFleetShardSync(KubeClient kubeClient
LOGGER.info("Installing {}", SYNC_NAME);
kubeClient.apply(OPERATOR_NS, SystemTestEnvironment.YAML_SYNC_BUNDLE_PATH);
LOGGER.info("Sync is deployed");
return TestUtils.asyncWaitFor("Sync ready", 1_000, INSTALL_TIMEOUT_MS, () -> isSyncInstalled(kubeClient));
return TestUtils.asyncWaitFor("Sync ready", 1_000, INSTALL_TIMEOUT_MS, () -> isSyncInstalled(kubeClient))
.thenRun(() -> LOGGER.info("kas-fleetshard sync is ready"));
}

static void deployPullSecrets(KubeClient kubeClient) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public static CompletableFuture<Void> installKeycloak(KubeClient kubeClient) thr
return TestUtils.asyncWaitFor("Keycloak instance ready", 1_000, 600_000, () ->
TestUtils.isPodReady(KubeClient.getInstance().client().pods().inNamespace(OPERATOR_NS)
.list().getItems().stream().filter(pod ->
pod.getMetadata().getName().contains("keycloak-0")).findFirst().orElse(null)));
pod.getMetadata().getName().contains("keycloak-0")).findFirst().orElse(null)))
.thenRun(() -> LOGGER.info("Keycloak instance is ready"));
} else {
LOGGER.info("Keycloak is not installed suite will use values from env vars for oauth");
return CompletableFuture.completedFuture(null);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
package org.bf2.systemtest.integration;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bf2.systemtest.api.sync.SyncApiClient;
import org.bf2.systemtest.framework.ExtensionContextParameterResolver;
import org.bf2.systemtest.framework.IndicativeSentences;
import org.bf2.systemtest.framework.KeycloakInstance;
import org.bf2.systemtest.framework.ResourceManager;
import org.bf2.systemtest.framework.SystemTestEnvironment;
import org.bf2.systemtest.framework.TestCallbackListener;
import org.bf2.systemtest.framework.TestExceptionCallbackListener;
import org.bf2.systemtest.operator.FleetShardOperatorManager;
import org.bf2.systemtest.operator.KeycloakOperatorManager;
import org.bf2.test.k8s.KubeClient;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayNameGeneration;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -23,12 +31,26 @@
@DisplayNameGeneration(IndicativeSentences.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public abstract class AbstractST {

private static final Logger LOGGER = LogManager.getLogger(AbstractST.class);

protected KubeClient kube;
protected ResourceManager resourceManager;
protected String syncEndpoint;
protected KeycloakInstance keycloak;

@BeforeAll
void init() {
    // Shared per-class fixtures: cluster client, resource tracker and the sync endpoint.
    kube = KubeClient.getInstance();
    resourceManager = ResourceManager.getInstance();
    syncEndpoint = FleetShardOperatorManager.createEndpoint(kube);
    LOGGER.info("Endpoint address {}", syncEndpoint);
    // Keycloak is only available when the suite installed it; otherwise tests fall
    // back to oauth values supplied via environment variables.
    if (SystemTestEnvironment.INSTALL_KEYCLOAK) {
        keycloak = new KeycloakInstance(KeycloakOperatorManager.OPERATOR_NS);
    } else {
        keycloak = null;
    }
}

@BeforeEach
void setup() throws Exception {
    // Remove any Kafkas left by previous tests
    // Best-effort cleanup via the sync API; the HTTP response is intentionally
    // ignored — a failed cleanup surfaces later as a test failure, not here.
    SyncApiClient.deleteManagedKafkas(syncEndpoint);
}
}
Loading