diff --git a/src/main/java/org/jetlinks/core/event/EventBus.java b/src/main/java/org/jetlinks/core/event/EventBus.java index 134abc76..e8ec8e42 100644 --- a/src/main/java/org/jetlinks/core/event/EventBus.java +++ b/src/main/java/org/jetlinks/core/event/EventBus.java @@ -51,6 +51,7 @@ default Disposable subscribe(Subscription subscription, * @param 解码后结果类型 * @return 事件流 */ + @Deprecated Flux subscribe(Subscription subscription, Decoder decoder); /** @@ -73,6 +74,7 @@ default Disposable subscribe(Subscription subscription, * @param 类型 * @return 订阅者数量 */ + @Deprecated Mono publish(String topic, Encoder encoder, Publisher eventStream); /** @@ -111,10 +113,12 @@ default Flux subscribe(Subscription subscription, Class type) { * @param 事件类型 * @return 订阅者数量 */ + @Deprecated default Mono publish(String topic, Encoder encoder, T event) { return publish(topic, encoder, Mono.just(event)); } + @Deprecated default Mono publish(String topic, Encoder encoder, T event, Scheduler scheduler) { return publish(topic, encoder, Mono.just(event), scheduler); } diff --git a/src/main/java/org/jetlinks/core/event/Subscription.java b/src/main/java/org/jetlinks/core/event/Subscription.java index ca2fab23..4c5283dd 100644 --- a/src/main/java/org/jetlinks/core/event/Subscription.java +++ b/src/main/java/org/jetlinks/core/event/Subscription.java @@ -38,29 +38,29 @@ public Subscription() { public static Subscription of(String subscriber, String... topic) { return Subscription - .builder() - .subscriberId(subscriber) - .topics(topic) - .build(); + .builder() + .subscriberId(subscriber) + .topics(topic) + .build(); // return new Subscription(subscriber, topic, DEFAULT_FEATURES, null); } public static Subscription of(String subscriber, String[] topic, Feature... features) { return Subscription - .builder() - .subscriberId(subscriber) - .topics(topic) - .features(features) - .build(); + .builder() + .subscriberId(subscriber) + .topics(topic) + .features(features) + .build(); } public static Subscription of(String subscriber, String topic, Feature... features) { return Subscription - .builder() - .subscriberId(subscriber) - .topics(topic) - .features(features) - .build(); + .builder() + .subscriberId(subscriber) + .topics(topic) + .features(features) + .build(); //return new Subscription(subscriber, new String[]{topic}, features, null); } @@ -101,18 +101,18 @@ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundExcept } features = EnumDict - .getByMask(Feature.class, in.readLong()) - .toArray(new Feature[0]); + .getByMask(Feature.class, in.readLong()) + .toArray(new Feature[0]); priority = in.readInt(); } @Override public String toString() { return "Subscription{" + - "subscriber='" + subscriber + '\'' + - ", topics=" + Arrays.toString(topics) + - ", features=" + Arrays.toString(features) + - '}'; + "subscriber='" + subscriber + '\'' + + ", topics=" + Arrays.toString(topics) + + ", features=" + Arrays.toString(features) + + '}'; } @AllArgsConstructor @@ -128,7 +128,14 @@ public enum Feature implements EnumDict { broker("订阅集群消息"), sharedOldest("相同订阅者总是最先订阅的收到数据"), - sharedLocalFirst("集群下相同的订阅者总是本地的优先收到数据"); + sharedLocalFirst("集群下相同的订阅者总是本地的优先收到数据"), + /** + * 集群传递时进行安全序列化,防止跨服务无法序列化事件类 + * 收到消息请使用{@link TopicPayload#decode(Class)}反序列化为目标对象 + * + * @see org.jetlinks.core.utils.SerializeUtils#convertToSafelySerializable(Object) + */ + safetySerialization("安全序列化"); private final String text;