Skip to content

Make channel auto-read management to work on routing driver #548

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 2 commits into from
Dec 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.neo4j.driver.internal.handlers.AckFailureResponseHandler;
import org.neo4j.driver.internal.logging.ChannelActivityLogger;
import org.neo4j.driver.internal.messaging.MessageHandler;
import org.neo4j.driver.internal.spi.AutoReadManagingResponseHandler;
import org.neo4j.driver.internal.spi.ResponseHandler;
import org.neo4j.driver.internal.util.ErrorUtil;
import org.neo4j.driver.v1.Logger;
Expand All @@ -49,7 +48,7 @@ public class InboundMessageDispatcher implements MessageHandler
private boolean fatalErrorOccurred;
private boolean ackFailureMuted;

private AutoReadManagingResponseHandler autoReadManagingHandler;
private ResponseHandler autoReadManagingHandler;

public InboundMessageDispatcher( Channel channel, Logging logging )
{
Expand Down Expand Up @@ -248,7 +247,7 @@ public boolean isAckFailureMuted()
/**
* <b>Visible for testing</b>
*/
AutoReadManagingResponseHandler autoReadManagingHandler()
ResponseHandler autoReadManagingHandler()
{
return autoReadManagingHandler;
}
Expand Down Expand Up @@ -276,13 +275,13 @@ private ResponseHandler removeHandler()

private void updateAutoReadManagingHandlerIfNeeded( ResponseHandler handler )
{
if ( handler instanceof AutoReadManagingResponseHandler )
if ( handler.canManageAutoRead() )
{
updateAutoReadManagingHandler( (AutoReadManagingResponseHandler) handler );
updateAutoReadManagingHandler( handler );
}
}

private void updateAutoReadManagingHandler( AutoReadManagingResponseHandler newHandler )
private void updateAutoReadManagingHandler( ResponseHandler newHandler )
{
if ( autoReadManagingHandler != null )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import java.util.concurrent.CompletionStage;

import org.neo4j.driver.internal.InternalRecord;
import org.neo4j.driver.internal.spi.AutoReadManagingResponseHandler;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ResponseHandler;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.internal.util.Iterables;
import org.neo4j.driver.internal.util.MetadataUtil;
Expand All @@ -44,7 +44,7 @@
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
import static org.neo4j.driver.internal.util.Futures.failedFuture;

public abstract class PullAllResponseHandler implements AutoReadManagingResponseHandler
public abstract class PullAllResponseHandler implements ResponseHandler
{
private static final Queue<Record> UNINITIALIZED_RECORDS = Iterables.emptyQueue();

Expand Down Expand Up @@ -130,6 +130,12 @@ public synchronized void onRecord( Value[] fields )
}
}

@Override
public boolean canManageAutoRead()
{
return true;
}

@Override
public synchronized void disableAutoReadManagement()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,18 @@ public void onRecord( Value[] fields )
delegate.onRecord( fields );
}

@Override
public boolean canManageAutoRead()
{
return delegate.canManageAutoRead();
}

@Override
public void disableAutoReadManagement()
{
delegate.disableAutoReadManagement();
}

