Skip to content

Commit

Permalink
MGDSTRM-10985 accounting for the possibility of competing operators (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins authored Mar 24, 2023
1 parent 5bc8841 commit 0aa6d8f
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,13 @@ synchronized Optional<OperandReadiness> claimResources(ManagedKafka managedKafka
// ensure we have the latest
ConfigMap resourceConfigMap = client.configMaps().withName(FLEETSHARD_RESOURCES).get();

String managedKafkaKey = getManagedKafkaKey(managedKafka);

if (resourceConfigMap.getData().get(managedKafkaKey) != null) {
// a competing operator has already claimed resources for this managedkafka - will be removed as a possibility when leader election is supported
return Optional.empty();
}

Resources resources = createResources(managedKafka, profile);

int total = Integer.parseInt(resourceConfigMap.getData().getOrDefault(profile, "0"));
Expand All @@ -230,7 +237,7 @@ synchronized Optional<OperandReadiness> claimResources(ManagedKafka managedKafka

// update the configmap in a locked manner - this may fail, but we'll retry
resourceConfigMap.getData().put(profile, String.valueOf(newTotal));
resourceConfigMap.getData().put(getManagedKafkaKey(managedKafka), Serialization.asJson(resources));
resourceConfigMap.getData().put(managedKafkaKey, Serialization.asJson(resources));
client.configMaps()
.withName(FLEETSHARD_RESOURCES)
.lockResourceVersion(resourceConfigMap.getMetadata().getResourceVersion())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,4 +219,33 @@ void testReserved() {
assertEquals(Status.False, readiness.getStatus());
}

@Test
void testCompletingClaims() {
ManagedKafkaAgent dummyInstance = ManagedKafkaAgentResourceClient.getDummyInstance();
dummyInstance.getMetadata().setNamespace(client.getNamespace());
dummyInstance.getSpec()
.setCapacity(Map.of("standard",
new ProfileBuilder().withMaxNodes(30).build()));
client.resource(dummyInstance).createOrReplace();

ManagedKafka mk = ManagedKafka.getDummyInstance(1);
mk = new ManagedKafkaBuilder().editMetadata()
.endMetadata()
.build();

// add a real instance, presumably by another operator before the claimResources call
client.configMaps()
.create(new ConfigMapBuilder().withNewMetadata()
.withName(CapacityManager.FLEETSHARD_RESOURCES)
.endMetadata()
.withData(Map.of("standard", "1", CapacityManager.getManagedKafkaKey(mk),
"{\"profile\":\"standard\",\"units\":1}"))
.build());

Optional<OperandReadiness> result = capacityManager.claimResources(mk, "standard", dummyInstance);
assertTrue(result.isEmpty());
// should stay at 1, rather than incrementing again
assertEquals("1", capacityManager.getOrCreateResourceConfigMap(dummyInstance).getData().get("standard"));
}

}

0 comments on commit 0aa6d8f

Please # to comment.