- * Implementations can use auto-read management to apply network-level backpressure when receiving a stream of records. - * There should only be a single such handler active for a connection at one point in time. Otherwise, handlers can interfere and turn on/off auto-read - * racing with each other. {@link InboundMessageDispatcher} is responsible for tracking these handlers and disabling auto-read management to maintain just - * a single auto-read managing handler per connection. - */ -public interface AutoReadManagingResponseHandler extends ResponseHandler -{ - /** - * Tell this handler that it should stop changing auto-read setting for the connection. - */ - void disableAutoReadManagement(); -} diff --git a/driver/src/main/java/org/neo4j/driver/internal/spi/ResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/spi/ResponseHandler.java index ac19ebfda3..8a668e740d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/spi/ResponseHandler.java +++ b/driver/src/main/java/org/neo4j/driver/internal/spi/ResponseHandler.java @@ -20,6 +20,7 @@ import java.util.Map; +import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; import org.neo4j.driver.v1.Value; public interface ResponseHandler @@ -29,4 +30,27 @@ public interface ResponseHandler void onFailure( Throwable error ); void onRecord( Value[] fields ); + + /** + * Tells whether this response handler is able to manage auto-read of the underlying connection using {@link Connection#enableAutoRead()} and + * {@link Connection#disableAutoRead()}. + *
+ * Implementations can use auto-read management to apply network-level backpressure when receiving a stream of records.
+ * There should only be a single such handler active for a connection at one point in time. Otherwise, handlers can interfere and turn on/off auto-read
+ * racing with each other. {@link InboundMessageDispatcher} is responsible for tracking these handlers and disabling auto-read management to maintain just
+ * a single auto-read managing handler per connection.
+ */
+ default boolean canManageAutoRead()
+ {
+ return false;
+ }
+
+ /**
+ * If this response handler is able to manage auto-read of the underlying connection, then this method signals it to
+ * stop changing auto-read setting for the connection.
+ */
+ default void disableAutoReadManagement()
+ {
+
+ }
}
diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcherTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcherTest.java
index 4ff3ce0d35..6897221724 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcherTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcherTest.java
@@ -28,7 +28,6 @@
import java.util.HashMap;
import java.util.Map;
-import org.neo4j.driver.internal.spi.AutoReadManagingResponseHandler;
import org.neo4j.driver.internal.spi.ResponseHandler;
import org.neo4j.driver.internal.value.IntegerValue;
import org.neo4j.driver.v1.Value;
@@ -49,9 +48,9 @@
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.only;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
import static org.neo4j.driver.internal.messaging.AckFailureMessage.ACK_FAILURE;
@@ -289,7 +288,7 @@ public void shouldFailHandlerOnIgnoredMessageWithExistingError()
dispatcher.handleFailureMessage( FAILURE_CODE, FAILURE_MESSAGE );
verifyFailure( handler1 );
- verifyZeroInteractions( handler2 );
+ verify( handler2, only() ).canManageAutoRead();
dispatcher.handleIgnoredMessage();
verifyFailure( handler2 );
@@ -452,9 +451,9 @@ public void shouldKeepSingleAutoReadManagingHandler()
{
InboundMessageDispatcher dispatcher = newDispatcher();
- AutoReadManagingResponseHandler handler1 = mock( AutoReadManagingResponseHandler.class );
- AutoReadManagingResponseHandler handler2 = mock( AutoReadManagingResponseHandler.class );
- AutoReadManagingResponseHandler handler3 = mock( AutoReadManagingResponseHandler.class );
+ ResponseHandler handler1 = newAutoReadManagingResponseHandler();
+ ResponseHandler handler2 = newAutoReadManagingResponseHandler();
+ ResponseHandler handler3 = newAutoReadManagingResponseHandler();
dispatcher.enqueue( handler1 );
dispatcher.enqueue( handler2 );
@@ -471,8 +470,8 @@ public void shouldKeepTrackOfAutoReadManagingHandler()
{
InboundMessageDispatcher dispatcher = newDispatcher();
- AutoReadManagingResponseHandler handler1 = mock( AutoReadManagingResponseHandler.class );
- AutoReadManagingResponseHandler handler2 = mock( AutoReadManagingResponseHandler.class );
+ ResponseHandler handler1 = newAutoReadManagingResponseHandler();
+ ResponseHandler handler2 = newAutoReadManagingResponseHandler();
assertNull( dispatcher.autoReadManagingHandler() );
@@ -490,7 +489,7 @@ public void shouldForgetAutoReadManagingHandlerWhenItIsRemoved()
ResponseHandler handler1 = mock( ResponseHandler.class );
ResponseHandler handler2 = mock( ResponseHandler.class );
- AutoReadManagingResponseHandler handler3 = mock( AutoReadManagingResponseHandler.class );
+ ResponseHandler handler3 = newAutoReadManagingResponseHandler();
dispatcher.enqueue( handler1 );
dispatcher.enqueue( handler2 );
@@ -510,7 +509,7 @@ public void shouldReEnableAutoReadWhenAutoReadManagingHandlerIsRemoved()
Channel channel = newChannelMock();
InboundMessageDispatcher dispatcher = newDispatcher( channel );
- AutoReadManagingResponseHandler handler = mock( AutoReadManagingResponseHandler.class );
+ ResponseHandler handler = newAutoReadManagingResponseHandler();
dispatcher.enqueue( handler );
assertEquals( handler, dispatcher.autoReadManagingHandler() );
verify( handler, never() ).disableAutoReadManagement();
@@ -548,4 +547,11 @@ private static Channel newChannelMock()
when( channel.config() ).thenReturn( channelConfig );
return channel;
}
+
+ private static ResponseHandler newAutoReadManagingResponseHandler()
+ {
+ ResponseHandler handler = mock( ResponseHandler.class );
+ when( handler.canManageAutoRead() ).thenReturn( true );
+ return handler;
+ }
}
diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/RoutingResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/RoutingResponseHandlerTest.java
similarity index 87%
rename from driver/src/test/java/org/neo4j/driver/internal/async/RoutingResponseHandlerTest.java
rename to driver/src/test/java/org/neo4j/driver/internal/handlers/RoutingResponseHandlerTest.java
index 313b70c720..fc656ac123 100644
--- a/driver/src/test/java/org/neo4j/driver/internal/async/RoutingResponseHandlerTest.java
+++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/RoutingResponseHandlerTest.java
@@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.neo4j.driver.internal.async;
+package org.neo4j.driver.internal.handlers;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -24,7 +24,6 @@
import java.util.concurrent.CompletionException;
import org.neo4j.driver.internal.RoutingErrorHandler;
-import org.neo4j.driver.internal.handlers.RoutingResponseHandler;
import org.neo4j.driver.internal.spi.ResponseHandler;
import org.neo4j.driver.v1.AccessMode;
import org.neo4j.driver.v1.exceptions.ClientException;
@@ -126,6 +125,30 @@ public void shouldHandleClientException()
verifyZeroInteractions( errorHandler );
}
+ @Test
+ public void shouldDelegateCanManageAutoRead()
+ {
+ ResponseHandler responseHandler = mock( ResponseHandler.class );
+ RoutingResponseHandler routingResponseHandler =
+ new RoutingResponseHandler( responseHandler, LOCAL_DEFAULT, AccessMode.READ, null );
+
+ routingResponseHandler.canManageAutoRead();
+
+ verify( responseHandler ).canManageAutoRead();
+ }
+
+ @Test
+ public void shouldDelegateDisableAutoReadManagement()
+ {
+ ResponseHandler responseHandler = mock( ResponseHandler.class );
+ RoutingResponseHandler routingResponseHandler =
+ new RoutingResponseHandler( responseHandler, LOCAL_DEFAULT, AccessMode.READ, null );
+
+ routingResponseHandler.disableAutoReadManagement();
+
+ verify( responseHandler ).disableAutoReadManagement();
+ }
+
private void testWriteFailureWithReadAccessMode( String code )
{
ClientException error = new ClientException( code, "Hi" );
diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java
index 1f93a22fc3..0c2d8e8164 100644
--- a/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java
+++ b/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java
@@ -92,7 +92,7 @@
import static org.neo4j.driver.v1.util.TestUtil.await;
import static org.neo4j.driver.v1.util.TestUtil.awaitAllFutures;
-public class CausalClusteringIT
+public class CausalClusteringIT extends NestedQueries
{
private static final long DEFAULT_TIMEOUT_MS = 120_000;
@@ -100,10 +100,27 @@ public class CausalClusteringIT
public final ClusterRule clusterRule = new ClusterRule();
private ExecutorService executor;
+ private Driver driver;
+
+ @Override
+ protected Session newSession( AccessMode mode )
+ {
+ if ( driver == null )
+ {
+ driver = createDriver( clusterRule.getCluster().leader().getRoutingUri() );
+ }
+
+ return driver.session( mode );
+ }
@After
public void tearDown()
{
+ if ( driver != null )
+ {
+ driver.close();
+ }
+
if ( executor != null )
{
executor.shutdownNow();
diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/NestedQueries.java b/driver/src/test/java/org/neo4j/driver/v1/integration/NestedQueries.java
new file mode 100644
index 0000000000..7dcada8d2e
--- /dev/null
+++ b/driver/src/test/java/org/neo4j/driver/v1/integration/NestedQueries.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright (c) 2002-2018 "Neo4j,"
+ * Neo4j Sweden AB [http://neo4j.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.neo4j.driver.v1.integration;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import org.neo4j.driver.v1.AccessMode;
+import org.neo4j.driver.v1.Record;
+import org.neo4j.driver.v1.Session;
+import org.neo4j.driver.v1.StatementResult;
+import org.neo4j.driver.v1.StatementRunner;
+import org.neo4j.driver.v1.Transaction;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public abstract class NestedQueries
+{
+ private static final String OUTER_QUERY = "UNWIND range(1, 10000) AS x RETURN x";
+ private static final String INNER_QUERY = "UNWIND range(1, 10) AS y RETURN y";
+ private static final int EXPECTED_RECORDS = 10_000 * 10 + 10_000;
+
+ protected abstract Session newSession( AccessMode mode );
+
+ @Test
+ public void shouldAllowNestedQueriesInTransactionConsumedAsIterators() throws Exception
+ {
+ try ( Session session = newSession( AccessMode.READ ); Transaction tx = session.beginTransaction() )
+ {
+ testNestedQueriesConsumedAsIterators( tx );
+ tx.success();
+ }
+ }
+
+ @Test
+ public void shouldAllowNestedQueriesInTransactionConsumedAsLists() throws Exception
+ {
+ try ( Session session = newSession( AccessMode.READ ); Transaction tx = session.beginTransaction() )
+ {
+ testNestedQueriesConsumedAsLists( tx );
+ tx.success();
+ }
+ }
+
+ @Test
+ public void shouldAllowNestedQueriesInTransactionConsumedAsIteratorAndList() throws Exception
+ {
+ try ( Session session = newSession( AccessMode.READ ); Transaction tx = session.beginTransaction() )
+ {
+ testNestedQueriesConsumedAsIteratorAndList( tx );
+ tx.success();
+ }
+ }
+
+ @Test
+ public void shouldAllowNestedQueriesInSessionConsumedAsIterators() throws Exception
+ {
+ try ( Session session = newSession( AccessMode.READ ) )
+ {
+ testNestedQueriesConsumedAsIterators( session );
+ }
+ }
+
+ @Test
+ public void shouldAllowNestedQueriesInSessionConsumedAsLists() throws Exception
+ {
+ try ( Session session = newSession( AccessMode.READ ) )
+ {
+ testNestedQueriesConsumedAsLists( session );
+ }
+ }
+
+ @Test
+ public void shouldAllowNestedQueriesInSessionConsumedAsIteratorAndList() throws Exception
+ {
+ try ( Session session = newSession( AccessMode.READ ) )
+ {
+ testNestedQueriesConsumedAsIteratorAndList( session );
+ }
+ }
+
+ private void testNestedQueriesConsumedAsIterators( StatementRunner statementRunner ) throws Exception
+ {
+ int recordsSeen = 0;
+
+ StatementResult result1 = statementRunner.run( OUTER_QUERY );
+ Thread.sleep( 1000 ); // allow some result records to arrive and be buffered
+
+ while ( result1.hasNext() )
+ {
+ Record record1 = result1.next();
+ assertFalse( record1.get( "x" ).isNull() );
+ recordsSeen++;
+
+ StatementResult result2 = statementRunner.run( INNER_QUERY );
+ while ( result2.hasNext() )
+ {
+ Record record2 = result2.next();
+ assertFalse( record2.get( "y" ).isNull() );
+ recordsSeen++;
+ }
+ }
+
+ assertEquals( EXPECTED_RECORDS, recordsSeen );
+ }
+
+ private void testNestedQueriesConsumedAsLists( StatementRunner statementRunner ) throws Exception
+ {
+ int recordsSeen = 0;
+
+ StatementResult result1 = statementRunner.run( OUTER_QUERY );
+ Thread.sleep( 1000 ); // allow some result records to arrive and be buffered
+
+ List