private Throwable handledError( Throwable receivedError )
{
Throwable error = Futures.completionExceptionCause( receivedError );
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Map;

import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
import org.neo4j.driver.v1.Value;

public interface ResponseHandler
Expand All @@ -29,4 +30,27 @@ public interface ResponseHandler
void onFailure( Throwable error );

void onRecord( Value[] fields );

/**
* Tells whether this response handler is able to manage auto-read of the underlying connection using {@link Connection#enableAutoRead()} and
* {@link Connection#disableAutoRead()}.
* <p>
* Implementations can use auto-read management to apply network-level backpressure when receiving a stream of records.
* There should only be a single such handler active for a connection at one point in time. Otherwise, handlers can interfere and turn on/off auto-read
* racing with each other. {@link InboundMessageDispatcher} is responsible for tracking these handlers and disabling auto-read management to maintain just
* a single auto-read managing handler per connection.
*/
default boolean canManageAutoRead()
{
return false;
}

/**
* If this response handler is able to manage auto-read of the underlying connection, then this method signals it to
* stop changing auto-read setting for the connection.
*/
default void disableAutoReadManagement()
{

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.HashMap;
import java.util.Map;

import org.neo4j.driver.internal.spi.AutoReadManagingResponseHandler;
import org.neo4j.driver.internal.spi.ResponseHandler;
import org.neo4j.driver.internal.value.IntegerValue;
import org.neo4j.driver.v1.Value;
Expand All @@ -49,9 +48,9 @@
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.only;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
import static org.neo4j.driver.internal.messaging.AckFailureMessage.ACK_FAILURE;
Expand Down Expand Up @@ -289,7 +288,7 @@ public void shouldFailHandlerOnIgnoredMessageWithExistingError()

dispatcher.handleFailureMessage( FAILURE_CODE, FAILURE_MESSAGE );
verifyFailure( handler1 );
verifyZeroInteractions( handler2 );
verify( handler2, only() ).canManageAutoRead();

dispatcher.handleIgnoredMessage();
verifyFailure( handler2 );
Expand Down Expand Up @@ -452,9 +451,9 @@ public void shouldKeepSingleAutoReadManagingHandler()
{
InboundMessageDispatcher dispatcher = newDispatcher();

AutoReadManagingResponseHandler handler1 = mock( AutoReadManagingResponseHandler.class );
AutoReadManagingResponseHandler handler2 = mock( AutoReadManagingResponseHandler.class );
AutoReadManagingResponseHandler handler3 = mock( AutoReadManagingResponseHandler.class );
ResponseHandler handler1 = newAutoReadManagingResponseHandler();
ResponseHandler handler2 = newAutoReadManagingResponseHandler();
ResponseHandler handler3 = newAutoReadManagingResponseHandler();

dispatcher.enqueue( handler1 );
dispatcher.enqueue( handler2 );
Expand All @@ -471,8 +470,8 @@ public void shouldKeepTrackOfAutoReadManagingHandler()
{
InboundMessageDispatcher dispatcher = newDispatcher();

AutoReadManagingResponseHandler handler1 = mock( AutoReadManagingResponseHandler.class );
AutoReadManagingResponseHandler handler2 = mock( AutoReadManagingResponseHandler.class );
ResponseHandler handler1 = newAutoReadManagingResponseHandler();
ResponseHandler handler2 = newAutoReadManagingResponseHandler();

assertNull( dispatcher.autoReadManagingHandler() );

Expand All @@ -490,7 +489,7 @@ public void shouldForgetAutoReadManagingHandlerWhenItIsRemoved()

ResponseHandler handler1 = mock( ResponseHandler.class );
ResponseHandler handler2 = mock( ResponseHandler.class );
AutoReadManagingResponseHandler handler3 = mock( AutoReadManagingResponseHandler.class );
ResponseHandler handler3 = newAutoReadManagingResponseHandler();

dispatcher.enqueue( handler1 );
dispatcher.enqueue( handler2 );
Expand All @@ -510,7 +509,7 @@ public void shouldReEnableAutoReadWhenAutoReadManagingHandlerIsRemoved()
Channel channel = newChannelMock();
InboundMessageDispatcher dispatcher = newDispatcher( channel );

AutoReadManagingResponseHandler handler = mock( AutoReadManagingResponseHandler.class );
ResponseHandler handler = newAutoReadManagingResponseHandler();
dispatcher.enqueue( handler );
assertEquals( handler, dispatcher.autoReadManagingHandler() );
verify( handler, never() ).disableAutoReadManagement();
Expand Down Expand Up @@ -548,4 +547,11 @@ private static Channel newChannelMock()
when( channel.config() ).thenReturn( channelConfig );
return channel;
}

private static ResponseHandler newAutoReadManagingResponseHandler()
{
ResponseHandler handler = mock( ResponseHandler.class );
when( handler.canManageAutoRead() ).thenReturn( true );
return handler;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.async;
package org.neo4j.driver.internal.handlers;

import org.junit.Test;
import org.mockito.ArgumentCaptor;

import java.util.concurrent.CompletionException;

import org.neo4j.driver.internal.RoutingErrorHandler;
import org.neo4j.driver.internal.handlers.RoutingResponseHandler;
import org.neo4j.driver.internal.spi.ResponseHandler;
import org.neo4j.driver.v1.AccessMode;
import org.neo4j.driver.v1.exceptions.ClientException;
Expand Down Expand Up @@ -126,6 +125,30 @@ public void shouldHandleClientException()
verifyZeroInteractions( errorHandler );
}

@Test
public void shouldDelegateCanManageAutoRead()
{
ResponseHandler responseHandler = mock( ResponseHandler.class );
RoutingResponseHandler routingResponseHandler =
new RoutingResponseHandler( responseHandler, LOCAL_DEFAULT, AccessMode.READ, null );

routingResponseHandler.canManageAutoRead();

verify( responseHandler ).canManageAutoRead();
}

@Test
public void shouldDelegateDisableAutoReadManagement()
{
ResponseHandler responseHandler = mock( ResponseHandler.class );
RoutingResponseHandler routingResponseHandler =
new RoutingResponseHandler( responseHandler, LOCAL_DEFAULT, AccessMode.READ, null );

routingResponseHandler.disableAutoReadManagement();

verify( responseHandler ).disableAutoReadManagement();
}

private void testWriteFailureWithReadAccessMode( String code )
{
ClientException error = new ClientException( code, "Hi" );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,35 @@
import static org.neo4j.driver.v1.util.TestUtil.await;
import static org.neo4j.driver.v1.util.TestUtil.awaitAllFutures;

public class CausalClusteringIT
public class CausalClusteringIT extends NestedQueries
{
private static final long DEFAULT_TIMEOUT_MS = 120_000;

@Rule
public final ClusterRule clusterRule = new ClusterRule();

private ExecutorService executor;
private Driver driver;

@Override
protected Session newSession( AccessMode mode )
{
if ( driver == null )
{
driver = createDriver( clusterRule.getCluster().leader().getRoutingUri() );
}

return driver.session( mode );
}

@After
public void tearDown()
{
if ( driver != null )
{
driver.close();
}

if ( executor != null )
{
executor.shutdownNow();
Expand Down
Loading