Skip to content

Commit 045b63b

Browse files
author
Alexander Preuß
committed
WIP
1 parent 0061e55 commit 045b63b

File tree

7 files changed

+460
-0
lines changed

7 files changed

+460
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package org.springframework.pulsar.core;
2+
3+
public interface PulsarAdminOperations {
4+
5+
void createOrModifyTopics(PulsarTopic... topics);
6+
7+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
package org.springframework.pulsar.core;
2+
3+
import java.util.ArrayList;
4+
import java.util.Arrays;
5+
import java.util.Collection;
6+
import java.util.HashMap;
7+
import java.util.HashSet;
8+
import java.util.List;
9+
import java.util.Map;
10+
import java.util.Set;
11+
import org.apache.commons.logging.LogFactory;
12+
import org.apache.pulsar.client.admin.PulsarAdmin;
13+
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
14+
import org.apache.pulsar.client.admin.PulsarAdminException;
15+
import org.apache.pulsar.client.api.PulsarClientException;
16+
import org.springframework.beans.BeansException;
17+
import org.springframework.beans.factory.SmartInitializingSingleton;
18+
import org.springframework.context.ApplicationContext;
19+
import org.springframework.context.ApplicationContextAware;
20+
import org.springframework.core.log.LogAccessor;
21+
import org.springframework.util.CollectionUtils;
22+
23+
public class PulsarAdministration implements ApplicationContextAware, SmartInitializingSingleton, PulsarAdminOperations {
24+
private final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass()));
25+
26+
private final PulsarAdminBuilder adminBuilder;
27+
28+
private ApplicationContext applicationContext;
29+
30+
public PulsarAdministration(Map<String, Object> adminConfig) {
31+
this.adminBuilder = PulsarAdmin.builder().loadConf(adminConfig);
32+
}
33+
34+
public PulsarAdministration(PulsarAdminBuilder adminBuilder) {
35+
this.adminBuilder = adminBuilder;
36+
}
37+
38+
@Override
39+
public void afterSingletonsInstantiated() {
40+
initialize();
41+
}
42+
43+
@Override
44+
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
45+
this.applicationContext = applicationContext;
46+
}
47+
48+
public void initialize() {
49+
Collection<PulsarTopic> topics = this.applicationContext.getBeansOfType(PulsarTopic.class, false, false).values();
50+
if (CollectionUtils.isEmpty(topics)) {
51+
return;
52+
}
53+
54+
PulsarAdmin admin = null;
55+
try {
56+
admin = createAdminClient();
57+
} catch (Exception e) {
58+
throw new IllegalStateException("Could not create PulsarAdmin", e);
59+
}
60+
61+
if (admin != null) {
62+
createOrModifyTopicsIfNeeded(admin, topics);
63+
}
64+
}
65+
66+
private PulsarAdmin createAdminClient() throws PulsarClientException {
67+
return adminBuilder.build();
68+
}
69+
70+
@Override
71+
public void createOrModifyTopics(PulsarTopic... topics) {
72+
PulsarAdmin admin;
73+
try {
74+
admin = createAdminClient();
75+
} catch (Exception e) {
76+
throw new IllegalStateException("Could not create PulsarAdmin", e);
77+
}
78+
79+
if (admin != null) {
80+
createOrModifyTopicsIfNeeded(admin, Arrays.asList(topics));
81+
}
82+
}
83+
84+
private Map<String, List<PulsarTopic>> getTopicsPerNamespace(Collection<PulsarTopic> topics) {
85+
Map<String, List<PulsarTopic>> topicsPerNamespace = new HashMap<>();
86+
topics.forEach(topic -> {
87+
PulsarTopic.TopicComponents topicComponents = topic.getComponents();
88+
String tenant = topicComponents.tenant();
89+
String namespace = topicComponents.namespace();
90+
String namespaceIdentifier = tenant + "/" + namespace;
91+
topicsPerNamespace.computeIfAbsent(namespaceIdentifier, k -> new ArrayList<>()).add(topic);
92+
});
93+
return topicsPerNamespace;
94+
}
95+
96+
private List<String> getMatchingTopicPartitions(PulsarTopic topic, List<String> existingTopics) {
97+
return existingTopics
98+
.stream()
99+
.filter(existing -> existing.startsWith(topic.getFullyQualifiedTopicName() + "-partition-"))
100+
.toList();
101+
}
102+
103+
private void createOrModifyTopicsIfNeeded(PulsarAdmin admin, Collection<PulsarTopic> topics) {
104+
if (CollectionUtils.isEmpty(topics)) {
105+
return;
106+
}
107+
108+
Map<String, List<PulsarTopic>> topicsPerNamespace = getTopicsPerNamespace(topics);
109+
110+
Set<PulsarTopic> topicsToCreate = new HashSet<>();
111+
Set<PulsarTopic> topicsToModify = new HashSet<>();
112+
113+
topicsPerNamespace.forEach((namespace, requestedTopics) -> {
114+
try (admin) {
115+
List<String> existingTopicsInNamespace = admin.topics().getList(namespace);
116+
117+
for (PulsarTopic topic : requestedTopics) {
118+
if (topic.isPartitioned()) {
119+
List<String> matchingPartitions = getMatchingTopicPartitions(topic, existingTopicsInNamespace);
120+
if (matchingPartitions.isEmpty()) {
121+
logger.info("Topic " + topic.getFullyQualifiedTopicName() + " does not exist.");
122+
topicsToCreate.add(topic);
123+
} else {
124+
int numberOfExistingPartitions = matchingPartitions.size();
125+
if (numberOfExistingPartitions < topic.numberOfPartitions()) {
126+
logger.info("Topic " + topic.getFullyQualifiedTopicName() + " found with "
127+
+ numberOfExistingPartitions + " partitions.");
128+
topicsToModify.add(topic);
129+
} else if (numberOfExistingPartitions > topic.numberOfPartitions()) {
130+
throw new IllegalStateException(
131+
"Topic " + topic.getFullyQualifiedTopicName() + " found with "
132+
+ numberOfExistingPartitions
133+
+ " partitions. Needs to be deleted first.");
134+
}
135+
}
136+
} else {
137+
if (!existingTopicsInNamespace.contains(topic.getFullyQualifiedTopicName())) {
138+
logger.info("Topic " + topic.getFullyQualifiedTopicName() + " does not exist.");
139+
topicsToCreate.add(topic);
140+
}
141+
}
142+
}
143+
144+
createTopics(admin, topicsToCreate);
145+
modifyTopics(admin, topicsToModify);
146+
} catch (PulsarAdminException e) {
147+
throw new RuntimeException(e);
148+
}
149+
});
150+
}
151+
152+
private void createTopics(PulsarAdmin admin, Set<PulsarTopic> topicsToCreate) throws PulsarAdminException {
153+
for (PulsarTopic topic : topicsToCreate) {
154+
if (topic.isPartitioned()) {
155+
admin.topics().createPartitionedTopic(topic.topicName(), topic.numberOfPartitions());
156+
} else {
157+
admin.topics().createNonPartitionedTopic(topic.topicName());
158+
}
159+
}
160+
}
161+
162+
private void modifyTopics(PulsarAdmin admin, Set<PulsarTopic> topicsToModify) throws PulsarAdminException {
163+
for (PulsarTopic topic : topicsToModify) {
164+
admin.topics().updatePartitionedTopic(topic.topicName(), topic.numberOfPartitions());
165+
}
166+
}
167+
168+
169+
170+
171+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package org.springframework.pulsar.core;
2+
3+
import java.util.Map;
4+
import org.apache.pulsar.common.naming.TopicDomain;
5+
6+
public record PulsarTopic(String topicName, int numberOfPartitions, Map<String, String> properties) {
7+
8+
public static PulsarTopicBuilder builder(String topicName) {
9+
return new PulsarTopicBuilder(topicName);
10+
}
11+
12+
public String getFullyQualifiedTopicName() {
13+
return this.getComponents().toString();
14+
15+
}
16+
17+
public boolean isPartitioned() {
18+
return this.numberOfPartitions != 0;
19+
}
20+
21+
public TopicComponents getComponents() {
22+
String[] splitTopic = this.topicName().split("/");
23+
if (splitTopic.length == 1) { // looks like 'my-topic'
24+
return new TopicComponents(TopicDomain.persistent, "public", "default", splitTopic[0]);
25+
} else if (splitTopic.length == 3) { // looks like 'public/default/my-topic'
26+
return new TopicComponents(TopicDomain.persistent, splitTopic[0], splitTopic[1], splitTopic[2]);
27+
} else if (splitTopic.length == 5) { // looks like 'persistent://public/default/my-topic'
28+
String type = splitTopic[0].substring(0, splitTopic[0].length() - 1); // remove ':'
29+
return new TopicComponents(TopicDomain.getEnum(type), splitTopic[2], splitTopic[3], splitTopic[4]);
30+
}
31+
throw new IllegalArgumentException("Topic name '" + this + "' has unexpected components.");
32+
33+
}
34+
35+
record TopicComponents(TopicDomain domain, String tenant, String namespace, String topic) {
36+
@Override
37+
public String toString() {
38+
return this.domain + "://" + this.tenant + "/" + this.namespace + "/" + this.topic;
39+
}
40+
}
41+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package org.springframework.pulsar.core;
2+
3+
import java.util.Map;
4+
5+
public class PulsarTopicBuilder {
6+
7+
private String topicName;
8+
9+
private int numberOfPartitions;
10+
11+
private Map<String, String> properties;
12+
13+
14+
protected PulsarTopicBuilder(String topicName) {
15+
this.topicName = topicName;
16+
}
17+
18+
public PulsarTopicBuilder setNumberOfPartitions(int numberOfPartitions) {
19+
this.numberOfPartitions = numberOfPartitions;
20+
return this;
21+
}
22+
23+
public PulsarTopicBuilder setProperties(Map<String, String> properties) {
24+
this.properties = properties;
25+
return this;
26+
}
27+
28+
public PulsarTopic build() {
29+
return new PulsarTopic(this.topicName, this.numberOfPartitions, this.properties);
30+
}
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package org.springframework.pulsar.core;
2+
3+
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
4+
import java.util.Map;
5+
import org.apache.pulsar.client.admin.PulsarAdmin;
6+
import org.junit.jupiter.api.Test;
7+
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
8+
import org.springframework.context.annotation.Bean;
9+
import org.springframework.context.annotation.Configuration;
10+
11+
12+
public class PulsarAdministrationBadContextTests extends AbstractContainerBaseTests {
13+
14+
@Test
15+
void testDecrementingPartitionCount() {
16+
assertThatIllegalStateException()
17+
.isThrownBy(() -> new AnnotationConfigApplicationContext(DecrementPartitionCountConfig.class).close())
18+
.withMessage("Topic persistent://public/default/dpc-partitioned-1 found with 8 partitions. Needs to be deleted first.");
19+
}
20+
21+
@Configuration(proxyBeanMethods = false)
22+
static class DecrementPartitionCountConfig {
23+
static {
24+
try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(getHttpServiceUrl()).build()) {
25+
admin.topics().createPartitionedTopic("dpc-partitioned-1", 8);
26+
} catch (Exception e) {
27+
throw new RuntimeException(e);
28+
}
29+
}
30+
31+
@Bean
32+
PulsarAdministration admin() {
33+
return new PulsarAdministration(Map.of("serviceUrl", getHttpServiceUrl()));
34+
}
35+
36+
@Bean
37+
PulsarTopic partitionedTopic() {
38+
return PulsarTopic.builder("dpc-partitioned-1").setNumberOfPartitions(4).build();
39+
}
40+
}
41+
}

0 commit comments

Comments
 (0)