diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/ReadAll.java b/db-client-java/src/main/java/com/eventstore/dbclient/ReadAll.java index ce6192b5..c65e08cf 100644 --- a/db-client-java/src/main/java/com/eventstore/dbclient/ReadAll.java +++ b/db-client-java/src/main/java/com/eventstore/dbclient/ReadAll.java @@ -33,11 +33,17 @@ public StreamsOuterClass.ReadReq.Options.Builder createOptions() { .setResolveLinks(this.options.shouldResolveLinkTos()) .setControlOption(StreamsOuterClass.ReadReq.Options.ControlOption.newBuilder().setCompatibility(1)) .setCount(this.options.getMaxCount()) - .setNoFilter(Shared.Empty.getDefaultInstance()) + //.setNoFilter(Shared.Empty.getDefaultInstance()) .setReadDirection(this.options.getDirection() == Direction.Forwards ? StreamsOuterClass.ReadReq.Options.ReadDirection.Forwards : StreamsOuterClass.ReadReq.Options.ReadDirection.Backwards); + if (this.options.getFilter() != null) { + this.options.getFilter().addToWireStreamsReadReq(builder); + } else { + builder.setNoFilter(Shared.Empty.getDefaultInstance()); + } + return builder; } } diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/ReadAllOptions.java b/db-client-java/src/main/java/com/eventstore/dbclient/ReadAllOptions.java index 310d60dc..c19611f9 100644 --- a/db-client-java/src/main/java/com/eventstore/dbclient/ReadAllOptions.java +++ b/db-client-java/src/main/java/com/eventstore/dbclient/ReadAllOptions.java @@ -4,6 +4,7 @@ * Options of the read $all stream request. */ public class ReadAllOptions extends OptionsWithPositionAndResolveLinkTosBase { + private SubscriptionFilter filter; private Direction direction; private long maxCount; @@ -57,4 +58,16 @@ public ReadAllOptions maxCount(long maxCount) { this.maxCount = maxCount; return this; } + + SubscriptionFilter getFilter() { + return filter; + } + + /** + * Applies a server-side filter to determine if an event of the read should be yielded. + */ + public ReadAllOptions filter(SubscriptionFilter filter) { + this.filter = filter; + return this; + } } diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/ReadStream.java b/db-client-java/src/main/java/com/eventstore/dbclient/ReadStream.java index fa960c0b..4803b3f4 100644 --- a/db-client-java/src/main/java/com/eventstore/dbclient/ReadStream.java +++ b/db-client-java/src/main/java/com/eventstore/dbclient/ReadStream.java @@ -20,14 +20,21 @@ public ReadStream(GrpcClient client, String streamName, ReadStreamOptions option @Override public StreamsOuterClass.ReadReq.Options.Builder createOptions() { - return defaultReadOptions.clone() + StreamsOuterClass.ReadReq.Options.Builder optionsBuilder = + defaultReadOptions.clone() .setStream(GrpcUtils.toStreamOptions(this.streamName, this.options.getStartingRevision())) .setResolveLinks(this.options.shouldResolveLinkTos()) .setCount(this.options.getMaxCount()) .setControlOption(StreamsOuterClass.ReadReq.Options.ControlOption.newBuilder().setCompatibility(1)) - .setNoFilter(Shared.Empty.getDefaultInstance()) + //.setNoFilter(Shared.Empty.getDefaultInstance()) .setReadDirection(this.options.getDirection() == Direction.Forwards ? StreamsOuterClass.ReadReq.Options.ReadDirection.Forwards : StreamsOuterClass.ReadReq.Options.ReadDirection.Backwards); + if (this.options.getFilter() != null) { + this.options.getFilter().addToWireStreamsReadReq(optionsBuilder); + } else { + optionsBuilder.setNoFilter(Shared.Empty.getDefaultInstance()); + } + return optionsBuilder; } } diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/ReadStreamOptions.java b/db-client-java/src/main/java/com/eventstore/dbclient/ReadStreamOptions.java index 18ff8200..080e2c3b 100644 --- a/db-client-java/src/main/java/com/eventstore/dbclient/ReadStreamOptions.java +++ b/db-client-java/src/main/java/com/eventstore/dbclient/ReadStreamOptions.java @@ -4,6 +4,7 @@ * Options of the read stream request. */ public class ReadStreamOptions extends OptionsWithStartRevisionAndResolveLinkTosBase { + private SubscriptionFilter filter; private Direction direction; private long maxCount; @@ -60,4 +61,16 @@ public ReadStreamOptions maxCount(long maxCount) { this.maxCount = maxCount; return this; } + + SubscriptionFilter getFilter() { + return filter; + } + + /** + * Applies a server-side filter to determine if an event of the read should be yielded. + */ + public ReadStreamOptions filter(SubscriptionFilter filter) { + this.filter = filter; + return this; + } }