Skip to content

Commit 5a2c94b

Browse files
committed
core: Free unused MessageProducer in RetriableStream
This prevents leaking message buffers. Fixes #9563
1 parent 9de989b commit 5a2c94b

File tree

2 files changed

+42
-0
lines changed

2 files changed

+42
-0
lines changed

core/src/main/java/io/grpc/internal/RetriableStream.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1099,6 +1099,7 @@ public void messagesAvailable(final MessageProducer producer) {
10991099
checkState(
11001100
savedState.winningSubstream != null, "Headers should be received prior to messages.");
11011101
if (savedState.winningSubstream != substream) {
1102+
GrpcUtil.closeQuietly(producer);
11021103
return;
11031104
}
11041105
listenerSerializeExecutor.execute(

core/src/test/java/io/grpc/internal/RetriableStreamTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@
6161
import io.grpc.internal.StreamListener.MessageProducer;
6262
import java.io.InputStream;
6363
import java.util.ArrayList;
64+
import java.util.Arrays;
65+
import java.util.Iterator;
6466
import java.util.List;
6567
import java.util.Random;
6668
import java.util.concurrent.Executor;
@@ -998,6 +1000,27 @@ public void messageAvailable() {
9981000
verify(masterListener).messagesAvailable(messageProducer);
9991001
}
10001002

1003+
@Test
1004+
public void inboundMessagesClosedOnCancel() throws Exception {
1005+
ClientStream mockStream1 = mock(ClientStream.class);
1006+
doReturn(mockStream1).when(retriableStreamRecorder).newSubstream(0);
1007+
1008+
retriableStream.start(masterListener);
1009+
retriableStream.request(1);
1010+
retriableStream.cancel(Status.CANCELLED.withDescription("on purpose"));
1011+
1012+
ArgumentCaptor<ClientStreamListener> sublistenerCaptor1 =
1013+
ArgumentCaptor.forClass(ClientStreamListener.class);
1014+
verify(mockStream1).start(sublistenerCaptor1.capture());
1015+
1016+
ClientStreamListener listener = sublistenerCaptor1.getValue();
1017+
listener.headersRead(new Metadata());
1018+
InputStream is = mock(InputStream.class);
1019+
listener.messagesAvailable(new FakeMessageProducer(is));
1020+
verify(masterListener, never()).messagesAvailable(any(MessageProducer.class));
1021+
verify(is).close();
1022+
}
1023+
10011024
@Test
10021025
public void notAdd0PrevRetryAttemptsToRespHeaders() {
10031026
ClientStream mockStream1 = mock(ClientStream.class);
@@ -2786,4 +2809,22 @@ private interface RetriableStreamRecorder {
27862809

27872810
Status prestart();
27882811
}
2812+
2813+
private static final class FakeMessageProducer implements MessageProducer {
2814+
private final Iterator<InputStream> iterator;
2815+
2816+
public FakeMessageProducer(InputStream... iss) {
2817+
this.iterator = Arrays.asList(iss).iterator();
2818+
}
2819+
2820+
@Override
2821+
@Nullable
2822+
public InputStream next() {
2823+
if (iterator.hasNext()) {
2824+
return iterator.next();
2825+
} else {
2826+
return null;
2827+
}
2828+
}
2829+
}
27892830
}

0 commit comments

Comments
 (0)