Skip to content

Commit 15fc20c

Browse files
authored
Merge pull request #548 from ali-ince/1.6-fix-auto-read-for-routing
Make channel auto-read management to work on routing driver
2 parents f7fdc11 + 3940159 commit 15fc20c

17 files changed

+298
-203
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java

+5-6
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.neo4j.driver.internal.handlers.AckFailureResponseHandler;
2929
import org.neo4j.driver.internal.logging.ChannelActivityLogger;
3030
import org.neo4j.driver.internal.messaging.MessageHandler;
31-
import org.neo4j.driver.internal.spi.AutoReadManagingResponseHandler;
3231
import org.neo4j.driver.internal.spi.ResponseHandler;
3332
import org.neo4j.driver.internal.util.ErrorUtil;
3433
import org.neo4j.driver.v1.Logger;
@@ -49,7 +48,7 @@ public class InboundMessageDispatcher implements MessageHandler
4948
private boolean fatalErrorOccurred;
5049
private boolean ackFailureMuted;
5150

52-
private AutoReadManagingResponseHandler autoReadManagingHandler;
51+
private ResponseHandler autoReadManagingHandler;
5352

5453
public InboundMessageDispatcher( Channel channel, Logging logging )
5554
{
@@ -248,7 +247,7 @@ public boolean isAckFailureMuted()
248247
/**
249248
* <b>Visible for testing</b>
250249
*/
251-
AutoReadManagingResponseHandler autoReadManagingHandler()
250+
ResponseHandler autoReadManagingHandler()
252251
{
253252
return autoReadManagingHandler;
254253
}
@@ -276,13 +275,13 @@ private ResponseHandler removeHandler()
276275

277276
private void updateAutoReadManagingHandlerIfNeeded( ResponseHandler handler )
278277
{
279-
if ( handler instanceof AutoReadManagingResponseHandler )
278+
if ( handler.canManageAutoRead() )
280279
{
281-
updateAutoReadManagingHandler( (AutoReadManagingResponseHandler) handler );
280+
updateAutoReadManagingHandler( handler );
282281
}
283282
}
284283

285-
private void updateAutoReadManagingHandler( AutoReadManagingResponseHandler newHandler )
284+
private void updateAutoReadManagingHandler( ResponseHandler newHandler )
286285
{
287286
if ( autoReadManagingHandler != null )
288287
{

driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@
2727
import java.util.concurrent.CompletionStage;
2828

2929
import org.neo4j.driver.internal.InternalRecord;
30-
import org.neo4j.driver.internal.spi.AutoReadManagingResponseHandler;
3130
import org.neo4j.driver.internal.spi.Connection;
31+
import org.neo4j.driver.internal.spi.ResponseHandler;
3232
import org.neo4j.driver.internal.util.Futures;
3333
import org.neo4j.driver.internal.util.Iterables;
3434
import org.neo4j.driver.internal.util.MetadataUtil;
@@ -44,7 +44,7 @@
4444
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
4545
import static org.neo4j.driver.internal.util.Futures.failedFuture;
4646

47-
public abstract class PullAllResponseHandler implements AutoReadManagingResponseHandler
47+
public abstract class PullAllResponseHandler implements ResponseHandler
4848
{
4949
private static final Queue<Record> UNINITIALIZED_RECORDS = Iterables.emptyQueue();
5050

@@ -130,6 +130,12 @@ public synchronized void onRecord( Value[] fields )
130130
}
131131
}
132132

133+
@Override
134+
public boolean canManageAutoRead()
135+
{
136+
return true;
137+
}
138+
133139
@Override
134140
public synchronized void disableAutoReadManagement()
135141
{

driver/src/main/java/org/neo4j/driver/internal/handlers/RoutingResponseHandler.java

+12
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,18 @@ public void onRecord( Value[] fields )
6969
delegate.onRecord( fields );
7070
}
7171

72+
@Override
73+
public boolean canManageAutoRead()
74+
{
75+
return delegate.canManageAutoRead();
76+
}
77+
78+
@Override
79+
public void disableAutoReadManagement()
80+
{
81+
delegate.disableAutoReadManagement();
82+
}
83+
7284
private Throwable handledError( Throwable receivedError )
7385
{
7486
Throwable error = Futures.completionExceptionCause( receivedError );

driver/src/main/java/org/neo4j/driver/internal/spi/AutoReadManagingResponseHandler.java

-38
This file was deleted.

driver/src/main/java/org/neo4j/driver/internal/spi/ResponseHandler.java

+24
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.util.Map;
2222

23+
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
2324
import org.neo4j.driver.v1.Value;
2425

2526
public interface ResponseHandler
@@ -29,4 +30,27 @@ public interface ResponseHandler
2930
void onFailure( Throwable error );
3031

3132
void onRecord( Value[] fields );
33+
34+
/**
35+
* Tells whether this response handler is able to manage auto-read of the underlying connection using {@link Connection#enableAutoRead()} and
36+
* {@link Connection#disableAutoRead()}.
37+
* <p>
38+
* Implementations can use auto-read management to apply network-level backpressure when receiving a stream of records.
39+
* There should only be a single such handler active for a connection at one point in time. Otherwise, handlers can interfere and turn on/off auto-read
40+
* racing with each other. {@link InboundMessageDispatcher} is responsible for tracking these handlers and disabling auto-read management to maintain just
41+
* a single auto-read managing handler per connection.
42+
*/
43+
default boolean canManageAutoRead()
44+
{
45+
return false;
46+
}
47+
48+
/**
49+
* If this response handler is able to manage auto-read of the underlying connection, then this method signals it to
50+
* stop changing auto-read setting for the connection.
51+
*/
52+
default void disableAutoReadManagement()
53+
{
54+
55+
}
3256
}

driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcherTest.java

+16-10
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import java.util.HashMap;
2929
import java.util.Map;
3030

31-
import org.neo4j.driver.internal.spi.AutoReadManagingResponseHandler;
3231
import org.neo4j.driver.internal.spi.ResponseHandler;
3332
import org.neo4j.driver.internal.value.IntegerValue;
3433
import org.neo4j.driver.v1.Value;
@@ -49,9 +48,9 @@
4948
import static org.mockito.Mockito.inOrder;
5049
import static org.mockito.Mockito.mock;
5150
import static org.mockito.Mockito.never;
51+
import static org.mockito.Mockito.only;
5252
import static org.mockito.Mockito.times;
5353
import static org.mockito.Mockito.verify;
54-
import static org.mockito.Mockito.verifyZeroInteractions;
5554
import static org.mockito.Mockito.when;
5655
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
5756
import static org.neo4j.driver.internal.messaging.AckFailureMessage.ACK_FAILURE;
@@ -289,7 +288,7 @@ public void shouldFailHandlerOnIgnoredMessageWithExistingError()
289288

290289
dispatcher.handleFailureMessage( FAILURE_CODE, FAILURE_MESSAGE );
291290
verifyFailure( handler1 );
292-
verifyZeroInteractions( handler2 );
291+
verify( handler2, only() ).canManageAutoRead();
293292

294293
dispatcher.handleIgnoredMessage();
295294
verifyFailure( handler2 );
@@ -452,9 +451,9 @@ public void shouldKeepSingleAutoReadManagingHandler()
452451
{
453452
InboundMessageDispatcher dispatcher = newDispatcher();
454453

455-
AutoReadManagingResponseHandler handler1 = mock( AutoReadManagingResponseHandler.class );
456-
AutoReadManagingResponseHandler handler2 = mock( AutoReadManagingResponseHandler.class );
457-
AutoReadManagingResponseHandler handler3 = mock( AutoReadManagingResponseHandler.class );
454+
ResponseHandler handler1 = newAutoReadManagingResponseHandler();
455+
ResponseHandler handler2 = newAutoReadManagingResponseHandler();
456+
ResponseHandler handler3 = newAutoReadManagingResponseHandler();
458457

459458
dispatcher.enqueue( handler1 );
460459
dispatcher.enqueue( handler2 );
@@ -471,8 +470,8 @@ public void shouldKeepTrackOfAutoReadManagingHandler()
471470
{
472471
InboundMessageDispatcher dispatcher = newDispatcher();
473472

474-
AutoReadManagingResponseHandler handler1 = mock( AutoReadManagingResponseHandler.class );
475-
AutoReadManagingResponseHandler handler2 = mock( AutoReadManagingResponseHandler.class );
473+
ResponseHandler handler1 = newAutoReadManagingResponseHandler();
474+
ResponseHandler handler2 = newAutoReadManagingResponseHandler();
476475

477476
assertNull( dispatcher.autoReadManagingHandler() );
478477

@@ -490,7 +489,7 @@ public void shouldForgetAutoReadManagingHandlerWhenItIsRemoved()
490489

491490
ResponseHandler handler1 = mock( ResponseHandler.class );
492491
ResponseHandler handler2 = mock( ResponseHandler.class );
493-
AutoReadManagingResponseHandler handler3 = mock( AutoReadManagingResponseHandler.class );
492+
ResponseHandler handler3 = newAutoReadManagingResponseHandler();
494493

495494
dispatcher.enqueue( handler1 );
496495
dispatcher.enqueue( handler2 );
@@ -510,7 +509,7 @@ public void shouldReEnableAutoReadWhenAutoReadManagingHandlerIsRemoved()
510509
Channel channel = newChannelMock();
511510
InboundMessageDispatcher dispatcher = newDispatcher( channel );
512511

513-
AutoReadManagingResponseHandler handler = mock( AutoReadManagingResponseHandler.class );
512+
ResponseHandler handler = newAutoReadManagingResponseHandler();
514513
dispatcher.enqueue( handler );
515514
assertEquals( handler, dispatcher.autoReadManagingHandler() );
516515
verify( handler, never() ).disableAutoReadManagement();
@@ -548,4 +547,11 @@ private static Channel newChannelMock()
548547
when( channel.config() ).thenReturn( channelConfig );
549548
return channel;
550549
}
550+
551+
private static ResponseHandler newAutoReadManagingResponseHandler()
552+
{
553+
ResponseHandler handler = mock( ResponseHandler.class );
554+
when( handler.canManageAutoRead() ).thenReturn( true );
555+
return handler;
556+
}
551557
}

driver/src/test/java/org/neo4j/driver/internal/async/RoutingResponseHandlerTest.java driver/src/test/java/org/neo4j/driver/internal/handlers/RoutingResponseHandlerTest.java

+25-2
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,14 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
package org.neo4j.driver.internal.async;
19+
package org.neo4j.driver.internal.handlers;
2020

2121
import org.junit.Test;
2222
import org.mockito.ArgumentCaptor;
2323

2424
import java.util.concurrent.CompletionException;
2525

2626
import org.neo4j.driver.internal.RoutingErrorHandler;
27-
import org.neo4j.driver.internal.handlers.RoutingResponseHandler;
2827
import org.neo4j.driver.internal.spi.ResponseHandler;
2928
import org.neo4j.driver.v1.AccessMode;
3029
import org.neo4j.driver.v1.exceptions.ClientException;
@@ -126,6 +125,30 @@ public void shouldHandleClientException()
126125
verifyZeroInteractions( errorHandler );
127126
}
128127

128+
@Test
129+
public void shouldDelegateCanManageAutoRead()
130+
{
131+
ResponseHandler responseHandler = mock( ResponseHandler.class );
132+
RoutingResponseHandler routingResponseHandler =
133+
new RoutingResponseHandler( responseHandler, LOCAL_DEFAULT, AccessMode.READ, null );
134+
135+
routingResponseHandler.canManageAutoRead();
136+
137+
verify( responseHandler ).canManageAutoRead();
138+
}
139+
140+
@Test
141+
public void shouldDelegateDisableAutoReadManagement()
142+
{
143+
ResponseHandler responseHandler = mock( ResponseHandler.class );
144+
RoutingResponseHandler routingResponseHandler =
145+
new RoutingResponseHandler( responseHandler, LOCAL_DEFAULT, AccessMode.READ, null );
146+
147+
routingResponseHandler.disableAutoReadManagement();
148+
149+
verify( responseHandler ).disableAutoReadManagement();
150+
}
151+
129152
private void testWriteFailureWithReadAccessMode( String code )
130153
{
131154
ClientException error = new ClientException( code, "Hi" );

driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java

+18-1
Original file line numberDiff line numberDiff line change
@@ -92,18 +92,35 @@
9292
import static org.neo4j.driver.v1.util.TestUtil.await;
9393
import static org.neo4j.driver.v1.util.TestUtil.awaitAllFutures;
9494

95-
public class CausalClusteringIT
95+
public class CausalClusteringIT extends NestedQueries
9696
{
9797
private static final long DEFAULT_TIMEOUT_MS = 120_000;
9898

9999
@Rule
100100
public final ClusterRule clusterRule = new ClusterRule();
101101

102102
private ExecutorService executor;
103+
private Driver driver;
104+
105+
@Override
106+
protected Session newSession( AccessMode mode )
107+
{
108+
if ( driver == null )
109+
{
110+
driver = createDriver( clusterRule.getCluster().leader().getRoutingUri() );
111+
}
112+
113+
return driver.session( mode );
114+
}
103115

104116
@After
105117
public void tearDown()
106118
{
119+
if ( driver != null )
120+
{
121+
driver.close();
122+
}
123+
107124
if ( executor != null )
108125
{
109126
executor.shutdownNow();

0 commit comments

Comments
 (0)