Skip to content

Commit

Permalink
Extended read stream and read all options with a filter
Browse files Browse the repository at this point in the history
  • Loading branch information
yreynhout committed Nov 7, 2024
1 parent f3b432a commit 9d8c613
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,17 @@ public StreamsOuterClass.ReadReq.Options.Builder createOptions() {
.setResolveLinks(this.options.shouldResolveLinkTos())
.setControlOption(StreamsOuterClass.ReadReq.Options.ControlOption.newBuilder().setCompatibility(1))
.setCount(this.options.getMaxCount())
.setNoFilter(Shared.Empty.getDefaultInstance())
//.setNoFilter(Shared.Empty.getDefaultInstance())
.setReadDirection(this.options.getDirection() == Direction.Forwards ?
StreamsOuterClass.ReadReq.Options.ReadDirection.Forwards :
StreamsOuterClass.ReadReq.Options.ReadDirection.Backwards);

if (this.options.getFilter() != null) {
this.options.getFilter().addToWireStreamsReadReq(builder);
} else {
builder.setNoFilter(Shared.Empty.getDefaultInstance());
}

return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* Options of the read $all stream request.
*/
public class ReadAllOptions extends OptionsWithPositionAndResolveLinkTosBase<ReadAllOptions> {
private SubscriptionFilter filter;
private Direction direction;
private long maxCount;

Expand Down Expand Up @@ -57,4 +58,16 @@ public ReadAllOptions maxCount(long maxCount) {
this.maxCount = maxCount;
return this;
}

SubscriptionFilter getFilter() {
return filter;
}

/**
* Applies a server-side filter to determine if an event of the read should be yielded.
*/
public ReadAllOptions filter(SubscriptionFilter filter) {
this.filter = filter;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,21 @@ public ReadStream(GrpcClient client, String streamName, ReadStreamOptions option

@Override
public StreamsOuterClass.ReadReq.Options.Builder createOptions() {
return defaultReadOptions.clone()
StreamsOuterClass.ReadReq.Options.Builder optionsBuilder =
defaultReadOptions.clone()
.setStream(GrpcUtils.toStreamOptions(this.streamName, this.options.getStartingRevision()))
.setResolveLinks(this.options.shouldResolveLinkTos())
.setCount(this.options.getMaxCount())
.setControlOption(StreamsOuterClass.ReadReq.Options.ControlOption.newBuilder().setCompatibility(1))
.setNoFilter(Shared.Empty.getDefaultInstance())
//.setNoFilter(Shared.Empty.getDefaultInstance())
.setReadDirection(this.options.getDirection() == Direction.Forwards ?
StreamsOuterClass.ReadReq.Options.ReadDirection.Forwards :
StreamsOuterClass.ReadReq.Options.ReadDirection.Backwards);
if (this.options.getFilter() != null) {
this.options.getFilter().addToWireStreamsReadReq(optionsBuilder);
} else {
optionsBuilder.setNoFilter(Shared.Empty.getDefaultInstance());
}
return optionsBuilder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* Options of the read stream request.
*/
public class ReadStreamOptions extends OptionsWithStartRevisionAndResolveLinkTosBase<ReadStreamOptions> {
private SubscriptionFilter filter;
private Direction direction;
private long maxCount;

Expand Down Expand Up @@ -60,4 +61,16 @@ public ReadStreamOptions maxCount(long maxCount) {
this.maxCount = maxCount;
return this;
}

SubscriptionFilter getFilter() {
return filter;
}

/**
* Applies a server-side filter to determine if an event of the read should be yielded.
*/
public ReadStreamOptions filter(SubscriptionFilter filter) {
this.filter = filter;
return this;
}
}

0 comments on commit 9d8c613

Please # to comment.