diff --git a/operator/src/main/java/org/bf2/operator/managers/CapacityManager.java b/operator/src/main/java/org/bf2/operator/managers/CapacityManager.java index 720bf0cc6..25a1b5fdd 100644 --- a/operator/src/main/java/org/bf2/operator/managers/CapacityManager.java +++ b/operator/src/main/java/org/bf2/operator/managers/CapacityManager.java @@ -216,6 +216,13 @@ synchronized Optional claimResources(ManagedKafka managedKafka // ensure we have the latest ConfigMap resourceConfigMap = client.configMaps().withName(FLEETSHARD_RESOURCES).get(); + String managedKafkaKey = getManagedKafkaKey(managedKafka); + + if (resourceConfigMap.getData().get(managedKafkaKey) != null) { + // a competing operator has already claimed resources for this managedkafka - will be removed as a possibility when leader election is supported + return Optional.empty(); + } + Resources resources = createResources(managedKafka, profile); int total = Integer.parseInt(resourceConfigMap.getData().getOrDefault(profile, "0")); @@ -230,7 +237,7 @@ synchronized Optional claimResources(ManagedKafka managedKafka // update the configmap in a locked manner - this may fail, but we'll retry resourceConfigMap.getData().put(profile, String.valueOf(newTotal)); - resourceConfigMap.getData().put(getManagedKafkaKey(managedKafka), Serialization.asJson(resources)); + resourceConfigMap.getData().put(managedKafkaKey, Serialization.asJson(resources)); client.configMaps() .withName(FLEETSHARD_RESOURCES) .lockResourceVersion(resourceConfigMap.getMetadata().getResourceVersion()) diff --git a/operator/src/test/java/org/bf2/operator/managers/CapacityManagerTest.java b/operator/src/test/java/org/bf2/operator/managers/CapacityManagerTest.java index fcfcec22c..631d558e0 100644 --- a/operator/src/test/java/org/bf2/operator/managers/CapacityManagerTest.java +++ b/operator/src/test/java/org/bf2/operator/managers/CapacityManagerTest.java @@ -219,4 +219,33 @@ void testReserved() { assertEquals(Status.False, readiness.getStatus()); } + @Test + void testCompletingClaims() { + ManagedKafkaAgent dummyInstance = ManagedKafkaAgentResourceClient.getDummyInstance(); + dummyInstance.getMetadata().setNamespace(client.getNamespace()); + dummyInstance.getSpec() + .setCapacity(Map.of("standard", + new ProfileBuilder().withMaxNodes(30).build())); + client.resource(dummyInstance).createOrReplace(); + + ManagedKafka mk = ManagedKafka.getDummyInstance(1); + mk = new ManagedKafkaBuilder().editMetadata() + .endMetadata() + .build(); + + // add a real instance, presumably by another operator before the claimResources call + client.configMaps() + .create(new ConfigMapBuilder().withNewMetadata() + .withName(CapacityManager.FLEETSHARD_RESOURCES) + .endMetadata() + .withData(Map.of("standard", "1", CapacityManager.getManagedKafkaKey(mk), + "{\"profile\":\"standard\",\"units\":1}")) + .build()); + + Optional result = capacityManager.claimResources(mk, "standard", dummyInstance); + assertTrue(result.isEmpty()); + // should stay at 1, rather than incrementing again + assertEquals("1", capacityManager.getOrCreateResourceConfigMap(dummyInstance).getData().get("standard")); + } + }