From 90c7d08f1c3721bbc5b00f7de9f197ffb0f9d2a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=80=81=E5=91=A8?= Date: Thu, 16 Nov 2023 15:29:45 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20EventBus=E5=A2=9E=E5=8A=A0safetySeriali?= =?UTF-8?q?zation,=E9=9B=86=E7=BE=A4=E8=AE=A2=E9=98=85=E6=97=B6=E8=BD=AC?= =?UTF-8?q?=E6=8D=A2=E4=BA=8B=E4=BB=B6=E5=AF=B9=E8=B1=A1=E4=B8=BA=E5=8F=AF?= =?UTF-8?q?=E5=AE=89=E5=85=A8=E4=BC=A0=E9=80=92=E7=9A=84=E5=AF=B9=E8=B1=A1?= =?UTF-8?q?.=20(#18)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/jetlinks/core/event/EventBus.java | 4 ++ .../org/jetlinks/core/event/Subscription.java | 49 +++++++++++-------- 2 files changed, 32 insertions(+), 21 deletions(-) diff --git a/src/main/java/org/jetlinks/core/event/EventBus.java b/src/main/java/org/jetlinks/core/event/EventBus.java index 134abc76..e8ec8e42 100644 --- a/src/main/java/org/jetlinks/core/event/EventBus.java +++ b/src/main/java/org/jetlinks/core/event/EventBus.java @@ -51,6 +51,7 @@ default Disposable subscribe(Subscription subscription, * @param 解码后结果类型 * @return 事件流 */ + @Deprecated Flux subscribe(Subscription subscription, Decoder decoder); /** @@ -73,6 +74,7 @@ default Disposable subscribe(Subscription subscription, * @param 类型 * @return 订阅者数量 */ + @Deprecated Mono publish(String topic, Encoder encoder, Publisher eventStream); /** @@ -111,10 +113,12 @@ default Flux subscribe(Subscription subscription, Class type) { * @param 事件类型 * @return 订阅者数量 */ + @Deprecated default Mono publish(String topic, Encoder encoder, T event) { return publish(topic, encoder, Mono.just(event)); } + @Deprecated default Mono publish(String topic, Encoder encoder, T event, Scheduler scheduler) { return publish(topic, encoder, Mono.just(event), scheduler); } diff --git a/src/main/java/org/jetlinks/core/event/Subscription.java b/src/main/java/org/jetlinks/core/event/Subscription.java index ca2fab23..4c5283dd 100644 --- a/src/main/java/org/jetlinks/core/event/Subscription.java +++ b/src/main/java/org/jetlinks/core/event/Subscription.java @@ -38,29 +38,29 @@ public Subscription() { public static Subscription of(String subscriber, String... topic) { return Subscription - .builder() - .subscriberId(subscriber) - .topics(topic) - .build(); + .builder() + .subscriberId(subscriber) + .topics(topic) + .build(); // return new Subscription(subscriber, topic, DEFAULT_FEATURES, null); } public static Subscription of(String subscriber, String[] topic, Feature... features) { return Subscription - .builder() - .subscriberId(subscriber) - .topics(topic) - .features(features) - .build(); + .builder() + .subscriberId(subscriber) + .topics(topic) + .features(features) + .build(); } public static Subscription of(String subscriber, String topic, Feature... features) { return Subscription - .builder() - .subscriberId(subscriber) - .topics(topic) - .features(features) - .build(); + .builder() + .subscriberId(subscriber) + .topics(topic) + .features(features) + .build(); //return new Subscription(subscriber, new String[]{topic}, features, null); } @@ -101,18 +101,18 @@ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundExcept } features = EnumDict - .getByMask(Feature.class, in.readLong()) - .toArray(new Feature[0]); + .getByMask(Feature.class, in.readLong()) + .toArray(new Feature[0]); priority = in.readInt(); } @Override public String toString() { return "Subscription{" + - "subscriber='" + subscriber + '\'' + - ", topics=" + Arrays.toString(topics) + - ", features=" + Arrays.toString(features) + - '}'; + "subscriber='" + subscriber + '\'' + + ", topics=" + Arrays.toString(topics) + + ", features=" + Arrays.toString(features) + + '}'; } @AllArgsConstructor @@ -128,7 +128,14 @@ public enum Feature implements EnumDict { broker("订阅集群消息"), sharedOldest("相同订阅者总是最先订阅的收到数据"), - sharedLocalFirst("集群下相同的订阅者总是本地的优先收到数据"); + sharedLocalFirst("集群下相同的订阅者总是本地的优先收到数据"), + /** + * 集群传递时进行安全序列化,防止跨服务无法序列化事件类 + * 收到消息请使用{@link TopicPayload#decode(Class)}反序列化为目标对象 + * + * @see org.jetlinks.core.utils.SerializeUtils#convertToSafelySerializable(Object) + */ + safetySerialization("安全序列化"); private final String text;