Skip to content

Commit

Permalink
feat: EventBus增加safetySerialization,集群订阅时转换事件对象为可安全传递的对象. (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhou-hao authored Nov 16, 2023
1 parent 0bdb880 commit 90c7d08
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 21 deletions.
4 changes: 4 additions & 0 deletions src/main/java/org/jetlinks/core/event/EventBus.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ default Disposable subscribe(Subscription subscription,
* @param <T> 解码后结果类型
* @return 事件流
*/
@Deprecated
<T> Flux<T> subscribe(Subscription subscription, Decoder<T> decoder);

/**
Expand All @@ -73,6 +74,7 @@ default Disposable subscribe(Subscription subscription,
* @param <T> 类型
* @return 订阅者数量
*/
@Deprecated
<T> Mono<Long> publish(String topic, Encoder<T> encoder, Publisher<? extends T> eventStream);

/**
Expand Down Expand Up @@ -111,10 +113,12 @@ default <T> Flux<T> subscribe(Subscription subscription, Class<T> type) {
* @param <T> 事件类型
* @return 订阅者数量
*/
@Deprecated
default <T> Mono<Long> publish(String topic, Encoder<T> encoder, T event) {
return publish(topic, encoder, Mono.just(event));
}

@Deprecated
default <T> Mono<Long> publish(String topic, Encoder<T> encoder, T event, Scheduler scheduler) {
return publish(topic, encoder, Mono.just(event), scheduler);
}
Expand Down
49 changes: 28 additions & 21 deletions src/main/java/org/jetlinks/core/event/Subscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,29 +38,29 @@ public Subscription() {
public static Subscription of(String subscriber, String... topic) {

return Subscription
.builder()
.subscriberId(subscriber)
.topics(topic)
.build();
.builder()
.subscriberId(subscriber)
.topics(topic)
.build();
// return new Subscription(subscriber, topic, DEFAULT_FEATURES, null);
}

public static Subscription of(String subscriber, String[] topic, Feature... features) {
return Subscription
.builder()
.subscriberId(subscriber)
.topics(topic)
.features(features)
.build();
.builder()
.subscriberId(subscriber)
.topics(topic)
.features(features)
.build();
}

public static Subscription of(String subscriber, String topic, Feature... features) {
return Subscription
.builder()
.subscriberId(subscriber)
.topics(topic)
.features(features)
.build();
.builder()
.subscriberId(subscriber)
.topics(topic)
.features(features)
.build();
//return new Subscription(subscriber, new String[]{topic}, features, null);
}

Expand Down Expand Up @@ -101,18 +101,18 @@ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundExcept
}

features = EnumDict
.getByMask(Feature.class, in.readLong())
.toArray(new Feature[0]);
.getByMask(Feature.class, in.readLong())
.toArray(new Feature[0]);
priority = in.readInt();
}

@Override
public String toString() {
return "Subscription{" +
"subscriber='" + subscriber + '\'' +
", topics=" + Arrays.toString(topics) +
", features=" + Arrays.toString(features) +
'}';
"subscriber='" + subscriber + '\'' +
", topics=" + Arrays.toString(topics) +
", features=" + Arrays.toString(features) +
'}';
}

@AllArgsConstructor
Expand All @@ -128,7 +128,14 @@ public enum Feature implements EnumDict<String> {
broker("订阅集群消息"),

sharedOldest("相同订阅者总是最先订阅的收到数据"),
sharedLocalFirst("集群下相同的订阅者总是本地的优先收到数据");
sharedLocalFirst("集群下相同的订阅者总是本地的优先收到数据"),
/**
* 集群传递时进行安全序列化,防止跨服务无法序列化事件类
* 收到消息请使用{@link TopicPayload#decode(Class)}反序列化为目标对象
*
* @see org.jetlinks.core.utils.SerializeUtils#convertToSafelySerializable(Object)
*/
safetySerialization("安全序列化");

private final String text;

Expand Down

0 comments on commit 90c7d08

Please # to comment.