diff --git a/driver/src/main/java/org/neo4j/driver/internal/DirectDriver.java b/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java similarity index 52% rename from driver/src/main/java/org/neo4j/driver/internal/DirectDriver.java rename to driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java index 7010a73747..fd6b171126 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/DirectDriver.java +++ b/driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java @@ -19,51 +19,39 @@ package org.neo4j.driver.internal; import org.neo4j.driver.internal.net.BoltServerAddress; -import org.neo4j.driver.internal.security.SecurityPlan; import org.neo4j.driver.internal.spi.ConnectionPool; +import org.neo4j.driver.internal.spi.ConnectionProvider; +import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.v1.AccessMode; -import org.neo4j.driver.v1.Logging; -import org.neo4j.driver.v1.Session; -import static java.lang.String.format; - -public class DirectDriver extends BaseDriver +/** + * Simple {@link ConnectionProvider connection provider} that obtains connections form the given pool only for + * the given address. + */ +public class DirectConnectionProvider implements ConnectionProvider { private final BoltServerAddress address; - protected final ConnectionPool connections; + private final ConnectionPool pool; - public DirectDriver( - BoltServerAddress address, - ConnectionPool connections, - SecurityPlan securityPlan, - SessionFactory sessionFactory, - Logging logging ) + DirectConnectionProvider( BoltServerAddress address, ConnectionPool pool ) { - super( securityPlan, sessionFactory, logging ); this.address = address; - this.connections = connections; + this.pool = pool; } @Override - protected Session newSessionWithMode( AccessMode mode ) + public PooledConnection acquireConnection( AccessMode mode ) { - return sessionFactory.newInstance( connections.acquire( address ) ); + return pool.acquire( address ); } @Override - protected void closeResources() + public void close() throws Exception { - try - { - connections.close(); - } - catch ( Exception ex ) - { - log.error( format( "~~ [ERROR] %s", ex.getMessage() ), ex ); - } + pool.close(); } - BoltServerAddress server() + public BoltServerAddress getAddress() { return address; } diff --git a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java index dbd7e18d73..eeec1a015e 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java +++ b/driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java @@ -22,6 +22,7 @@ import java.net.URI; import java.security.GeneralSecurityException; +import org.neo4j.driver.internal.cluster.LoadBalancer; import org.neo4j.driver.internal.cluster.RoutingSettings; import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.net.SocketConnector; @@ -29,6 +30,7 @@ import org.neo4j.driver.internal.net.pooling.SocketConnectionPool; import org.neo4j.driver.internal.security.SecurityPlan; import org.neo4j.driver.internal.spi.ConnectionPool; +import org.neo4j.driver.internal.spi.ConnectionProvider; import org.neo4j.driver.internal.spi.Connector; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.v1.AuthToken; @@ -50,13 +52,10 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r BoltServerAddress address = BoltServerAddress.from( uri ); SecurityPlan securityPlan = createSecurityPlan( address, config ); ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, config ); - SessionFactory sessionFactory = createSessionFactory( config ); try { - return createDriver( address, uri.getScheme(), connectionPool, config, routingSettings, securityPlan, - sessionFactory - ); + return createDriver( address, uri.getScheme(), connectionPool, config, routingSettings, securityPlan ); } catch ( Throwable driverError ) { @@ -74,42 +73,68 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r } private Driver createDriver( BoltServerAddress address, String scheme, ConnectionPool connectionPool, - Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, - SessionFactory sessionFactory ) + Config config, RoutingSettings routingSettings, SecurityPlan securityPlan ) { switch ( scheme.toLowerCase() ) { case "bolt": - return createDirectDriver( address, connectionPool, config, securityPlan, sessionFactory ); + return createDirectDriver( address, connectionPool, config, securityPlan ); case "bolt+routing": - return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, - sessionFactory ); + return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan ); default: throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) ); } } /** - * Creates new {@link DirectDriver}. + * Creates a new driver for "bolt" scheme. *

* This method is protected only for testing */ - protected DirectDriver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, - Config config, SecurityPlan securityPlan, SessionFactory sessionFactory ) + protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config, + SecurityPlan securityPlan ) { - return new DirectDriver( address, connectionPool, securityPlan, sessionFactory, config.logging() ); + ConnectionProvider connectionProvider = new DirectConnectionProvider( address, connectionPool ); + SessionFactory sessionFactory = createSessionFactory( connectionProvider, config ); + return createDriver( config, securityPlan, sessionFactory ); } /** - * Creates new {@link RoutingDriver}. + * Creates new a new driver for "bolt+routing" scheme. *

* This method is protected only for testing */ - protected RoutingDriver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool, - Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, SessionFactory sessionFactory ) + protected Driver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool, + Config config, RoutingSettings routingSettings, SecurityPlan securityPlan ) { - return new RoutingDriver( routingSettings, address, connectionPool, securityPlan, sessionFactory, - createClock(), config.logging() ); + if ( !securityPlan.isRoutingCompatible() ) + { + throw new IllegalArgumentException( "The chosen security plan is not compatible with a routing driver" ); + } + ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, config, routingSettings ); + SessionFactory sessionFactory = createSessionFactory( connectionProvider, config ); + return createDriver( config, securityPlan, sessionFactory ); + } + + /** + * Creates new {@link Driver}. + *

+ * This method is protected only for testing + */ + protected InternalDriver createDriver( Config config, SecurityPlan securityPlan, SessionFactory sessionFactory ) + { + return new InternalDriver( securityPlan, sessionFactory, config.logging() ); + } + + /** + * Creates new {@link LoadBalancer} for the routing driver. + *

+ * This method is protected only for testing + */ + protected LoadBalancer createLoadBalancer( BoltServerAddress address, ConnectionPool connectionPool, Config config, + RoutingSettings routingSettings ) + { + return new LoadBalancer( routingSettings, connectionPool, createClock(), config.logging(), address ); } /** @@ -150,13 +175,14 @@ protected Connector createConnector( ConnectionSettings connectionSettings, Secu return new SocketConnector( connectionSettings, securityPlan, logging ); } - private static SessionFactory createSessionFactory( Config config ) + /** + * Creates new {@link SessionFactory}. + *

+ * This method is protected only for testing + */ + protected SessionFactory createSessionFactory( ConnectionProvider connectionProvider, Config config ) { - if ( config.logLeakedSessions() ) - { - return new LeakLoggingNetworkSessionFactory( config.logging() ); - } - return new NetworkSessionFactory(); + return new SessionFactoryImpl( connectionProvider, config, config.logging() ); } private static SecurityPlan createSecurityPlan( BoltServerAddress address, Config config ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java index 55e6902edd..0cf2ed13d4 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ExplicitTransaction.java @@ -65,21 +65,21 @@ private enum State ROLLED_BACK } - private final Runnable cleanup; + private final SessionResourcesHandler resourcesHandler; private final Connection conn; private String bookmark = null; private State state = State.ACTIVE; - public ExplicitTransaction( Connection conn, Runnable cleanup ) + public ExplicitTransaction( Connection conn, SessionResourcesHandler resourcesHandler ) { - this( conn, cleanup, null ); + this( conn, resourcesHandler, null ); } - ExplicitTransaction( Connection conn, Runnable cleanup, String bookmark ) + ExplicitTransaction( Connection conn, SessionResourcesHandler resourcesHandler, String bookmark ) { this.conn = conn; - this.cleanup = cleanup; + this.resourcesHandler = resourcesHandler; runBeginStatement( conn, bookmark ); } @@ -139,7 +139,7 @@ else if ( state == State.MARKED_FAILED || state == State.ACTIVE ) } finally { - cleanup.run(); + resourcesHandler.onTransactionClosed( this ); } } @@ -185,13 +185,14 @@ public synchronized StatementResult run( Statement statement ) try { - InternalStatementResult cursor = new InternalStatementResult( conn, this, statement ); + InternalStatementResult result = + new InternalStatementResult( conn, SessionResourcesHandler.NO_OP, this, statement ); conn.run( statement.text(), statement.parameters().asMap( ofValue() ), - cursor.runResponseCollector() ); - conn.pullAll( cursor.pullAllResponseCollector() ); + result.runResponseCollector() ); + conn.pullAll( result.pullAllResponseCollector() ); conn.flush(); - return cursor; + return result; } catch ( Neo4jException e ) { diff --git a/driver/src/main/java/org/neo4j/driver/internal/BaseDriver.java b/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java similarity index 72% rename from driver/src/main/java/org/neo4j/driver/internal/BaseDriver.java rename to driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java index 8035e965f4..0d17ae6370 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/BaseDriver.java +++ b/driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java @@ -27,17 +27,19 @@ import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.Session; -abstract class BaseDriver implements Driver +import static java.lang.String.format; + +public class InternalDriver implements Driver { private final static String DRIVER_LOG_NAME = "Driver"; private final SecurityPlan securityPlan; - protected final SessionFactory sessionFactory; - protected final Logger log; + private final SessionFactory sessionFactory; + private final Logger log; private AtomicBoolean closed = new AtomicBoolean( false ); - BaseDriver( SecurityPlan securityPlan, SessionFactory sessionFactory, Logging logging ) + InternalDriver( SecurityPlan securityPlan, SessionFactory sessionFactory, Logging logging ) { this.securityPlan = securityPlan; this.sessionFactory = sessionFactory; @@ -61,8 +63,8 @@ public final Session session() public final Session session( AccessMode mode ) { assertOpen(); - Session session = newSessionWithMode( mode ); - if( closed.get() ) + Session session = sessionFactory.newInstance( mode ); + if ( closed.get() ) { // the driver is already closed and we either 1. obtain this session from the old session pool // or 2. we obtain this session from a new session pool @@ -77,15 +79,35 @@ public final Session session( AccessMode mode ) @Override public final void close() { - if ( closed.compareAndSet(false, true) ) + if ( closed.compareAndSet( false, true ) ) { closeResources(); } } - protected abstract Session newSessionWithMode( AccessMode mode ); + /** + * Get the underlying session factory. + *

+ * This method is only for testing + * + * @return the session factory used by this driver. + */ + public final SessionFactory getSessionFactory() + { + return sessionFactory; + } - protected abstract void closeResources(); + private void closeResources() + { + try + { + sessionFactory.close(); + } + catch ( Exception ex ) + { + log.error( format( "~~ [ERROR] %s", ex.getMessage() ), ex ); + } + } private void assertOpen() { @@ -95,7 +117,7 @@ private void assertOpen() } } - private IllegalStateException driverCloseException() + private static RuntimeException driverCloseException() { return new IllegalStateException( "This driver instance has already been closed" ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java b/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java index 355c1ad170..c6701209bc 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java +++ b/driver/src/main/java/org/neo4j/driver/internal/InternalStatementResult.java @@ -48,6 +48,7 @@ public class InternalStatementResult implements StatementResult { private final Connection connection; + private final SessionResourcesHandler resourcesHandler; private final Collector runResponseCollector; private final Collector pullAllResponseCollector; private final Queue recordBuffer = new LinkedList<>(); @@ -55,14 +56,15 @@ public class InternalStatementResult implements StatementResult private List keys = null; private ResultSummary summary = null; - private long position = -1; private boolean done = false; - InternalStatementResult( Connection connection, ExplicitTransaction transaction, Statement statement ) + InternalStatementResult( Connection connection, SessionResourcesHandler resourcesHandler, + ExplicitTransaction transaction, Statement statement ) { this.connection = connection; this.runResponseCollector = newRunResponseCollector(); this.pullAllResponseCollector = newStreamResponseCollector( transaction, statement, connection.server() ); + this.resourcesHandler = resourcesHandler; } private Collector newRunResponseCollector() @@ -202,7 +204,6 @@ public Record next() // and have it copy out its fields from some lower level data structure. if ( tryFetchNext() ) { - position += 1; return recordBuffer.poll(); } else @@ -287,7 +288,7 @@ public ResultSummary consume() { do { - connection.receiveOne(); + receiveOne(); recordBuffer.clear(); } while ( !done ); @@ -301,7 +302,7 @@ public ResultSummary summary() { while( !done ) { - connection.receiveOne(); + receiveOne(); } return summary; @@ -321,9 +322,26 @@ private boolean tryFetchNext() { return false; } - connection.receiveOne(); + receiveOne(); } return true; } + + private void receiveOne() + { + try + { + connection.receiveOne(); + } + catch ( Throwable error ) + { + resourcesHandler.onResultConsumed(); + throw error; + } + if ( done ) + { + resourcesHandler.onResultConsumed(); + } + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java index 158c0b83c6..f3c5cab23f 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSession.java @@ -18,20 +18,19 @@ */ package org.neo4j.driver.internal; -import org.neo4j.driver.internal.spi.PooledConnection; -import org.neo4j.driver.v1.Logger; +import org.neo4j.driver.internal.spi.ConnectionProvider; +import org.neo4j.driver.v1.AccessMode; +import org.neo4j.driver.v1.Logging; import static java.lang.System.lineSeparator; class LeakLoggingNetworkSession extends NetworkSession { - private final Logger log; private final String stackTrace; - LeakLoggingNetworkSession( PooledConnection connection, Logger log ) + LeakLoggingNetworkSession( ConnectionProvider connectionProvider, AccessMode mode, Logging logging ) { - super( connection ); - this.log = log; + super( connectionProvider, mode, logging ); this.stackTrace = captureStackTrace(); } @@ -44,11 +43,11 @@ protected void finalize() throws Throwable private void logLeakIfNeeded() { - if ( isOpen() ) + if ( currentConnectionIsOpen() ) { - log.error( "Neo4j Session object leaked, please ensure that your application" + - "calls the `close` method on Sessions before disposing of the objects.\n" + - "Session was create at:\n" + stackTrace, null ); + logger.error( "Neo4j Session object leaked, please ensure that your application" + + "calls the `close` method on Sessions before disposing of the objects.\n" + + "Session was create at:\n" + stackTrace, null ); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionFactory.java b/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionFactory.java deleted file mode 100644 index b7c8feeb42..0000000000 --- a/driver/src/main/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionFactory.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal; - -import org.neo4j.driver.internal.spi.PooledConnection; -import org.neo4j.driver.v1.Logger; -import org.neo4j.driver.v1.Logging; -import org.neo4j.driver.v1.Session; - -class LeakLoggingNetworkSessionFactory implements SessionFactory -{ - private static final String LOGGER_NAME = "sessionLeak"; - - private final Logger logger; - - LeakLoggingNetworkSessionFactory( Logging logging ) - { - this.logger = logging.getLog( LOGGER_NAME ); - } - - @Override - public Session newInstance( PooledConnection connection ) - { - return new LeakLoggingNetworkSession( connection, logger ); - } -} diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index a02e49f69b..238afced36 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -19,14 +19,15 @@ package org.neo4j.driver.internal; import java.util.Map; -import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; -import org.neo4j.driver.internal.logging.DevNullLogger; import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.spi.ConnectionProvider; import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.internal.types.InternalTypeSystem; +import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Logger; +import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.Statement; @@ -35,53 +36,27 @@ import org.neo4j.driver.v1.Value; import org.neo4j.driver.v1.Values; import org.neo4j.driver.v1.exceptions.ClientException; -import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.driver.v1.types.TypeSystem; import static org.neo4j.driver.v1.Values.value; -public class NetworkSession implements Session +public class NetworkSession implements Session, SessionResourcesHandler { - private final PooledConnection connection; - private final String sessionId; - private final Logger logger; - - private String lastBookmark = null; - - // Called when a transaction object is closed - private final Runnable txCleanup = new Runnable() - { - @Override - public void run() - { - synchronized ( NetworkSession.this ) - { - if ( currentTransaction != null ) - { - lastBookmark = currentTransaction.bookmark(); - currentTransaction = null; - } - } - } - }; + private final ConnectionProvider connectionProvider; + private final AccessMode mode; + protected final Logger logger; + private String lastBookmark; + private PooledConnection currentConnection; private ExplicitTransaction currentTransaction; - private AtomicBoolean isOpen = new AtomicBoolean( true ); - public NetworkSession( PooledConnection connection ) - { - this.connection = connection; + private final AtomicBoolean isOpen = new AtomicBoolean( true ); - if( connection != null && connection.logger() != null ) - { - this.logger = connection.logger(); - } - else - { - this.logger = DevNullLogger.DEV_NULL_LOGGER; - } - sessionId = UUID.randomUUID().toString(); - this.logger.debug( "~~ connection claimed by [session-%s]", sessionId ); + public NetworkSession( ConnectionProvider connectionProvider, AccessMode mode, Logging logging ) + { + this.connectionProvider = connectionProvider; + this.mode = mode; + this.logger = logging.getLog( "Session-" + hashCode() ); } @Override @@ -113,18 +88,23 @@ public StatementResult run( String statementText, Value statementParameters ) @Override public StatementResult run( Statement statement ) { - ensureConnectionIsValidBeforeRunningSession(); - return run( connection, statement ); + ensureSessionIsOpen(); + ensureNoOpenTransactionBeforeRunningSession(); + + syncAndCloseCurrentConnection(); + currentConnection = acquireConnection(); + + return run( currentConnection, statement, this ); } - public static StatementResult run( Connection connection, Statement statement ) + public static StatementResult run( Connection connection, Statement statement, SessionResourcesHandler resourcesHandler ) { - InternalStatementResult cursor = new InternalStatementResult( connection, null, statement ); + InternalStatementResult result = new InternalStatementResult( connection, resourcesHandler, null, statement ); connection.run( statement.text(), statement.parameters().asMap( Values.ofValue() ), - cursor.runResponseCollector() ); - connection.pullAll( cursor.pullAllResponseCollector() ); + result.runResponseCollector() ); + connection.pullAll( result.pullAllResponseCollector() ); connection.flush(); - return cursor; + return result; } @Override @@ -132,7 +112,6 @@ public synchronized void reset() { ensureSessionIsOpen(); ensureNoUnrecoverableError(); - ensureConnectionIsOpen(); if ( currentTransaction != null ) { @@ -140,13 +119,16 @@ public synchronized void reset() lastBookmark = currentTransaction.bookmark(); currentTransaction = null; } - connection.resetAsync(); + if ( currentConnection != null ) + { + currentConnection.resetAsync(); + } } @Override public boolean isOpen() { - return isOpen.get() && connection.isOpen(); + return isOpen.get(); } @Override @@ -158,13 +140,6 @@ public void close() throw new ClientException( "This session has already been closed." ); } - if ( !connection.isOpen() ) - { - // the socket connection is already closed due to some error, cannot send more data - closeConnection(); - return; - } - synchronized ( this ) { if ( currentTransaction != null ) @@ -180,20 +155,8 @@ public void close() } } } - try - { - connection.sync(); - } - finally - { - closeConnection(); - } - } - - private void closeConnection() - { - logger.debug( "~~ connection released by [session-%s]", sessionId ); - connection.close(); + + syncAndCloseCurrentConnection(); } @Override @@ -205,27 +168,14 @@ public Transaction beginTransaction() @Override public synchronized Transaction beginTransaction( String bookmark ) { - ensureConnectionIsValidBeforeOpeningTransaction(); - currentTransaction = new ExplicitTransaction( connection, txCleanup, bookmark ); - connection.onError( new Runnable() - { - @Override - public void run() - { - // must check if transaction has been closed - if ( currentTransaction != null ) - { - if ( connection.hasUnrecoverableErrors() ) - { - currentTransaction.markToClose(); - } - else - { - currentTransaction.failure(); - } - } - } - } ); + ensureSessionIsOpen(); + ensureNoOpenTransactionBeforeOpeningTransaction(); + + syncAndCloseCurrentConnection(); + currentConnection = acquireConnection(); + + currentTransaction = new ExplicitTransaction( currentConnection, this, bookmark ); + currentConnection.setResourcesHandler( this ); return currentTransaction; } @@ -241,25 +191,43 @@ public TypeSystem typeSystem() return InternalTypeSystem.TYPE_SYSTEM; } - private void ensureConnectionIsValidBeforeRunningSession() + @Override + public synchronized void onResultConsumed() { - ensureSessionIsOpen(); - ensureNoUnrecoverableError(); - ensureNoOpenTransactionBeforeRunningSession(); - ensureConnectionIsOpen(); + closeCurrentConnection(); } - private void ensureConnectionIsValidBeforeOpeningTransaction() + @Override + public synchronized void onTransactionClosed( ExplicitTransaction tx ) { - ensureSessionIsOpen(); - ensureNoUnrecoverableError(); - ensureNoOpenTransactionBeforeOpeningTransaction(); - ensureConnectionIsOpen(); + if ( currentTransaction != null && currentTransaction == tx ) + { + closeCurrentConnection(); + lastBookmark = currentTransaction.bookmark(); + currentTransaction = null; + } + } + + @Override + public synchronized void onConnectionError( boolean recoverable ) + { + // must check if transaction has been closed + if ( currentTransaction != null ) + { + if ( recoverable ) + { + currentTransaction.failure(); + } + else + { + currentTransaction.markToClose(); + } + } } private void ensureNoUnrecoverableError() { - if ( connection.hasUnrecoverableErrors() ) + if ( currentConnection != null && currentConnection.hasUnrecoverableErrors() ) { throw new ClientException( "Cannot run more statements in the current session as an unrecoverable error " + "has happened. Please close the current session and re-run your statement in a" + @@ -287,16 +255,6 @@ private void ensureNoOpenTransactionBeforeOpeningTransaction() } } - private void ensureConnectionIsOpen() - { - if ( !connection.isOpen() ) - { - throw new ServiceUnavailableException( "The current session cannot be reused as the underlying connection with the " + - "server has been closed due to unrecoverable errors. " + - "Please close this session and retry your statement in another new session." ); - } - } - private void ensureSessionIsOpen() { if ( !isOpen.get() ) @@ -309,4 +267,49 @@ private void ensureSessionIsOpen() "or you are trying to reuse a session that you have called `reset` on it." ); } } + + private PooledConnection acquireConnection() + { + PooledConnection connection = connectionProvider.acquireConnection( mode ); + logger.debug( "Acquired connection " + connection.hashCode() ); + return connection; + } + + boolean currentConnectionIsOpen() + { + return currentConnection != null && currentConnection.isOpen(); + } + + private void syncAndCloseCurrentConnection() + { + closeCurrentConnection( true ); + } + + private void closeCurrentConnection() + { + closeCurrentConnection( false ); + } + + private void closeCurrentConnection( boolean sync ) + { + if ( currentConnection == null ) + { + return; + } + + PooledConnection connection = currentConnection; + currentConnection = null; + try + { + if ( sync && connection.isOpen() ) + { + connection.sync(); + } + } + finally + { + connection.close(); + logger.debug( "Released connection " + connection.hashCode() ); + } + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/RoutingDriver.java b/driver/src/main/java/org/neo4j/driver/internal/RoutingDriver.java deleted file mode 100644 index 306ac10c03..0000000000 --- a/driver/src/main/java/org/neo4j/driver/internal/RoutingDriver.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal; - -import org.neo4j.driver.internal.cluster.LoadBalancer; -import org.neo4j.driver.internal.cluster.RoutingSettings; -import org.neo4j.driver.internal.net.BoltServerAddress; -import org.neo4j.driver.internal.security.SecurityPlan; -import org.neo4j.driver.internal.spi.ConnectionPool; -import org.neo4j.driver.internal.spi.PooledConnection; -import org.neo4j.driver.internal.util.Clock; -import org.neo4j.driver.v1.AccessMode; -import org.neo4j.driver.v1.Logging; -import org.neo4j.driver.v1.Session; -import org.neo4j.driver.v1.exceptions.ClientException; - -import static java.lang.String.format; - -public class RoutingDriver extends BaseDriver -{ - // Verify that a security plan is compatible with this driver, throwing an exception if not - private static SecurityPlan verifiedSecurityPlan( SecurityPlan securityPlan ) - { - if ( !securityPlan.isRoutingCompatible() ) - { - throw new IllegalArgumentException( "The chosen security plan is not compatible with a routing driver" ); - } - return securityPlan; - } - - private final LoadBalancer loadBalancer; - - public RoutingDriver( - RoutingSettings settings, - BoltServerAddress seedAddress, - ConnectionPool connections, - SecurityPlan securityPlan, - SessionFactory sessionFactory, - Clock clock, - Logging logging ) - { - super( verifiedSecurityPlan( securityPlan ), sessionFactory, logging ); - this.loadBalancer = new LoadBalancer( settings, clock, log, connections, seedAddress ); - } - - @Override - protected Session newSessionWithMode( AccessMode mode ) - { - PooledConnection connection = acquireConnection( mode ); - return sessionFactory.newInstance( connection ); - } - - private PooledConnection acquireConnection( AccessMode role ) - { - switch ( role ) - { - case READ: - return loadBalancer.acquireReadConnection(); - case WRITE: - return loadBalancer.acquireWriteConnection(); - default: - throw new ClientException( role + " is not supported for creating new sessions" ); - } - } - - @Override - protected void closeResources() - { - try - { - loadBalancer.close(); - } - catch ( Exception ex ) - { - log.error( format( "~~ [ERROR] %s", ex.getMessage() ), ex ); - } - } -} diff --git a/driver/src/main/java/org/neo4j/driver/internal/SessionFactory.java b/driver/src/main/java/org/neo4j/driver/internal/SessionFactory.java index 39039fbc8a..39c0f6ac66 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/SessionFactory.java +++ b/driver/src/main/java/org/neo4j/driver/internal/SessionFactory.java @@ -18,10 +18,10 @@ */ package org.neo4j.driver.internal; -import org.neo4j.driver.internal.spi.PooledConnection; +import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Session; -interface SessionFactory +public interface SessionFactory extends AutoCloseable { - Session newInstance( PooledConnection connection ); + Session newInstance( AccessMode mode ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java b/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java new file mode 100644 index 0000000000..7da6cc74a8 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal; + +import org.neo4j.driver.internal.spi.ConnectionProvider; +import org.neo4j.driver.v1.AccessMode; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Logging; +import org.neo4j.driver.v1.Session; + +public class SessionFactoryImpl implements SessionFactory +{ + protected final ConnectionProvider connectionProvider; + protected final Logging logging; + protected final boolean leakedSessionsLoggingEnabled; + + SessionFactoryImpl( ConnectionProvider connectionProvider, Config config, Logging logging ) + { + this.connectionProvider = connectionProvider; + this.leakedSessionsLoggingEnabled = config.logLeakedSessions(); + this.logging = logging; + } + + @Override + public Session newInstance( AccessMode mode ) + { + if ( leakedSessionsLoggingEnabled ) + { + return new LeakLoggingNetworkSession( connectionProvider, mode, logging ); + } + return new NetworkSession( connectionProvider, mode, logging ); + } + + @Override + public void close() throws Exception + { + connectionProvider.close(); + } + + /** + * Get the underlying connection provider. + *

+ * This method is only for testing + * + * @return the connection provider used by this factory. + */ + public ConnectionProvider getConnectionProvider() + { + return connectionProvider; + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSessionFactory.java b/driver/src/main/java/org/neo4j/driver/internal/SessionResourcesHandler.java similarity index 58% rename from driver/src/main/java/org/neo4j/driver/internal/NetworkSessionFactory.java rename to driver/src/main/java/org/neo4j/driver/internal/SessionResourcesHandler.java index a1fb63927a..486022defb 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSessionFactory.java +++ b/driver/src/main/java/org/neo4j/driver/internal/SessionResourcesHandler.java @@ -18,14 +18,29 @@ */ package org.neo4j.driver.internal; -import org.neo4j.driver.internal.spi.PooledConnection; -import org.neo4j.driver.v1.Session; - -class NetworkSessionFactory implements SessionFactory +public interface SessionResourcesHandler { - @Override - public Session newInstance( PooledConnection connection ) + void onResultConsumed(); + + void onTransactionClosed( ExplicitTransaction tx ); + + void onConnectionError( boolean recoverable ); + + SessionResourcesHandler NO_OP = new SessionResourcesHandler() { - return new NetworkSession( connection ); - } + @Override + public void onResultConsumed() + { + } + + @Override + public void onTransactionClosed( ExplicitTransaction tx ) + { + } + + @Override + public void onConnectionError( boolean recoverable ) + { + } + }; } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/GetServersProcedureRunner.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/GetServersProcedureRunner.java index 244fedc14f..06230b6943 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/GetServersProcedureRunner.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/GetServersProcedureRunner.java @@ -22,16 +22,17 @@ import java.util.List; import org.neo4j.driver.internal.NetworkSession; +import org.neo4j.driver.internal.SessionResourcesHandler; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Statement; public class GetServersProcedureRunner { - private final String CALL_GET_SERVERS = "CALL dbms.cluster.routing.getServers"; + private static final String CALL_GET_SERVERS = "CALL dbms.cluster.routing.getServers"; public List run( Connection connection ) { - return NetworkSession.run( connection, new Statement( CALL_GET_SERVERS ) ).list(); + return NetworkSession.run( connection, new Statement( CALL_GET_SERVERS ), SessionResourcesHandler.NO_OP ).list(); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java index d3440555bf..dae90a61b1 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java @@ -23,47 +23,41 @@ import org.neo4j.driver.internal.RoutingErrorHandler; import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.spi.ConnectionPool; +import org.neo4j.driver.internal.spi.ConnectionProvider; import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Logger; +import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.exceptions.ProtocolException; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import static java.lang.String.format; -public class LoadBalancer implements RoutingErrorHandler, AutoCloseable +public class LoadBalancer implements ConnectionProvider, RoutingErrorHandler, AutoCloseable { - private final Logger log; + private static final String LOAD_BALANCER_LOG_NAME = "LoadBalancer"; private final ConnectionPool connections; private final RoutingTable routingTable; private final Rediscovery rediscovery; + private final Logger log; - public LoadBalancer( - RoutingSettings settings, - Clock clock, - Logger log, - ConnectionPool connections, - BoltServerAddress... routingAddresses ) throws ServiceUnavailableException + public LoadBalancer( RoutingSettings settings, ConnectionPool connections, Clock clock, Logging logging, + BoltServerAddress... routingAddresses ) { - this( settings, clock, log, connections, new ClusterRoutingTable( clock, routingAddresses ), - new GetServersProcedureClusterCompositionProvider( clock, log ) ); + this( settings, new ClusterRoutingTable( clock, routingAddresses ), connections, clock, + logging.getLog( LOAD_BALANCER_LOG_NAME ) ); } - private LoadBalancer( - RoutingSettings settings, - Clock clock, - Logger log, - ConnectionPool connections, - RoutingTable routingTable, - ClusterCompositionProvider provider ) throws ServiceUnavailableException + private LoadBalancer( RoutingSettings settings, RoutingTable routingTable, ConnectionPool connections, + Clock clock, Logger log ) { - this( routingTable, connections, new Rediscovery( settings, clock, log, provider ), log ); + this( routingTable, connections, new Rediscovery( settings, clock, log, + new GetServersProcedureClusterCompositionProvider( clock, log ) ), log ); } LoadBalancer( RoutingTable routingTable, ConnectionPool connections, Rediscovery rediscovery, Logger log ) - throws ServiceUnavailableException { this.log = log; this.connections = connections; @@ -74,16 +68,12 @@ private LoadBalancer( ensureRouting(); } - public PooledConnection acquireReadConnection() throws ServiceUnavailableException - { - PooledConnection connection = acquireConnection( routingTable.readers() ); - return new RoutingPooledConnection( connection, this, AccessMode.READ ); - } - - public PooledConnection acquireWriteConnection() throws ServiceUnavailableException + @Override + public PooledConnection acquireConnection( AccessMode mode ) { - PooledConnection connection = acquireConnection( routingTable.writers() ); - return new RoutingPooledConnection( connection, this, AccessMode.WRITE ); + RoundRobinAddressSet addressSet = addressSetFor( mode ); + PooledConnection connection = acquireConnection( addressSet ); + return new RoutingPooledConnection( connection, this, mode ); } @Override @@ -161,5 +151,16 @@ synchronized void ensureRouting() throws ServiceUnavailableException, ProtocolEx } } - + private RoundRobinAddressSet addressSetFor( AccessMode mode ) + { + switch ( mode ) + { + case READ: + return routingTable.readers(); + case WRITE: + return routingTable.writers(); + default: + throw new IllegalArgumentException( "Mode '" + mode + "' is not supported" ); + } + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingPooledConnection.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingPooledConnection.java index a61afe7d3e..7f7dd9e013 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingPooledConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingPooledConnection.java @@ -22,11 +22,11 @@ import java.util.Objects; import org.neo4j.driver.internal.RoutingErrorHandler; +import org.neo4j.driver.internal.SessionResourcesHandler; import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.spi.Collector; import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.v1.AccessMode; -import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.Value; import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; @@ -191,9 +191,9 @@ public boolean isOpen() } @Override - public void onError( Runnable runnable ) + public void setResourcesHandler( SessionResourcesHandler resourcesHandler ) { - delegate.onError( runnable ); + delegate.setResourcesHandler( resourcesHandler ); } @Override @@ -220,12 +220,6 @@ public BoltServerAddress boltServerAddress() return delegate.boltServerAddress(); } - @Override - public Logger logger() - { - return delegate.logger(); - } - @Override public long lastUsedTimestamp() { diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java b/driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java index 0460c63109..13dcc21a16 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java @@ -23,7 +23,6 @@ import org.neo4j.driver.internal.spi.Collector; import org.neo4j.driver.internal.spi.Connection; -import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.Value; import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.summary.ServerInfo; @@ -224,10 +223,4 @@ public BoltServerAddress boltServerAddress() { return delegate.boltServerAddress(); } - - @Override - public Logger logger() - { - return delegate.logger(); - } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java b/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java index 357367222a..5bd390a4ad 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java @@ -23,7 +23,6 @@ import java.util.LinkedList; import java.util.Map; import java.util.Queue; -import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import org.neo4j.driver.internal.messaging.InitMessage; @@ -41,7 +40,6 @@ import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.driver.v1.summary.ServerInfo; -import static java.lang.String.format; import static org.neo4j.driver.internal.messaging.AckFailureMessage.ACK_FAILURE; import static org.neo4j.driver.internal.messaging.DiscardAllMessage.DISCARD_ALL; import static org.neo4j.driver.internal.messaging.PullAllMessage.PULL_ALL; @@ -57,17 +55,13 @@ public class SocketConnection implements Connection private final SocketClient socket; - private final Logger logger; - - public SocketConnection( BoltServerAddress address, SecurityPlan securityPlan, int timeoutMillis, Logging logging ) + SocketConnection( BoltServerAddress address, SecurityPlan securityPlan, int timeoutMillis, Logging logging ) { - this( address, securityPlan, timeoutMillis, - logging.getLog( format( "conn-%s", UUID.randomUUID().toString() ) ) ); - } + Logger logger = logging.getLog( "Connection-" + hashCode() ); + this.socket = new SocketClient( address, securityPlan, timeoutMillis, logger ); + this.responseHandler = createResponseHandler( logger ); - private SocketConnection( BoltServerAddress address, SecurityPlan securityPlan, int timeoutMillis, Logger logger ) - { - this( new SocketClient( address, securityPlan, timeoutMillis, logger ), null, logger ); + startSocketClient(); } /** @@ -83,9 +77,13 @@ public SocketConnection( SocketClient socket, InternalServerInfo serverInfo, Log { this.socket = socket; this.serverInfo = serverInfo; - this.logger = logger; this.responseHandler = createResponseHandler( logger ); + startSocketClient(); + } + + private void startSocketClient() + { try { this.socket.start(); @@ -302,10 +300,4 @@ public BoltServerAddress boltServerAddress() { return this.serverInfo.boltServerAddress(); } - - @Override - public Logger logger() - { - return this.logger; - } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledSocketConnection.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledSocketConnection.java index 8f3c18b156..6acfa29113 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledSocketConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledSocketConnection.java @@ -20,13 +20,13 @@ import java.util.Map; +import org.neo4j.driver.internal.SessionResourcesHandler; import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.spi.Collector; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.Consumer; -import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.Value; import org.neo4j.driver.v1.exceptions.Neo4jException; import org.neo4j.driver.v1.summary.ServerInfo; @@ -58,7 +58,7 @@ public class PooledSocketConnection implements PooledConnection private final Consumer release; private boolean unrecoverableErrorsOccurred = false; - private Runnable onError = null; + private SessionResourcesHandler resourcesHandler; private final Clock clock; private long lastUsedTimestamp; @@ -196,6 +196,7 @@ public void receiveOne() public void close() { updateLastUsedTimestamp(); + resourcesHandler = null; release.accept( this ); // put the full logic of deciding whether to dispose the connection or to put it back to // the pool into the release object @@ -244,12 +245,6 @@ public BoltServerAddress boltServerAddress() return delegate.boltServerAddress(); } - @Override - public Logger logger() - { - return delegate.logger(); - } - @Override public void dispose() { @@ -268,21 +263,21 @@ private void onDelegateException( RuntimeException e ) { unrecoverableErrorsOccurred = true; } - else if( !isAckFailureMuted() ) + else if ( !isAckFailureMuted() ) { ackFailure(); } - if( onError != null ) + if ( resourcesHandler != null ) { - onError.run(); + resourcesHandler.onConnectionError( !unrecoverableErrorsOccurred ); } throw e; } @Override - public void onError( Runnable runnable ) + public void setResourcesHandler( SessionResourcesHandler resourcesHandler ) { - this.onError = runnable; + this.resourcesHandler = resourcesHandler; } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java b/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java index 20860e65d0..0901059631 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java @@ -21,7 +21,6 @@ import java.util.Map; import org.neo4j.driver.internal.net.BoltServerAddress; -import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.Value; import org.neo4j.driver.v1.summary.ServerInfo; @@ -119,9 +118,4 @@ public interface Connection extends AutoCloseable * Returns the BoltServerAddress connected to */ BoltServerAddress boltServerAddress(); - - /** - * Returns the logger of this connection - */ - Logger logger(); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionFactoryTest.java b/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionProvider.java similarity index 54% rename from driver/src/test/java/org/neo4j/driver/internal/NetworkSessionFactoryTest.java rename to driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionProvider.java index 3607040b76..0b474b5ab3 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionFactoryTest.java +++ b/driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionProvider.java @@ -16,26 +16,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.neo4j.driver.internal; +package org.neo4j.driver.internal.spi; -import org.junit.Test; +import org.neo4j.driver.v1.AccessMode; -import org.neo4j.driver.internal.spi.PooledConnection; -import org.neo4j.driver.v1.Session; - -import static org.hamcrest.Matchers.instanceOf; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; - -public class NetworkSessionFactoryTest +/** + * Interface defines a layer used by the driver to obtain connections. It is meant to be the only component that + * differs between "direct" and "routing" driver. + */ +public interface ConnectionProvider extends AutoCloseable { - @Test - public void createsNetworkSessions() - { - SessionFactory factory = new NetworkSessionFactory(); - - Session session = factory.newInstance( mock( PooledConnection.class ) ); - - assertThat( session, instanceOf( NetworkSession.class ) ); - } + /** + * Acquire new {@link PooledConnection pooled connection} for the given {@link AccessMode mode}. + * + * @param mode the access mode for the connection. + * @return free or new pooled connection. + */ + PooledConnection acquireConnection( AccessMode mode ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/spi/PooledConnection.java b/driver/src/main/java/org/neo4j/driver/internal/spi/PooledConnection.java index 8bd931d50f..3e8af208c9 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/spi/PooledConnection.java +++ b/driver/src/main/java/org/neo4j/driver/internal/spi/PooledConnection.java @@ -18,18 +18,19 @@ */ package org.neo4j.driver.internal.spi; +import org.neo4j.driver.internal.SessionResourcesHandler; import org.neo4j.driver.internal.util.Clock; public interface PooledConnection extends Connection { /** - * If there are any errors that occur on this connection, invoke the given - * runnable. This is used in the driver to clean up resources associated with + * If there are any errors that occur on this connection, notify the given handler + * about them. This is used in the driver to clean up resources associated with * the connection, like an open transaction. * - * @param runnable To be run on error. + * @param resourcesHandler To be notified on error. */ - void onError( Runnable runnable ); + void setResourcesHandler( SessionResourcesHandler resourcesHandler ); /** * Check if this connection experienced any unrecoverable errors. Connections with unrecoverable errors should be diff --git a/driver/src/test/java/org/neo4j/driver/internal/DirectConnectionProviderTest.java b/driver/src/test/java/org/neo4j/driver/internal/DirectConnectionProviderTest.java new file mode 100644 index 0000000000..ca479dfc87 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/DirectConnectionProviderTest.java @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal; + +import org.junit.Test; + +import org.neo4j.driver.internal.net.BoltServerAddress; +import org.neo4j.driver.internal.spi.ConnectionPool; +import org.neo4j.driver.internal.spi.PooledConnection; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.only; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.neo4j.driver.v1.AccessMode.READ; +import static org.neo4j.driver.v1.AccessMode.WRITE; + +public class DirectConnectionProviderTest +{ + @Test + public void acquiresConnectionsFromThePool() + { + ConnectionPool pool = mock( ConnectionPool.class ); + PooledConnection connection1 = mock( PooledConnection.class ); + PooledConnection connection2 = mock( PooledConnection.class ); + when( pool.acquire( any( BoltServerAddress.class ) ) ).thenReturn( connection1 ).thenReturn( connection2 ); + + DirectConnectionProvider provider = newConnectionProvider( pool ); + + assertSame( connection1, provider.acquireConnection( READ ) ); + assertSame( connection2, provider.acquireConnection( WRITE ) ); + } + + @Test + public void closesPool() throws Exception + { + ConnectionPool pool = mock( ConnectionPool.class ); + DirectConnectionProvider provider = newConnectionProvider( pool ); + + provider.close(); + + verify( pool, only() ).close(); + } + + @Test + public void returnsCorrectAddress() + { + BoltServerAddress address = new BoltServerAddress( "server-1", 25000 ); + + DirectConnectionProvider provider = newConnectionProvider( address ); + + assertEquals( address, provider.getAddress() ); + } + + private static DirectConnectionProvider newConnectionProvider( BoltServerAddress address ) + { + return new DirectConnectionProvider( address, mock( ConnectionPool.class ) ); + } + + private static DirectConnectionProvider newConnectionProvider( ConnectionPool pool ) + { + return new DirectConnectionProvider( BoltServerAddress.LOCAL_DEFAULT, pool ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/DirectDriverTest.java b/driver/src/test/java/org/neo4j/driver/internal/DirectDriverTest.java index 96efb0d63f..230ee2f712 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/DirectDriverTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/DirectDriverTest.java @@ -29,8 +29,11 @@ import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.util.StubServer; +import static org.hamcrest.Matchers.is; import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.assertThat; +import static org.neo4j.driver.internal.net.BoltServerAddress.LOCAL_DEFAULT; +import static org.neo4j.driver.internal.util.Matchers.directDriverWithAddress; import static org.neo4j.driver.v1.Values.parameters; import static org.neo4j.driver.v1.util.StubServer.INSECURE_CONFIG; @@ -40,14 +43,13 @@ public class DirectDriverTest public void shouldUseDefaultPortIfMissing() { // Given - URI uri = URI.create( "bolt://localhost:7687" ); + URI uri = URI.create( "bolt://localhost" ); // When - DirectDriver driver = (DirectDriver) GraphDatabase.driver( uri ); + Driver driver = GraphDatabase.driver( uri ); // Then - BoltServerAddress address = driver.server(); - assertThat( address.port(), equalTo( BoltServerAddress.DEFAULT_PORT ) ); + assertThat( driver, is( directDriverWithAddress( LOCAL_DEFAULT ) ) ); } @Test @@ -58,12 +60,10 @@ public void shouldRegisterSingleServer() BoltServerAddress address = BoltServerAddress.from( uri ); // When - DirectDriver driver = (DirectDriver) GraphDatabase.driver( uri ); + Driver driver = GraphDatabase.driver( uri ); // Then - BoltServerAddress driverAddress = driver.server(); - assertThat( driverAddress, equalTo( address )); - + assertThat( driver, is( directDriverWithAddress( address ) ) ); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java b/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java index 564d81e340..9cbc3ce130 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java @@ -28,13 +28,16 @@ import java.util.Arrays; import java.util.List; +import org.neo4j.driver.internal.cluster.LoadBalancer; import org.neo4j.driver.internal.cluster.RoutingSettings; import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.security.SecurityPlan; import org.neo4j.driver.internal.spi.ConnectionPool; +import org.neo4j.driver.internal.spi.ConnectionProvider; import org.neo4j.driver.v1.AuthToken; import org.neo4j.driver.v1.AuthTokens; import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Driver; import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertArrayEquals; @@ -43,6 +46,7 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.neo4j.driver.v1.AccessMode.READ; import static org.neo4j.driver.v1.Config.defaultConfig; @RunWith( Parameterized.class ) @@ -108,7 +112,7 @@ public void usesStandardSessionFactoryWhenNothingConfigured() factory.newInstance( uri, dummyAuthToken(), dummyRoutingSettings(), config ); - assertThat( factory.capturedSessionFactory, instanceOf( NetworkSessionFactory.class ) ); + assertThat( factory.capturedSessionFactory.newInstance( READ ), instanceOf( NetworkSession.class ) ); } @Test @@ -119,7 +123,7 @@ public void usesLeakLoggingSessionFactoryWhenConfigured() factory.newInstance( uri, dummyAuthToken(), dummyRoutingSettings(), config ); - assertThat( factory.capturedSessionFactory, instanceOf( LeakLoggingNetworkSessionFactory.class ) ); + assertThat( factory.capturedSessionFactory.newInstance( READ ), instanceOf( LeakLoggingNetworkSession.class ) ); } private static AuthToken dummyAuthToken() @@ -142,16 +146,15 @@ private static class ThrowingDriverFactory extends DriverFactory } @Override - protected DirectDriver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, - Config config, SecurityPlan securityPlan, SessionFactory sessionFactory ) + protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config, + SecurityPlan securityPlan ) { throw new UnsupportedOperationException( "Can't create direct driver" ); } @Override - protected RoutingDriver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool, - Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, - SessionFactory sessionFactory ) + protected Driver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config, + RoutingSettings routingSettings, SecurityPlan securityPlan ) { throw new UnsupportedOperationException( "Can't create routing driver" ); } @@ -168,20 +171,24 @@ private static class SessionFactoryCapturingDriverFactory extends DriverFactory SessionFactory capturedSessionFactory; @Override - protected DirectDriver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, - Config config, SecurityPlan securityPlan, SessionFactory sessionFactory ) + protected InternalDriver createDriver( Config config, SecurityPlan securityPlan, SessionFactory sessionFactory ) { - capturedSessionFactory = sessionFactory; return null; } @Override - protected RoutingDriver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool, - Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, - SessionFactory sessionFactory ) + protected LoadBalancer createLoadBalancer( BoltServerAddress address, ConnectionPool connectionPool, + Config config, RoutingSettings routingSettings ) { - capturedSessionFactory = sessionFactory; return null; } + + @Override + protected SessionFactory createSessionFactory( ConnectionProvider connectionProvider, Config config ) + { + SessionFactory sessionFactory = super.createSessionFactory( connectionProvider, config ); + capturedSessionFactory = sessionFactory; + return sessionFactory; + } } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java b/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java index 13b199cf72..9b477bff3d 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/ExplicitTransactionTest.java @@ -32,6 +32,7 @@ import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.only; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @@ -45,8 +46,8 @@ public void shouldRollbackOnImplicitFailure() throws Throwable // Given Connection conn = mock( Connection.class ); when( conn.isOpen() ).thenReturn( true ); - Runnable cleanup = mock( Runnable.class ); - ExplicitTransaction tx = new ExplicitTransaction( conn, cleanup ); + SessionResourcesHandler resourcesHandler = mock( SessionResourcesHandler.class ); + ExplicitTransaction tx = new ExplicitTransaction( conn, resourcesHandler ); // When tx.close(); @@ -59,8 +60,8 @@ public void shouldRollbackOnImplicitFailure() throws Throwable order.verify( conn ).run( "ROLLBACK", Collections.emptyMap(), Collector.NO_OP ); order.verify( conn ).pullAll( any( Collector.class ) ); order.verify( conn ).sync(); - verify( cleanup ).run(); - verifyNoMoreInteractions( conn, cleanup ); + verify( resourcesHandler, only() ).onTransactionClosed( tx ); + verifyNoMoreInteractions( conn, resourcesHandler ); } @Test @@ -69,8 +70,8 @@ public void shouldRollbackOnExplicitFailure() throws Throwable // Given Connection conn = mock( Connection.class ); when( conn.isOpen() ).thenReturn( true ); - Runnable cleanup = mock( Runnable.class ); - ExplicitTransaction tx = new ExplicitTransaction( conn, cleanup ); + SessionResourcesHandler resourcesHandler = mock( SessionResourcesHandler.class ); + ExplicitTransaction tx = new ExplicitTransaction( conn, resourcesHandler ); // When tx.failure(); @@ -85,8 +86,8 @@ public void shouldRollbackOnExplicitFailure() throws Throwable order.verify( conn ).run( "ROLLBACK", Collections.emptyMap(), Collector.NO_OP ); order.verify( conn ).pullAll( any( BookmarkCollector.class ) ); order.verify( conn ).sync(); - verify( cleanup ).run(); - verifyNoMoreInteractions( conn, cleanup ); + verify( resourcesHandler, only() ).onTransactionClosed( tx ); + verifyNoMoreInteractions( conn, resourcesHandler ); } @Test @@ -95,8 +96,8 @@ public void shouldCommitOnSuccess() throws Throwable // Given Connection conn = mock( Connection.class ); when( conn.isOpen() ).thenReturn( true ); - Runnable cleanup = mock( Runnable.class ); - ExplicitTransaction tx = new ExplicitTransaction( conn, cleanup ); + SessionResourcesHandler resourcesHandler = mock( SessionResourcesHandler.class ); + ExplicitTransaction tx = new ExplicitTransaction( conn, resourcesHandler ); // When tx.success(); @@ -111,8 +112,8 @@ public void shouldCommitOnSuccess() throws Throwable order.verify( conn ).run( "COMMIT", Collections.emptyMap(), Collector.NO_OP ); order.verify( conn ).pullAll( any( BookmarkCollector.class ) ); order.verify( conn ).sync(); - verify( cleanup ).run(); - verifyNoMoreInteractions( conn, cleanup ); + verify( resourcesHandler, only() ).onTransactionClosed( tx ); + verifyNoMoreInteractions( conn, resourcesHandler ); } @Test @@ -120,7 +121,7 @@ public void shouldOnlyQueueMessagesWhenNoBookmarkGiven() { Connection connection = mock( Connection.class ); - new ExplicitTransaction( connection, mock( Runnable.class ), null ); + new ExplicitTransaction( connection, mock( SessionResourcesHandler.class ), null ); InOrder inOrder = inOrder( connection ); inOrder.verify( connection ).run( "BEGIN", Collections.emptyMap(), Collector.NO_OP ); @@ -134,7 +135,7 @@ public void shouldSyncWhenBookmarkGiven() String bookmark = "hi, I'm bookmark"; Connection connection = mock( Connection.class ); - new ExplicitTransaction( connection, mock( Runnable.class ), bookmark ); + new ExplicitTransaction( connection, mock( SessionResourcesHandler.class ), bookmark ); Map expectedParams = Collections.singletonMap( "bookmark", value( bookmark ) ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/ExtractTest.java b/driver/src/test/java/org/neo4j/driver/internal/ExtractTest.java index f57f37c738..4c99f888fe 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/ExtractTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/ExtractTest.java @@ -170,18 +170,19 @@ public void testFields() throws Exception Connection connection = mock( Connection.class ); String statement = ""; - InternalStatementResult cursor = new InternalStatementResult( connection, null, new Statement( statement ) ); - cursor.runResponseCollector().keys( new String[]{"k1"} ); - cursor.runResponseCollector().done(); - cursor.pullAllResponseCollector().record( new Value[]{value( 42 )} ); - cursor.pullAllResponseCollector().done(); - - connection.run( statement, Values.EmptyMap.asMap( ofValue() ), cursor.runResponseCollector() ); - connection.pullAll( cursor.pullAllResponseCollector() ); + InternalStatementResult result = new InternalStatementResult( connection, SessionResourcesHandler.NO_OP, null, + new Statement( statement ) ); + result.runResponseCollector().keys( new String[]{"k1"} ); + result.runResponseCollector().done(); + result.pullAllResponseCollector().record( new Value[]{value( 42 )} ); + result.pullAllResponseCollector().done(); + + connection.run( statement, Values.EmptyMap.asMap( ofValue() ), result.runResponseCollector() ); + connection.pullAll( result.pullAllResponseCollector() ); connection.flush(); // WHEN - List> fields = Extract.fields( cursor.single(), integerExtractor() ); + List> fields = Extract.fields( result.single(), integerExtractor() ); // THEN diff --git a/driver/src/test/java/org/neo4j/driver/internal/InternalStatementResultTest.java b/driver/src/test/java/org/neo4j/driver/internal/InternalStatementResultTest.java index 17070f0c88..d8d9e9eff5 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/InternalStatementResultTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/InternalStatementResultTest.java @@ -19,16 +19,16 @@ package org.neo4j.driver.internal; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; - import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; + import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.value.NullValue; import org.neo4j.driver.v1.Record; @@ -39,9 +39,9 @@ import org.neo4j.driver.v1.util.Pair; import static java.util.Arrays.asList; - import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.collection.IsCollectionWithSize.hasSize; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -50,7 +50,7 @@ import static org.junit.Assert.fail; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; - +import static org.mockito.Mockito.verify; import static org.neo4j.driver.v1.Records.column; import static org.neo4j.driver.v1.Values.ofString; import static org.neo4j.driver.v1.Values.value; @@ -68,15 +68,15 @@ public void iterationShouldWorksAsExpected() // WHEN assertTrue( result.hasNext() ); - assertThat( values( result.next() ), equalTo( asList(value("v1-1"), value( "v2-1" )))); + assertThat( values( result.next() ), equalTo( asList( value( "v1-1" ), value( "v2-1" ) ) ) ); assertTrue( result.hasNext() ); - assertThat( values( result.next() ), equalTo( asList(value("v1-2"), value( "v2-2" )))); + assertThat( values( result.next() ), equalTo( asList( value( "v1-2" ), value( "v2-2" ) ) ) ); assertTrue( result.hasNext() ); //1 -> 2 // THEN - assertThat( values( result.next() ), equalTo( asList(value("v1-3"), value( "v2-3" )))); + assertThat( values( result.next() ), equalTo( asList( value( "v1-3" ), value( "v2-3" ) ) ) ); assertFalse( result.hasNext() ); expectedException.expect( NoSuchRecordException.class ); @@ -92,7 +92,7 @@ public void firstOfFieldNameShouldWorkAsExpected() StatementResult result = createResult( 3 ); // THEN - assertThat( result.next().get("k1"), equalTo( value("v1-1") ) ); + assertThat( result.next().get( "k1" ), equalTo( value( "v1-1" ) ) ); assertTrue( result.hasNext() ); } @@ -103,7 +103,7 @@ public void firstOfFieldIndexShouldWorkAsExpected() StatementResult result = createResult( 3 ); // THEN - assertThat( result.next().get(0), equalTo( value("v1-1") ) ); + assertThat( result.next().get( 0 ), equalTo( value( "v1-1" ) ) ); assertTrue( result.hasNext() ); } @@ -158,7 +158,7 @@ public void singleOfFieldNameShouldWorkAsExpected() StatementResult result = createResult( 1 ); // THEN - assertThat( result.single().get("k1"), equalTo( value("v1-1") ) ); + assertThat( result.single().get( "k1" ), equalTo( value( "v1-1" ) ) ); assertFalse( result.hasNext() ); } @@ -169,7 +169,7 @@ public void singleOfFieldIndexShouldWorkAsExpected() StatementResult result = createResult( 1 ); // THEN - assertThat( result.single().get(0), equalTo( value("v1-1") ) ); + assertThat( result.single().get( 0 ), equalTo( value( "v1-1" ) ) ); assertFalse( result.hasNext() ); } @@ -287,8 +287,8 @@ public void retainShouldWorkAsExpected() List records = result.list(); // THEN - assertFalse(result.hasNext()); - assertThat(records, hasSize( 3 ) ); + assertFalse( result.hasNext() ); + assertThat( records, hasSize( 3 ) ); } @Test @@ -301,8 +301,8 @@ public void retainAndMapByKeyShouldWorkAsExpected() List records = result.list( column( "k1" ) ); // THEN - assertFalse(result.hasNext()); - assertThat(records, hasSize( 3 ) ); + assertFalse( result.hasNext() ); + assertThat( records, hasSize( 3 ) ); } @Test @@ -315,8 +315,8 @@ public void retainAndMapByIndexShouldWorkAsExpected() List records = result.list( column( 0 ) ); // THEN - assertFalse(result.hasNext()); - assertThat(records, hasSize( 3 ) ); + assertFalse( result.hasNext() ); + assertThat( records, hasSize( 3 ) ); } @Test @@ -386,23 +386,103 @@ public void shouldNotPeekIntoTheFutureWhenResultIsEmpty() Record future = result.peek(); } + @Test + public void shouldNotifyResourcesHandlerWhenFetchedViaList() + { + SessionResourcesHandler resourcesHandler = mock( SessionResourcesHandler.class ); + StatementResult result = createResult( 10, resourcesHandler ); + + List records = result.list(); + assertEquals( 10, records.size() ); + + verify( resourcesHandler ).onResultConsumed(); + } + + @Test + public void shouldNotifyResourcesHandlerWhenFetchedViaSingle() + { + SessionResourcesHandler resourcesHandler = mock( SessionResourcesHandler.class ); + StatementResult result = createResult( 1, resourcesHandler ); + + Record record = result.single(); + assertEquals( "v1-1", record.get( "k1" ).asString() ); + + verify( resourcesHandler ).onResultConsumed(); + } + + @Test + public void shouldNotifyResourcesHandlerWhenFetchedViaIterator() + { + SessionResourcesHandler resourcesHandler = mock( SessionResourcesHandler.class ); + StatementResult result = createResult( 1, resourcesHandler ); + + while ( result.hasNext() ) + { + assertNotNull( result.next() ); + } + + verify( resourcesHandler ).onResultConsumed(); + } + + @Test + public void shouldNotifyResourcesHandlerWhenSummary() + { + SessionResourcesHandler resourcesHandler = mock( SessionResourcesHandler.class ); + StatementResult result = createResult( 10, resourcesHandler ); + + assertNotNull( result.summary() ); + + verify( resourcesHandler ).onResultConsumed(); + } + + @Test + public void shouldNotifyResourcesHandlerWhenConsumed() + { + SessionResourcesHandler resourcesHandler = mock( SessionResourcesHandler.class ); + StatementResult result = createResult( 5, resourcesHandler ); + + result.consume(); + + verify( resourcesHandler ).onResultConsumed(); + } + + @Test + public void shouldNotifyResourcesHandlerOnlyOnceWhenConsumed() + { + SessionResourcesHandler resourcesHandler = mock( SessionResourcesHandler.class ); + StatementResult result = createResult( 8, resourcesHandler ); + + assertEquals( 8, result.list().size() ); + assertNotNull( result.summary() ); + assertNotNull( result.consume() ); + assertNotNull( result.summary() ); + + verify( resourcesHandler ).onResultConsumed(); + } + private StatementResult createResult( int numberOfRecords ) + { + return createResult( numberOfRecords, SessionResourcesHandler.NO_OP ); + } + + private StatementResult createResult( int numberOfRecords, SessionResourcesHandler resourcesHandler ) { Connection connection = mock( Connection.class ); String statement = ""; - final InternalStatementResult cursor = new InternalStatementResult( connection, null, new Statement( statement ) ); + final InternalStatementResult result = new InternalStatementResult( connection, resourcesHandler, null, + new Statement( statement ) ); // Each time the cursor calls `recieveOne`, we'll run one of these, // to emulate how messages are handed over to the cursor final LinkedList inboundMessages = new LinkedList<>(); - inboundMessages.add( streamHeadMessage( cursor ) ); + inboundMessages.add( streamHeadMessage( result ) ); for ( int i = 1; i <= numberOfRecords; i++ ) { - inboundMessages.add( recordMessage( cursor, i ) ); + inboundMessages.add( recordMessage( result, i ) ); } - inboundMessages.add( streamTailMessage( cursor ) ); + inboundMessages.add( streamTailMessage( result ) ); doAnswer( new Answer() { @@ -412,9 +492,9 @@ public Object answer( InvocationOnMock invocationOnMock ) throws Throwable inboundMessages.poll().run(); return null; } - }).when( connection ).receiveOne(); + } ).when( connection ).receiveOne(); - return cursor; + return result; } private Runnable streamTailMessage( final InternalStatementResult cursor ) @@ -457,7 +537,7 @@ public void run() private List values( Record record ) { List result = new ArrayList<>( record.keys().size() ); - for ( Pair property : record.fields() ) + for ( Pair property : record.fields() ) { result.add( property.value() ); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionFactoryTest.java b/driver/src/test/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionFactoryTest.java deleted file mode 100644 index f8887b4d11..0000000000 --- a/driver/src/test/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionFactoryTest.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal; - -import org.junit.Test; - -import org.neo4j.driver.internal.spi.PooledConnection; -import org.neo4j.driver.v1.Logging; -import org.neo4j.driver.v1.Session; - -import static org.hamcrest.Matchers.instanceOf; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; - -public class LeakLoggingNetworkSessionFactoryTest -{ - @Test - public void createsLeakLoggingNetworkSessions() - { - SessionFactory factory = new LeakLoggingNetworkSessionFactory( mock( Logging.class ) ); - - Session session = factory.newInstance( mock( PooledConnection.class ) ); - - assertThat( session, instanceOf( LeakLoggingNetworkSession.class ) ); - } -} diff --git a/driver/src/test/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionTest.java index 68629c5e5e..f519db6bb8 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/LeakLoggingNetworkSessionTest.java @@ -22,11 +22,16 @@ import org.junit.Test; import org.junit.rules.TestName; import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.lang.reflect.Method; +import org.neo4j.driver.internal.spi.ConnectionProvider; import org.neo4j.driver.internal.spi.PooledConnection; +import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Logger; +import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.Session; import static org.hamcrest.Matchers.containsString; @@ -34,10 +39,12 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; +import static org.neo4j.driver.v1.AccessMode.READ; public class LeakLoggingNetworkSessionTest { @@ -47,24 +54,30 @@ public class LeakLoggingNetworkSessionTest @Test public void logsNothingDuringFinalizationIfClosed() throws Exception { - Logger logger = mock( Logger.class ); - Session session = new LeakLoggingNetworkSession( connectionMock( false ), logger ); + Logging logging = mock( Logging.class ); + Logger log = mock( Logger.class ); + when( logging.getLog( anyString() ) ).thenReturn( log ); + LeakLoggingNetworkSession session = newSession( logging, false ); finalize( session ); - verifyZeroInteractions( logger ); + verify( log, never() ).error( anyString(), any( Throwable.class ) ); } @Test public void logsMessageWithStacktraceDuringFinalizationIfLeaked() throws Exception { - Logger logger = mock( Logger.class ); - Session session = new LeakLoggingNetworkSession( connectionMock( true ), logger ); + Logging logging = mock( Logging.class ); + Logger log = mock( Logger.class ); + when( logging.getLog( anyString() ) ).thenReturn( log ); + LeakLoggingNetworkSession session = newSession( logging, true ); + // begin transaction to make session obtain a connection + session.beginTransaction(); finalize( session ); ArgumentCaptor messageCaptor = ArgumentCaptor.forClass( String.class ); - verify( logger ).error( messageCaptor.capture(), any( Throwable.class ) ); + verify( log ).error( messageCaptor.capture(), any( Throwable.class ) ); assertEquals( 1, messageCaptor.getAllValues().size() ); @@ -83,6 +96,25 @@ private static void finalize( Session session ) throws Exception finalizeMethod.invoke( session ); } + private static LeakLoggingNetworkSession newSession( Logging logging, boolean openConnection ) + { + return new LeakLoggingNetworkSession( connectionProviderMock( openConnection ), READ, logging ); + } + + private static ConnectionProvider connectionProviderMock( final boolean openConnection ) + { + ConnectionProvider provider = mock( ConnectionProvider.class ); + when( provider.acquireConnection( any( AccessMode.class ) ) ).thenAnswer( new Answer() + { + @Override + public PooledConnection answer( InvocationOnMock invocation ) throws Throwable + { + return connectionMock( openConnection ); + } + } ); + return provider; + } + private static PooledConnection connectionMock( boolean open ) { PooledConnection connection = mock( PooledConnection.class ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java index 40e868b295..61b2b7ef35 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/NetworkSessionTest.java @@ -18,68 +18,99 @@ */ package org.neo4j.driver.internal; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.mockito.InOrder; +import java.util.Map; + +import org.neo4j.driver.internal.spi.Collector; +import org.neo4j.driver.internal.spi.ConnectionProvider; import org.neo4j.driver.internal.spi.PooledConnection; +import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Transaction; +import org.neo4j.driver.v1.Value; import org.neo4j.driver.v1.exceptions.ClientException; -import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; -import static junit.framework.Assert.fail; -import static junit.framework.TestCase.assertNotNull; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.RETURNS_MOCKS; +import static org.mockito.Mockito.anyMapOf; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; +import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; +import static org.neo4j.driver.v1.AccessMode.READ; public class NetworkSessionTest { @Rule public ExpectedException exception = ExpectedException.none(); - private final PooledConnection mock = mock( PooledConnection.class ); - private final NetworkSession sess = new NetworkSession( mock ); + private PooledConnection connection; + private NetworkSession session; + + @Before + public void setUp() throws Exception + { + connection = mock( PooledConnection.class ); + ConnectionProvider connectionProvider = mock( ConnectionProvider.class ); + when( connectionProvider.acquireConnection( any( AccessMode.class ) ) ).thenReturn( connection ); + session = newSession( connectionProvider, READ ); + } @Test public void shouldSendAllOnRun() throws Throwable { // Given - when( mock.isOpen() ).thenReturn( true ); - NetworkSession sess = new NetworkSession( mock ); + when( connection.isOpen() ).thenReturn( true ); // When - sess.run( "whatever" ); + session.run( "whatever" ); // Then - verify( mock ).flush(); + verify( connection ).flush(); } @Test public void shouldNotAllowNewTxWhileOneIsRunning() throws Throwable { // Given - when( mock.isOpen() ).thenReturn( true ); - sess.beginTransaction(); + when( connection.isOpen() ).thenReturn( true ); + session.beginTransaction(); // Expect exception.expect( ClientException.class ); // When - sess.beginTransaction(); + session.beginTransaction(); } @Test public void shouldBeAbleToOpenTxAfterPreviousIsClosed() throws Throwable { // Given - when( mock.isOpen() ).thenReturn( true ); - sess.beginTransaction().close(); + when( connection.isOpen() ).thenReturn( true ); + session.beginTransaction().close(); // When - Transaction tx = sess.beginTransaction(); + Transaction tx = session.beginTransaction(); // Then we should've gotten a transaction object back assertNotNull( tx ); @@ -89,68 +120,43 @@ public void shouldBeAbleToOpenTxAfterPreviousIsClosed() throws Throwable public void shouldNotBeAbleToUseSessionWhileOngoingTransaction() throws Throwable { // Given - when( mock.isOpen() ).thenReturn( true ); - sess.beginTransaction(); + when( connection.isOpen() ).thenReturn( true ); + session.beginTransaction(); // Expect exception.expect( ClientException.class ); // When - sess.run( "whatever" ); + session.run( "whatever" ); } @Test public void shouldBeAbleToUseSessionAgainWhenTransactionIsClosed() throws Throwable { // Given - when( mock.isOpen() ).thenReturn( true ); - sess.beginTransaction().close(); + when( connection.isOpen() ).thenReturn( true ); + session.beginTransaction().close(); // When - sess.run( "whatever" ); + session.run( "whatever" ); // Then - verify( mock ).flush(); - } - - @Test - public void shouldNotAllowMoreStatementsInSessionWhileConnectionClosed() throws Throwable - { - // Given - when( mock.isOpen() ).thenReturn( false ); - - // Expect - exception.expect( ServiceUnavailableException.class ); - - // When - sess.run( "whatever" ); - } - - @Test - public void shouldNotAllowMoreTransactionsInSessionWhileConnectionClosed() throws Throwable - { - // Given - when( mock.isOpen() ).thenReturn( false ); - - // Expect - exception.expect( ServiceUnavailableException.class ); - - // When - sess.beginTransaction(); + verify( connection ).flush(); } @Test public void shouldGetExceptionIfTryingToCloseSessionMoreThanOnce() throws Throwable { // Given - NetworkSession sess = new NetworkSession( mock( PooledConnection.class ) ); + ConnectionProvider connectionProvider = mock( ConnectionProvider.class, RETURNS_MOCKS ); + NetworkSession sess = newSession( connectionProvider, READ ); try { sess.close(); } - catch( Exception e ) + catch ( Exception e ) { - fail("Should not get any problem to close first time"); + fail( "Should not get any problem to close first time" ); } // When @@ -159,9 +165,364 @@ public void shouldGetExceptionIfTryingToCloseSessionMoreThanOnce() throws Throwa sess.close(); fail( "Should have received an error to close second time" ); } - catch( Exception e ) + catch ( Exception e ) + { + assertThat( e.getMessage(), equalTo( "This session has already been closed." ) ); + } + } + + @Test + public void runThrowsWhenSessionIsClosed() + { + ConnectionProvider connectionProvider = mock( ConnectionProvider.class, RETURNS_MOCKS ); + NetworkSession session = newSession( connectionProvider, READ ); + + session.close(); + + try + { + session.run( "CREATE ()" ); + fail( "Exception expected" ); + } + catch ( Exception e ) { - assertThat( e.getMessage(), equalTo("This session has already been closed." )); + assertThat( e, instanceOf( ClientException.class ) ); + assertThat( e.getMessage(), containsString( "session is already closed" ) ); } } + + @Test + public void acquiresNewConnectionForRun() + { + ConnectionProvider connectionProvider = mock( ConnectionProvider.class ); + PooledConnection connection = mock( PooledConnection.class ); + when( connectionProvider.acquireConnection( READ ) ).thenReturn( connection ); + NetworkSession session = newSession( connectionProvider, READ ); + + session.run( "RETURN 1" ); + + verify( connectionProvider ).acquireConnection( READ ); + verify( connection ).run( eq( "RETURN 1" ), anyParams(), any( Collector.class ) ); + } + + @Test + public void syncsAndClosesPreviousConnectionForRun() + { + ConnectionProvider connectionProvider = mock( ConnectionProvider.class ); + PooledConnection connection1 = mock( PooledConnection.class ); + when( connection1.isOpen() ).thenReturn( true ); + PooledConnection connection2 = mock( PooledConnection.class ); + when( connection2.isOpen() ).thenReturn( true ); + when( connectionProvider.acquireConnection( READ ) ).thenReturn( connection1 ).thenReturn( connection2 ); + NetworkSession session = newSession( connectionProvider, READ ); + + session.run( "RETURN 1" ); + verify( connectionProvider ).acquireConnection( READ ); + verify( connection1 ).run( eq( "RETURN 1" ), anyParams(), any( Collector.class ) ); + + session.run( "RETURN 2" ); + verify( connectionProvider, times( 2 ) ).acquireConnection( READ ); + verify( connection2 ).run( eq( "RETURN 2" ), anyParams(), any( Collector.class ) ); + + InOrder inOrder = inOrder( connection1 ); + inOrder.verify( connection1 ).sync(); + inOrder.verify( connection1 ).close(); + } + + @Test + public void closesPreviousBrokenConnectionForRun() + { + ConnectionProvider connectionProvider = mock( ConnectionProvider.class ); + PooledConnection connection1 = mock( PooledConnection.class ); + PooledConnection connection2 = mock( PooledConnection.class ); + when( connectionProvider.acquireConnection( READ ) ).thenReturn( connection1 ).thenReturn( connection2 ); + NetworkSession session = newSession( connectionProvider, READ ); + + session.run( "RETURN 1" ); + verify( connectionProvider ).acquireConnection( READ ); + verify( connection1 ).run( eq( "RETURN 1" ), anyParams(), any( Collector.class ) ); + + session.run( "RETURN 2" ); + verify( connectionProvider, times( 2 ) ).acquireConnection( READ ); + verify( connection2 ).run( eq( "RETURN 2" ), anyParams(), any( Collector.class ) ); + + verify( connection1, never() ).sync(); + verify( connection1 ).close(); + } + + @Test + public void closesAndSyncOpenConnectionUsedForRunWhenSessionIsClosed() + { + ConnectionProvider connectionProvider = mock( ConnectionProvider.class ); + PooledConnection connection = mock( PooledConnection.class ); + when( connection.isOpen() ).thenReturn( true ); + when( connectionProvider.acquireConnection( READ ) ).thenReturn( connection ); + NetworkSession session = newSession( connectionProvider, READ ); + + session.run( "RETURN 1" ); + verify( connectionProvider ).acquireConnection( READ ); + verify( connection ).run( eq( "RETURN 1" ), anyParams(), any( Collector.class ) ); + + session.close(); + + verify( connection ).sync(); + verify( connection ).close(); + } + + @Test + public void closesClosedConnectionUsedForRunWhenSessionIsClosed() + { + ConnectionProvider connectionProvider = mock( ConnectionProvider.class ); + PooledConnection connection = mock( PooledConnection.class ); + when( connectionProvider.acquireConnection( READ ) ).thenReturn( connection ); + NetworkSession session = newSession( connectionProvider, READ ); + + session.run( "RETURN 1" ); + verify( connectionProvider ).acquireConnection( READ ); + verify( connection ).run( eq( "RETURN 1" ), anyParams(), any( Collector.class ) ); + + session.close(); + + verify( connection, never() ).sync(); + verify( connection ).close(); + } + + @Test + public void resetDoesNothingWhenNoTransactionAndNoConnection() + { + ConnectionProvider connectionProvider = mock( ConnectionProvider.class ); + NetworkSession session = newSession( connectionProvider, READ ); + + session.reset(); + + verify( connectionProvider, never() ).acquireConnection( any( AccessMode.class ) ); + } + + @Test + public void closeWithoutConnection() + { + ConnectionProvider connectionProvider = mock( ConnectionProvider.class ); + NetworkSession session = newSession( connectionProvider, READ ); + + session.close(); + + verify( connectionProvider, never() ).acquireConnection( any( AccessMode.class ) ); + } + + @Test + public void acquiresNewConnectionForBeginTx() + { + ConnectionProvider connectionProvider = mock( ConnectionProvider.class ); + PooledConnection connection = mock( PooledConnection.class ); + when( connectionProvider.acquireConnection( READ ) ).thenReturn( connection ); + NetworkSession session = newSession( connectionProvider, READ ); + + session.beginTransaction(); + verify( connectionProvider ).acquireConnection( READ ); + } + + @Test + public void closesPreviousConnectionForBeginTx() + { + ConnectionProvider connectionProvider = mock( ConnectionProvider.class ); + PooledConnection connection1 = mock( PooledConnection.class ); + PooledConnection connection2 = mock( PooledConnection.class ); + when( connectionProvider.acquireConnection( READ ) ).thenReturn( connection1 ).thenReturn( connection2 ); + NetworkSession session = newSession( connectionProvider, READ ); + + session.run( "RETURN 1" ); + verify( connectionProvider ).acquireConnection( READ ); + verify( connection1 ).run( eq( "RETURN 1" ), anyParams(), any( Collector.class ) ); + + session.beginTransaction(); + verify( connection1 ).close(); + verify( connectionProvider, times( 2 ) ).acquireConnection( READ ); + } + + @Test + public void updatesBookmarkWhenTxIsClosed() + { + ConnectionProvider connectionProvider = mock( ConnectionProvider.class ); + PooledConnection connection = mock( PooledConnection.class ); + when( connection.isOpen() ).thenReturn( true ); + when( connectionProvider.acquireConnection( READ ) ).thenReturn( connection ); + NetworkSession session = newSession( connectionProvider, READ ); + + Transaction tx = session.beginTransaction(); + setBookmark( tx, "TheBookmark" ); + + assertNull( session.lastBookmark() ); + + tx.close(); + assertEquals( "TheBookmark", session.lastBookmark() ); + } + + @Test + public void closesConnectionWhenTxIsClosed() + { + ConnectionProvider connectionProvider = mock( ConnectionProvider.class ); + PooledConnection connection = mock( PooledConnection.class ); + when( connection.isOpen() ).thenReturn( true ); + when( connectionProvider.acquireConnection( READ ) ).thenReturn( connection ); + NetworkSession session = newSession( connectionProvider, READ ); + + Transaction tx = session.beginTransaction(); + tx.run( "RETURN 1" ); + + verify( connectionProvider ).acquireConnection( READ ); + verify( connection ).run( eq( "RETURN 1" ), anyParams(), any( Collector.class ) ); + + tx.close(); + verify( connection ).sync(); + verify( connection ).close(); + } + + @Test + public void ignoresWronglyClosedTx() + { + ConnectionProvider connectionProvider = mock( ConnectionProvider.class ); + PooledConnection connection1 = mock( PooledConnection.class ); + PooledConnection connection2 = mock( PooledConnection.class ); + when( connection1.isOpen() ).thenReturn( true ); + when( connection2.isOpen() ).thenReturn( true ); + when( connectionProvider.acquireConnection( READ ) ).thenReturn( connection1 ).thenReturn( connection2 ); + NetworkSession session = newSession( connectionProvider, READ ); + + Transaction tx1 = session.beginTransaction(); + tx1.close(); + + Transaction tx2 = session.beginTransaction(); + tx2.close(); + + tx1.close(); + + verify( connection1 ).sync(); + verify( connection1 ).close(); + + verify( connection2 ).sync(); + verify( connection2 ).close(); + } + + @Test + public void ignoresWronglyClosedTxWhenAnotherTxInProgress() + { + ConnectionProvider connectionProvider = mock( ConnectionProvider.class ); + PooledConnection connection1 = mock( PooledConnection.class ); + PooledConnection connection2 = mock( PooledConnection.class ); + when( connection1.isOpen() ).thenReturn( true ); + when( connection2.isOpen() ).thenReturn( true ); + when( connectionProvider.acquireConnection( READ ) ).thenReturn( connection1 ).thenReturn( connection2 ); + NetworkSession session = newSession( connectionProvider, READ ); + + Transaction tx1 = session.beginTransaction(); + tx1.close(); + + Transaction tx2 = session.beginTransaction(); + tx1.close(); + tx2.close(); + + verify( connection1 ).sync(); + verify( connection1 ).close(); + + verify( connection2 ).sync(); + verify( connection2 ).close(); + } + + @Test + public void transactionClosedDoesNothingWhenNoTx() + { + ConnectionProvider connectionProvider = mock( ConnectionProvider.class ); + PooledConnection connection = mock( PooledConnection.class ); + when( connectionProvider.acquireConnection( READ ) ).thenReturn( connection ); + NetworkSession session = newSession( connectionProvider, READ ); + + session.onTransactionClosed( mock( ExplicitTransaction.class ) ); + + verifyZeroInteractions( connection ); + } + + @Test + public void transactionClosedIgnoresWrongTx() + { + ConnectionProvider connectionProvider = mock( ConnectionProvider.class ); + PooledConnection connection = mock( PooledConnection.class ); + when( connectionProvider.acquireConnection( READ ) ).thenReturn( connection ); + NetworkSession session = newSession( connectionProvider, READ ); + + session.beginTransaction(); + verify( connectionProvider ).acquireConnection( READ ); + + ExplicitTransaction wrongTx = mock( ExplicitTransaction.class ); + session.onTransactionClosed( wrongTx ); + + verify( connection, never() ).close(); + } + + @Test + public void markTxAsFailedOnRecoverableConnectionError() + { + ConnectionProvider connectionProvider = mock( ConnectionProvider.class ); + PooledConnection connection = mock( PooledConnection.class ); + when( connection.isOpen() ).thenReturn( true ); + when( connectionProvider.acquireConnection( READ ) ).thenReturn( connection ); + NetworkSession session = newSession( connectionProvider, READ ); + + Transaction tx = session.beginTransaction(); + assertTrue( tx.isOpen() ); + + session.onConnectionError( true ); + + assertFalse( tx.isOpen() ); + } + + @Test + public void markTxToCloseOnUnrecoverableConnectionError() + { + ConnectionProvider connectionProvider = mock( ConnectionProvider.class ); + PooledConnection connection = mock( PooledConnection.class ); + when( connection.isOpen() ).thenReturn( true ); + when( connectionProvider.acquireConnection( READ ) ).thenReturn( connection ); + NetworkSession session = newSession( connectionProvider, READ ); + + Transaction tx = session.beginTransaction(); + assertTrue( tx.isOpen() ); + + session.onConnectionError( false ); + + assertFalse( tx.isOpen() ); + } + + @Test + public void closesConnectionWhenResultIsBuffered() + { + ConnectionProvider connectionProvider = mock( ConnectionProvider.class ); + PooledConnection connection = mock( PooledConnection.class ); + when( connection.isOpen() ).thenReturn( true ); + when( connectionProvider.acquireConnection( READ ) ).thenReturn( connection ); + NetworkSession session = newSession( connectionProvider, READ ); + + session.run( "RETURN 1" ); + verify( connectionProvider ).acquireConnection( READ ); + verify( connection ).run( eq( "RETURN 1" ), anyParams(), any( Collector.class ) ); + + session.onResultConsumed(); + + verify( connection, never() ).sync(); + verify( connection ).close(); + } + + private static NetworkSession newSession( ConnectionProvider connectionProvider, AccessMode mode ) + { + return new NetworkSession( connectionProvider, mode, DEV_NULL_LOGGING ); + } + + private static Map anyParams() + { + return anyMapOf( String.class, Value.class ); + } + + private static void setBookmark( Transaction tx, String bookmark ) + { + ((ExplicitTransaction) tx).setBookmark( bookmark ); + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java b/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java index b92823e486..d058d8ffe3 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java @@ -31,6 +31,7 @@ import org.neo4j.driver.internal.logging.ConsoleLogging; import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.GraphDatabase; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Session; @@ -62,7 +63,7 @@ public void shouldHandleAcquireReadSession() throws IOException, InterruptedExce //START a read server StubServer readServer = StubServer.start( "read_server.script", 9005 ); URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); - try ( RoutingDriver driver = (RoutingDriver) GraphDatabase.driver( uri, config ); + try ( Driver driver = GraphDatabase.driver( uri, config ); Session session = driver.session( AccessMode.READ ) ) { List result = session.run( "MATCH (n) RETURN n.name" ).list( new Function() @@ -92,7 +93,7 @@ public void shouldHandleAcquireReadSessionPlusTransaction() //START a read server StubServer readServer = StubServer.start( "read_server.script", 9005 ); URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); - try ( RoutingDriver driver = (RoutingDriver) GraphDatabase.driver( uri, config ); + try ( Driver driver = GraphDatabase.driver( uri, config ); Session session = driver.session( AccessMode.READ ); Transaction tx = session.beginTransaction() ) { @@ -123,7 +124,7 @@ public void shouldRoundRobinReadServers() throws IOException, InterruptedExcepti StubServer readServer1 = StubServer.start( "read_server.script", 9005 ); StubServer readServer2 = StubServer.start( "read_server.script", 9006 ); URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); - try ( RoutingDriver driver = (RoutingDriver) GraphDatabase.driver( uri, config ) ) + try ( Driver driver = GraphDatabase.driver( uri, config ) ) { // Run twice, one on each read server for ( int i = 0; i < 2; i++ ) @@ -158,7 +159,7 @@ public void shouldRoundRobinReadServersWhenUsingTransaction() StubServer readServer1 = StubServer.start( "read_server.script", 9005 ); StubServer readServer2 = StubServer.start( "read_server.script", 9006 ); URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); - try ( RoutingDriver driver = (RoutingDriver) GraphDatabase.driver( uri, config ) ) + try ( Driver driver = GraphDatabase.driver( uri, config ) ) { // Run twice, one on each read server for ( int i = 0; i < 2; i++ ) @@ -199,7 +200,7 @@ public void shouldThrowSessionExpiredIfReadServerDisappears() exception.expect( SessionExpiredException.class ); exception.expectMessage( "Server at 127.0.0.1:9005 is no longer available" ); - try ( RoutingDriver driver = (RoutingDriver) GraphDatabase.driver( uri, config ); + try ( Driver driver = GraphDatabase.driver( uri, config ); Session session = driver.session( AccessMode.READ ) ) { session.run( "MATCH (n) RETURN n.name" ); @@ -225,7 +226,7 @@ public void shouldThrowSessionExpiredIfReadServerDisappearsWhenUsingTransaction( exception.expect( SessionExpiredException.class ); exception.expectMessage( "Server at 127.0.0.1:9005 is no longer available" ); - try ( RoutingDriver driver = (RoutingDriver) GraphDatabase.driver( uri, config ); + try ( Driver driver = GraphDatabase.driver( uri, config ); Session session = driver.session( AccessMode.READ ); Transaction tx = session.beginTransaction() ) { @@ -253,7 +254,7 @@ public void shouldThrowSessionExpiredIfWriteServerDisappears() exception.expect( SessionExpiredException.class ); //exception.expectMessage( "Server at 127.0.0.1:9006 is no longer available" ); - try ( RoutingDriver driver = (RoutingDriver) GraphDatabase.driver( uri, config ); + try ( Driver driver = GraphDatabase.driver( uri, config ); Session session = driver.session( AccessMode.WRITE ) ) { session.run( "MATCH (n) RETURN n.name" ).consume(); @@ -278,7 +279,7 @@ public void shouldThrowSessionExpiredIfWriteServerDisappearsWhenUsingTransaction //Expect exception.expect( SessionExpiredException.class ); - try ( RoutingDriver driver = (RoutingDriver) GraphDatabase.driver( uri, config ); + try ( Driver driver = GraphDatabase.driver( uri, config ); Session session = driver.session( AccessMode.WRITE ); Transaction tx = session.beginTransaction() ) { @@ -300,7 +301,7 @@ public void shouldHandleAcquireWriteSession() throws IOException, InterruptedExc //START a write server StubServer writeServer = StubServer.start( "write_server.script", 9007 ); URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); - try ( RoutingDriver driver = (RoutingDriver) GraphDatabase.driver( uri, config ); + try ( Driver driver = GraphDatabase.driver( uri, config ); Session session = driver.session( AccessMode.WRITE ) ) { session.run( "CREATE (n {name:'Bob'})" ); @@ -320,7 +321,7 @@ public void shouldHandleAcquireWriteSessionAndTransaction() //START a write server StubServer writeServer = StubServer.start( "write_server.script", 9007 ); URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); - try ( RoutingDriver driver = (RoutingDriver) GraphDatabase.driver( uri, config ); + try ( Driver driver = GraphDatabase.driver( uri, config ); Session session = driver.session( AccessMode.WRITE ); Transaction tx = session.beginTransaction() ) { @@ -342,7 +343,7 @@ public void shouldRoundRobinWriteSessions() throws IOException, InterruptedExcep StubServer writeServer1 = StubServer.start( "write_server.script", 9007 ); StubServer writeServer2 = StubServer.start( "write_server.script", 9008 ); URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); - try ( RoutingDriver driver = (RoutingDriver) GraphDatabase.driver( uri, config ) ) + try ( Driver driver = GraphDatabase.driver( uri, config ) ) { for ( int i = 0; i < 2; i++ ) { @@ -368,7 +369,7 @@ public void shouldRoundRobinWriteSessionsInTransaction() throws IOException, Int StubServer writeServer1 = StubServer.start( "write_server.script", 9007 ); StubServer writeServer2 = StubServer.start( "write_server.script", 9008 ); URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); - try ( RoutingDriver driver = (RoutingDriver) GraphDatabase.driver( uri, config ) ) + try ( Driver driver = GraphDatabase.driver( uri, config ) ) { for ( int i = 0; i < 2; i++ ) { @@ -424,7 +425,7 @@ public void shouldHandleLeaderSwitchWhenWriting() //START a write server that doesn't accept writes StubServer.start( "not_able_to_write_server.script", 9007 ); URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); - RoutingDriver driver = (RoutingDriver) GraphDatabase.driver( uri, config ); + Driver driver = GraphDatabase.driver( uri, config ); boolean failed = false; try ( Session session = driver.session( AccessMode.WRITE ) ) { @@ -452,7 +453,7 @@ public void shouldHandleLeaderSwitchWhenWritingWithoutConsuming() //START a write server that doesn't accept writes StubServer.start( "not_able_to_write_server.script", 9007 ); URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); - RoutingDriver driver = (RoutingDriver) GraphDatabase.driver( uri, config ); + Driver driver = GraphDatabase.driver( uri, config ); boolean failed = false; try ( Session session = driver.session( AccessMode.WRITE ) ) { @@ -480,7 +481,7 @@ public void shouldHandleLeaderSwitchWhenWritingInTransaction() //START a write server that doesn't accept writes StubServer.start( "not_able_to_write_server.script", 9007 ); URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); - RoutingDriver driver = (RoutingDriver) GraphDatabase.driver( uri, config ); + Driver driver = GraphDatabase.driver( uri, config ); boolean failed = false; try ( Session session = driver.session( AccessMode.WRITE ); Transaction tx = session.beginTransaction() ) @@ -509,7 +510,7 @@ public void shouldHandleLeaderSwitchWhenWritingInTransactionWithoutConsuming() //START a write server that doesn't accept writes StubServer.start( "not_able_to_write_server.script", 9007 ); URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); - RoutingDriver driver = (RoutingDriver) GraphDatabase.driver( uri, config ); + Driver driver = GraphDatabase.driver( uri, config ); boolean failed = false; try ( Session session = driver.session( AccessMode.WRITE ); Transaction tx = session.beginTransaction() ) diff --git a/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java b/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java index 366e9fbdd2..eecf7567fe 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java @@ -29,14 +29,17 @@ import java.util.Collections; import java.util.Map; +import org.neo4j.driver.internal.cluster.LoadBalancer; import org.neo4j.driver.internal.cluster.RoutingSettings; import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.spi.Collector; import org.neo4j.driver.internal.spi.ConnectionPool; +import org.neo4j.driver.internal.spi.ConnectionProvider; import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.internal.util.FakeClock; import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Driver; import org.neo4j.driver.v1.EventLogger; import org.neo4j.driver.v1.GraphDatabase; import org.neo4j.driver.v1.Logging; @@ -59,6 +62,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.neo4j.driver.internal.cluster.ClusterCompositionProviderTest.serverInfo; +import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; import static org.neo4j.driver.internal.security.SecurityPlan.insecure; import static org.neo4j.driver.v1.Values.value; @@ -93,7 +97,7 @@ public void shouldDiscoveryOnInitialization() public void shouldRediscoveryIfNoWritersProvided() { // Given - RoutingDriver routingDriver = driverWithPool( pool( + Driver driver = driverWithPool( pool( withServers( 10, serverInfo( "ROUTE", "localhost:1111" ), serverInfo( "WRITE" ), serverInfo( "READ", "localhost:5555" ) ), @@ -102,7 +106,7 @@ public void shouldRediscoveryIfNoWritersProvided() serverInfo( "WRITE", "localhost:3333" ) ) ) ); // When - NetworkSessionWithAddress writing = (NetworkSessionWithAddress) routingDriver.session( AccessMode.WRITE ); + NetworkSessionWithAddress writing = (NetworkSessionWithAddress) driver.session( AccessMode.WRITE ); // Then assertEquals( boltAddress( "localhost", 3333 ), writing.address ); @@ -112,7 +116,7 @@ public void shouldRediscoveryIfNoWritersProvided() public void shouldNotRediscoveryOnSessionAcquisitionIfNotNecessary() { // Given - RoutingDriver routingDriver = driverWithPool( pool( + Driver driver = driverWithPool( pool( withServers( 10, serverInfo( "ROUTE", "localhost:1111", "localhost:1112", "localhost:1113" ), serverInfo( "READ", "localhost:2222" ), serverInfo( "WRITE", "localhost:3333" ) ), @@ -121,8 +125,8 @@ public void shouldNotRediscoveryOnSessionAcquisitionIfNotNecessary() serverInfo( "WRITE", "localhost:5555" ) ) ) ); // When - NetworkSessionWithAddress writing = (NetworkSessionWithAddress) routingDriver.session( AccessMode.WRITE ); - NetworkSessionWithAddress reading = (NetworkSessionWithAddress) routingDriver.session( AccessMode.READ ); + NetworkSessionWithAddress writing = (NetworkSessionWithAddress) driver.session( AccessMode.WRITE ); + NetworkSessionWithAddress reading = (NetworkSessionWithAddress) driver.session( AccessMode.READ ); // Then assertEquals( boltAddress( "localhost", 3333 ), writing.address ); @@ -204,11 +208,11 @@ public void shouldForgetServersOnRediscovery() serverInfo( "READ", "localhost:2222" ), serverInfo( "WRITE", "localhost:3333" ) ) ); - RoutingDriver routingDriver = driverWithPool( pool ); + Driver driver = driverWithPool( pool ); // When - NetworkSessionWithAddress write1 = (NetworkSessionWithAddress) routingDriver.session( AccessMode.WRITE ); - NetworkSessionWithAddress write2 = (NetworkSessionWithAddress) routingDriver.session( AccessMode.WRITE ); + NetworkSessionWithAddress write1 = (NetworkSessionWithAddress) driver.session( AccessMode.WRITE ); + NetworkSessionWithAddress write2 = (NetworkSessionWithAddress) driver.session( AccessMode.WRITE ); // Then assertEquals( boltAddress( "localhost", 3333 ), write1.address ); @@ -219,7 +223,7 @@ public void shouldForgetServersOnRediscovery() public void shouldRediscoverOnTimeout() { // Given - RoutingDriver routingDriver = driverWithPool( pool( + Driver driver = driverWithPool( pool( withServers( 10, serverInfo( "ROUTE", "localhost:1111", "localhost:1112", "localhost:1113" ), serverInfo( "READ", "localhost:2222" ), serverInfo( "WRITE", "localhost:3333" ) ), @@ -230,8 +234,8 @@ public void shouldRediscoverOnTimeout() clock.progress( 11_000 ); // When - NetworkSessionWithAddress writing = (NetworkSessionWithAddress) routingDriver.session( AccessMode.WRITE ); - NetworkSessionWithAddress reading = (NetworkSessionWithAddress) routingDriver.session( AccessMode.READ ); + NetworkSessionWithAddress writing = (NetworkSessionWithAddress) driver.session( AccessMode.WRITE ); + NetworkSessionWithAddress reading = (NetworkSessionWithAddress) driver.session( AccessMode.READ ); // Then assertEquals( boltAddress( "localhost", 8888 ), writing.address ); @@ -242,7 +246,7 @@ public void shouldRediscoverOnTimeout() public void shouldNotRediscoverWhenNoTimeout() { // Given - RoutingDriver routingDriver = driverWithPool( pool( + Driver driver = driverWithPool( pool( withServers( 10, serverInfo( "ROUTE", "localhost:1111", "localhost:1112", "localhost:1113" ), serverInfo( "READ", "localhost:2222" ), serverInfo( "WRITE", "localhost:3333" ) ), @@ -252,8 +256,8 @@ public void shouldNotRediscoverWhenNoTimeout() clock.progress( 9900 ); // When - NetworkSessionWithAddress writer = (NetworkSessionWithAddress) routingDriver.session( AccessMode.WRITE ); - NetworkSessionWithAddress reader = (NetworkSessionWithAddress) routingDriver.session( AccessMode.READ ); + NetworkSessionWithAddress writer = (NetworkSessionWithAddress) driver.session( AccessMode.WRITE ); + NetworkSessionWithAddress reader = (NetworkSessionWithAddress) driver.session( AccessMode.READ ); // Then assertEquals( boltAddress( "localhost", 2222 ), reader.address ); @@ -264,17 +268,18 @@ public void shouldNotRediscoverWhenNoTimeout() public void shouldRoundRobinAmongReadServers() { // Given - RoutingDriver routingDriver = driverWithServers( 60, serverInfo( "ROUTE", "localhost:1111", "localhost:1112" ), + Driver driver = driverWithServers( 60, + serverInfo( "ROUTE", "localhost:1111", "localhost:1112" ), serverInfo( "READ", "localhost:2222", "localhost:2223", "localhost:2224" ), serverInfo( "WRITE", "localhost:3333" ) ); // When - NetworkSessionWithAddress read1 = (NetworkSessionWithAddress) routingDriver.session( AccessMode.READ ); - NetworkSessionWithAddress read2 = (NetworkSessionWithAddress) routingDriver.session( AccessMode.READ ); - NetworkSessionWithAddress read3 = (NetworkSessionWithAddress) routingDriver.session( AccessMode.READ ); - NetworkSessionWithAddress read4 = (NetworkSessionWithAddress) routingDriver.session( AccessMode.READ ); - NetworkSessionWithAddress read5 = (NetworkSessionWithAddress) routingDriver.session( AccessMode.READ ); - NetworkSessionWithAddress read6 = (NetworkSessionWithAddress) routingDriver.session( AccessMode.READ ); + NetworkSessionWithAddress read1 = (NetworkSessionWithAddress) driver.session( AccessMode.READ ); + NetworkSessionWithAddress read2 = (NetworkSessionWithAddress) driver.session( AccessMode.READ ); + NetworkSessionWithAddress read3 = (NetworkSessionWithAddress) driver.session( AccessMode.READ ); + NetworkSessionWithAddress read4 = (NetworkSessionWithAddress) driver.session( AccessMode.READ ); + NetworkSessionWithAddress read5 = (NetworkSessionWithAddress) driver.session( AccessMode.READ ); + NetworkSessionWithAddress read6 = (NetworkSessionWithAddress) driver.session( AccessMode.READ ); // Then assertEquals( read1.address, read4.address ); @@ -289,17 +294,17 @@ public void shouldRoundRobinAmongReadServers() public void shouldRoundRobinAmongWriteServers() { // Given - RoutingDriver routingDriver = driverWithServers( 60, serverInfo( "ROUTE", "localhost:1111", "localhost:1112" ), + Driver driver = driverWithServers( 60, serverInfo( "ROUTE", "localhost:1111", "localhost:1112" ), serverInfo( "READ", "localhost:3333" ), serverInfo( "WRITE", "localhost:2222", "localhost:2223", "localhost:2224" ) ); // When - NetworkSessionWithAddress write1 = (NetworkSessionWithAddress) routingDriver.session( AccessMode.WRITE ); - NetworkSessionWithAddress write2 = (NetworkSessionWithAddress) routingDriver.session( AccessMode.WRITE ); - NetworkSessionWithAddress write3 = (NetworkSessionWithAddress) routingDriver.session( AccessMode.WRITE ); - NetworkSessionWithAddress write4 = (NetworkSessionWithAddress) routingDriver.session( AccessMode.WRITE ); - NetworkSessionWithAddress write5 = (NetworkSessionWithAddress) routingDriver.session( AccessMode.WRITE ); - NetworkSessionWithAddress write6 = (NetworkSessionWithAddress) routingDriver.session( AccessMode.WRITE ); + NetworkSessionWithAddress write1 = (NetworkSessionWithAddress) driver.session( AccessMode.WRITE ); + NetworkSessionWithAddress write2 = (NetworkSessionWithAddress) driver.session( AccessMode.WRITE ); + NetworkSessionWithAddress write3 = (NetworkSessionWithAddress) driver.session( AccessMode.WRITE ); + NetworkSessionWithAddress write4 = (NetworkSessionWithAddress) driver.session( AccessMode.WRITE ); + NetworkSessionWithAddress write5 = (NetworkSessionWithAddress) driver.session( AccessMode.WRITE ); + NetworkSessionWithAddress write6 = (NetworkSessionWithAddress) driver.session( AccessMode.WRITE ); // Then assertEquals( write1.address, write4.address ); @@ -331,16 +336,18 @@ public void testTrustOnFirstUseNotCompatibleWithRoutingDriver() } @SafeVarargs - private final RoutingDriver driverWithServers( long ttl, Map... serverInfo ) + private final Driver driverWithServers( long ttl, Map... serverInfo ) { return driverWithPool( poolWithServers( ttl, serverInfo ) ); } - private RoutingDriver driverWithPool( ConnectionPool pool ) + private Driver driverWithPool( ConnectionPool pool ) { RoutingSettings settings = new RoutingSettings( 10, 5_000 ); - SessionFactory sessionFactory = new NetworkSessionWithAddressFactory(); - return new RoutingDriver( settings, SEED, pool, insecure(), sessionFactory, clock, logging ); + ConnectionProvider connectionProvider = new LoadBalancer( settings, pool, clock, logging, SEED ); + SessionFactory sessionFactory = new NetworkSessionWithAddressFactory( connectionProvider, + Config.defaultConfig(), DEV_NULL_LOGGING ); + return new InternalDriver( insecure(), sessionFactory, logging ); } @SafeVarargs @@ -421,12 +428,17 @@ void collect( Collector collector ) }; } - private static class NetworkSessionWithAddressFactory implements SessionFactory + private static class NetworkSessionWithAddressFactory extends SessionFactoryImpl { + NetworkSessionWithAddressFactory( ConnectionProvider connectionProvider, Config config, Logging logging ) + { + super( connectionProvider, config, logging ); + } + @Override - public Session newInstance( PooledConnection connection ) + public Session newInstance( AccessMode mode ) { - return new NetworkSessionWithAddress( connection ); + return new NetworkSessionWithAddress( connectionProvider, mode, logging ); } } @@ -434,10 +446,13 @@ private static class NetworkSessionWithAddress extends NetworkSession { final BoltServerAddress address; - NetworkSessionWithAddress( PooledConnection connection ) + NetworkSessionWithAddress( ConnectionProvider connectionProvider, AccessMode mode, Logging logging ) { - super( connection ); - this.address = connection.boltServerAddress(); + super( connectionProvider, mode, logging ); + try ( PooledConnection connection = connectionProvider.acquireConnection( mode ) ) + { + this.address = connection.boltServerAddress(); + } } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/SessionFactoryImplTest.java b/driver/src/test/java/org/neo4j/driver/internal/SessionFactoryImplTest.java new file mode 100644 index 0000000000..824df4088b --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/SessionFactoryImplTest.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal; + +import org.junit.Test; + +import org.neo4j.driver.internal.spi.ConnectionProvider; +import org.neo4j.driver.v1.AccessMode; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Session; + +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; + +public class SessionFactoryImplTest +{ + @Test + public void createsNetworkSessions() + { + Config config = Config.defaultConfig(); + SessionFactory factory = new SessionFactoryImpl( mock( ConnectionProvider.class ), config, DEV_NULL_LOGGING ); + + Session readSession = factory.newInstance( AccessMode.READ ); + assertThat( readSession, instanceOf( NetworkSession.class ) ); + + Session writeSession = factory.newInstance( AccessMode.WRITE ); + assertThat( writeSession, instanceOf( NetworkSession.class ) ); + } + + @Test + public void createsLeakLoggingNetworkSessions() + { + Config config = Config.build().withLeakedSessionsLogging().toConfig(); + SessionFactory factory = new SessionFactoryImpl( mock( ConnectionProvider.class ), config, DEV_NULL_LOGGING ); + + Session readSession = factory.newInstance( AccessMode.READ ); + assertThat( readSession, instanceOf( LeakLoggingNetworkSession.class ) ); + + Session writeSession = factory.newInstance( AccessMode.WRITE ); + assertThat( writeSession, instanceOf( LeakLoggingNetworkSession.class ) ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java index 342cb37e19..f808e456de 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java @@ -27,6 +27,7 @@ import org.neo4j.driver.internal.ExplicitTransaction; import org.neo4j.driver.internal.NetworkSession; +import org.neo4j.driver.internal.SessionResourcesHandler; import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionPool; @@ -36,7 +37,6 @@ import org.neo4j.driver.v1.Value; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.driver.v1.exceptions.SessionExpiredException; -import org.neo4j.driver.v1.util.Resource; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; @@ -44,6 +44,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.RETURNS_MOCKS; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; @@ -52,6 +53,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.neo4j.driver.internal.logging.DevNullLogger.DEV_NULL_LOGGER; +import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; +import static org.neo4j.driver.v1.AccessMode.READ; +import static org.neo4j.driver.v1.AccessMode.WRITE; public class LoadBalancerTest { @@ -107,7 +111,7 @@ public void shouldEnsureRoutingWhenAcquireConn() throws Exception LoadBalancer spy = spy( balancer ); // when - Connection connection = spy.acquireReadConnection(); + Connection connection = spy.acquireConnection( READ ); connection.init( "Test", Collections.emptyMap() ); // then @@ -122,11 +126,11 @@ public void shouldAcquireReaderOrWriterConn() throws Exception PooledConnection readConn = mock( PooledConnection.class ); LoadBalancer balancer = setupLoadBalancer( writerConn, readConn ); - Connection acquiredReadConn = balancer.acquireReadConnection(); + Connection acquiredReadConn = balancer.acquireConnection( READ ); acquiredReadConn.init( "TestRead", Collections.emptyMap() ); verify( readConn ).init( "TestRead", Collections.emptyMap() ); - Connection acquiredWriteConn = balancer.acquireWriteConnection(); + Connection acquiredWriteConn = balancer.acquireConnection( WRITE ); acquiredWriteConn.init( "TestWrite", Collections.emptyMap() ); verify( writerConn ).init( "TestWrite", Collections.emptyMap() ); } @@ -142,9 +146,18 @@ public void shouldForgetAddressAndItsConnectionsOnServiceUnavailableWhileClosing PooledConnection connection = newConnectionWithFailingSync( address ); Connection routingConnection = new RoutingPooledConnection( connection, loadBalancer, AccessMode.WRITE ); - Transaction tx = new ExplicitTransaction( routingConnection, mock( Runnable.class ) ); + Transaction tx = new ExplicitTransaction( routingConnection, mock( SessionResourcesHandler.class ) ); - assertThrowsSessionExpiredException( tx ); + try + { + tx.close(); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( SessionExpiredException.class ) ); + assertThat( e.getCause(), instanceOf( ServiceUnavailableException.class ) ); + } verify( routingTable ).forget( address ); verify( connectionPool ).purge( address ); @@ -153,17 +166,19 @@ public void shouldForgetAddressAndItsConnectionsOnServiceUnavailableWhileClosing @Test public void shouldForgetAddressAndItsConnectionsOnServiceUnavailableWhileClosingSession() { - RoutingTable routingTable = mock( RoutingTable.class ); + RoutingTable routingTable = mock( RoutingTable.class, RETURNS_MOCKS ); ConnectionPool connectionPool = mock( ConnectionPool.class ); + BoltServerAddress address = new BoltServerAddress( "host", 42 ); + PooledConnection connectionWithFailingSync = newConnectionWithFailingSync( address ); + when( connectionPool.acquire( any( BoltServerAddress.class ) ) ).thenReturn( connectionWithFailingSync ); Rediscovery rediscovery = mock( Rediscovery.class ); LoadBalancer loadBalancer = new LoadBalancer( routingTable, connectionPool, rediscovery, DEV_NULL_LOGGER ); - BoltServerAddress address = new BoltServerAddress( "host", 42 ); - PooledConnection connection = newConnectionWithFailingSync( address ); - PooledConnection routingConnection = new RoutingPooledConnection( connection, loadBalancer, AccessMode.WRITE ); - NetworkSession session = new NetworkSession( routingConnection ); + NetworkSession session = new NetworkSession( loadBalancer, AccessMode.WRITE, DEV_NULL_LOGGING ); + // begin transaction to make session obtain a connection + session.beginTransaction(); - assertThrowsSessionExpiredException( session ); + session.close(); verify( routingTable ).forget( address ); verify( connectionPool ).purge( address ); @@ -201,17 +216,4 @@ private static PooledConnection newConnectionWithFailingSync( BoltServerAddress return connection; } - private static void assertThrowsSessionExpiredException( Resource resource ) - { - try - { - resource.close(); - fail( "Exception expected" ); - } - catch ( Exception e ) - { - assertThat( e, instanceOf( SessionExpiredException.class ) ); - assertThat( e.getCause(), instanceOf( ServiceUnavailableException.class ) ); - } - } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingPooledConnectionErrorHandlingTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingPooledConnectionErrorHandlingTest.java index 71c602a5b7..4f0f8c2221 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingPooledConnectionErrorHandlingTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingPooledConnectionErrorHandlingTest.java @@ -62,6 +62,8 @@ import static org.neo4j.driver.internal.util.Matchers.containsReader; import static org.neo4j.driver.internal.util.Matchers.containsRouter; import static org.neo4j.driver.internal.util.Matchers.containsWriter; +import static org.neo4j.driver.v1.AccessMode.READ; +import static org.neo4j.driver.v1.AccessMode.WRITE; import static org.neo4j.driver.v1.Values.value; @RunWith( Parameterized.class ) @@ -99,10 +101,10 @@ public void shouldHandleServiceUnavailableException() ConnectionPool connectionPool = newConnectionPool( connector, ADDRESS1, ADDRESS2, ADDRESS3 ); LoadBalancer loadBalancer = newLoadBalancer( routingTable, connectionPool ); - Connection readConnection = loadBalancer.acquireReadConnection(); + Connection readConnection = loadBalancer.acquireConnection( READ ); verifyServiceUnavailableHandling( readConnection, routingTable, connectionPool ); - Connection writeConnection = loadBalancer.acquireWriteConnection(); + Connection writeConnection = loadBalancer.acquireConnection( WRITE ); verifyServiceUnavailableHandling( writeConnection, routingTable, connectionPool ); assertThat( routingTable, containsRouter( ADDRESS3 ) ); @@ -143,7 +145,7 @@ private void testHandleFailureToWriteWithWriteConnection( ClientException error ConnectionPool connectionPool = newConnectionPool( connector, ADDRESS1, ADDRESS2, ADDRESS3 ); LoadBalancer loadBalancer = newLoadBalancer( routingTable, connectionPool ); - Connection readConnection = loadBalancer.acquireReadConnection(); + Connection readConnection = loadBalancer.acquireConnection( READ ); try { method.invoke( readConnection ); @@ -171,7 +173,7 @@ private void testHandleFailureToWrite( ClientException error ) ConnectionPool connectionPool = newConnectionPool( connector, ADDRESS1, ADDRESS2, ADDRESS3 ); LoadBalancer loadBalancer = newLoadBalancer( routingTable, connectionPool ); - Connection readConnection = loadBalancer.acquireWriteConnection(); + Connection readConnection = loadBalancer.acquireConnection( WRITE ); try { method.invoke( readConnection ); @@ -199,10 +201,10 @@ private void testThrowablePropagation( Throwable error ) ConnectionPool connectionPool = newConnectionPool( connector, ADDRESS1, ADDRESS2, ADDRESS3 ); LoadBalancer loadBalancer = newLoadBalancer( routingTable, connectionPool ); - Connection readConnection = loadBalancer.acquireReadConnection(); + Connection readConnection = loadBalancer.acquireConnection( READ ); verifyThrowablePropagation( readConnection, routingTable, connectionPool, error.getClass() ); - Connection writeConnection = loadBalancer.acquireWriteConnection(); + Connection writeConnection = loadBalancer.acquireConnection( WRITE ); verifyThrowablePropagation( writeConnection, routingTable, connectionPool, error.getClass() ); assertThat( routingTable, containsRouter( ADDRESS3 ) ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java b/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java index ac0b1ffbb7..4433673370 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java @@ -22,9 +22,18 @@ import org.hamcrest.Matcher; import org.hamcrest.TypeSafeMatcher; +import java.util.Objects; + +import org.neo4j.driver.internal.DirectConnectionProvider; +import org.neo4j.driver.internal.InternalDriver; +import org.neo4j.driver.internal.SessionFactory; +import org.neo4j.driver.internal.SessionFactoryImpl; +import org.neo4j.driver.internal.cluster.LoadBalancer; import org.neo4j.driver.internal.cluster.RoundRobinAddressSet; import org.neo4j.driver.internal.cluster.RoutingTable; import org.neo4j.driver.internal.net.BoltServerAddress; +import org.neo4j.driver.internal.spi.ConnectionProvider; +import org.neo4j.driver.v1.Driver; public final class Matchers { @@ -93,6 +102,61 @@ public void describeTo( Description description ) }; } + public static Matcher directDriver() + { + return new TypeSafeMatcher() + { + @Override + protected boolean matchesSafely( Driver driver ) + { + return hasConnectionProvider( driver, DirectConnectionProvider.class ); + } + + @Override + public void describeTo( Description description ) + { + description.appendText( "direct 'bolt://' driver " ); + } + }; + } + + public static Matcher directDriverWithAddress( final BoltServerAddress address ) + { + return new TypeSafeMatcher() + { + @Override + protected boolean matchesSafely( Driver driver ) + { + DirectConnectionProvider provider = extractConnectionProvider( driver, DirectConnectionProvider.class ); + return provider != null && Objects.equals( provider.getAddress(), address ); + } + + @Override + public void describeTo( Description description ) + { + description.appendText( "direct driver with address bolt://" ).appendValue( address ); + } + }; + } + + public static Matcher clusterDriver() + { + return new TypeSafeMatcher() + { + @Override + protected boolean matchesSafely( Driver driver ) + { + return hasConnectionProvider( driver, LoadBalancer.class ); + } + + @Override + public void describeTo( Description description ) + { + description.appendText( "cluster 'bolt+routing://' driver " ); + } + }; + } + private static boolean contains( RoundRobinAddressSet set, BoltServerAddress address ) { for ( int i = 0; i < set.size(); i++ ) @@ -104,4 +168,26 @@ private static boolean contains( RoundRobinAddressSet set, BoltServerAddress add } return false; } + + private static boolean hasConnectionProvider( Driver driver, Class providerClass ) + { + return extractConnectionProvider( driver, providerClass ) != null; + } + + private static T extractConnectionProvider( Driver driver, Class providerClass ) + { + if ( driver instanceof InternalDriver ) + { + SessionFactory sessionFactory = ((InternalDriver) driver).getSessionFactory(); + if ( sessionFactory instanceof SessionFactoryImpl ) + { + ConnectionProvider provider = ((SessionFactoryImpl) sessionFactory).getConnectionProvider(); + if ( providerClass.isInstance( provider ) ) + { + return providerClass.cast( provider ); + } + } + } + return null; + } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/ConnectionHandlingIT.java b/driver/src/test/java/org/neo4j/driver/v1/ConnectionHandlingIT.java new file mode 100644 index 0000000000..d728dc3bee --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/v1/ConnectionHandlingIT.java @@ -0,0 +1,380 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.v1; + +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.List; + +import org.neo4j.driver.internal.ConnectionSettings; +import org.neo4j.driver.internal.DriverFactory; +import org.neo4j.driver.internal.cluster.RoutingSettings; +import org.neo4j.driver.internal.net.BoltServerAddress; +import org.neo4j.driver.internal.net.pooling.PoolSettings; +import org.neo4j.driver.internal.net.pooling.SocketConnectionPool; +import org.neo4j.driver.internal.security.SecurityPlan; +import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.spi.ConnectionPool; +import org.neo4j.driver.internal.spi.Connector; +import org.neo4j.driver.internal.spi.PooledConnection; +import org.neo4j.driver.internal.util.Clock; +import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.summary.ResultSummary; +import org.neo4j.driver.v1.util.TestNeo4j; + +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.neo4j.driver.v1.Config.defaultConfig; +import static org.neo4j.driver.v1.Values.parameters; + +public class ConnectionHandlingIT +{ + @ClassRule + public static final TestNeo4j neo4j = new TestNeo4j(); + + private Driver driver; + private MemorizingConnectionPool connectionPool; + + @Before + public void createDriver() + { + DriverFactoryWithConnector driverFactory = new DriverFactoryWithConnector(); + RoutingSettings routingSettings = new RoutingSettings( 1, 1 ); + driver = driverFactory.newInstance( neo4j.uri(), AuthTokens.none(), routingSettings, defaultConfig() ); + connectionPool = driverFactory.connectionPool; + } + + @After + public void closeDriver() + { + driver.close(); + } + + @Test + public void connectionUsedForSessionRunReturnedToThePoolWhenResultConsumed() + { + StatementResult result = createNodesInNewSession( 12 ); + + Connection connection1 = connectionPool.lastAcquiredConnectionSpy; + verify( connection1, never() ).close(); + + result.consume(); + + Connection connection2 = connectionPool.lastAcquiredConnectionSpy; + assertSame( connection1, connection2 ); + verify( connection1 ).close(); + } + + @Test + public void connectionUsedForSessionRunReturnedToThePoolWhenResultSummaryObtained() + { + StatementResult result = createNodesInNewSession( 5 ); + + Connection connection1 = connectionPool.lastAcquiredConnectionSpy; + verify( connection1, never() ).close(); + + ResultSummary summary = result.summary(); + + assertEquals( 5, summary.counters().nodesCreated() ); + Connection connection2 = connectionPool.lastAcquiredConnectionSpy; + assertSame( connection1, connection2 ); + verify( connection1 ).close(); + } + + @Test + public void connectionUsedForSessionRunReturnedToThePoolWhenResultFetchedInList() + { + StatementResult result = createNodesInNewSession( 2 ); + + Connection connection1 = connectionPool.lastAcquiredConnectionSpy; + verify( connection1, never() ).close(); + + List records = result.list(); + assertEquals( 2, records.size() ); + + Connection connection2 = connectionPool.lastAcquiredConnectionSpy; + assertSame( connection1, connection2 ); + verify( connection1 ).close(); + } + + @Test + public void connectionUsedForSessionRunReturnedToThePoolWhenSingleRecordFetched() + { + StatementResult result = createNodesInNewSession( 1 ); + + Connection connection1 = connectionPool.lastAcquiredConnectionSpy; + verify( connection1, never() ).close(); + + assertNotNull( result.single() ); + + Connection connection2 = connectionPool.lastAcquiredConnectionSpy; + assertSame( connection1, connection2 ); + verify( connection1 ).close(); + } + + @Test + public void connectionUsedForSessionRunReturnedToThePoolWhenResultFetchedAsIterator() + { + StatementResult result = createNodesInNewSession( 6 ); + + Connection connection1 = connectionPool.lastAcquiredConnectionSpy; + verify( connection1, never() ).close(); + + int seenRecords = 0; + while ( result.hasNext() ) + { + assertNotNull( result.next() ); + seenRecords++; + } + assertEquals( 6, seenRecords ); + + Connection connection2 = connectionPool.lastAcquiredConnectionSpy; + assertSame( connection1, connection2 ); + verify( connection1 ).close(); + } + + @Test + public void connectionUsedForSessionRunReturnedToThePoolWhenServerErrorDuringResultFetching() + { + Session session = driver.session(); + // provoke division by zero + StatementResult result = session.run( "UNWIND range(10, 0, -1) AS i CREATE (n {index: 10/i}) RETURN n" ); + + Connection connection1 = connectionPool.lastAcquiredConnectionSpy; + verify( connection1, never() ).close(); + + try + { + result.hasNext(); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( ClientException.class ) ); + } + + Connection connection2 = connectionPool.lastAcquiredConnectionSpy; + assertSame( connection1, connection2 ); + verify( connection1 ).close(); + } + + @Test + public void previousSessionRunResultIsBufferedBeforeRunningNewStatement() + { + Session session = driver.session(); + + StatementResult result1 = createNodes( 3, session ); + Connection connection1 = connectionPool.lastAcquiredConnectionSpy; + verify( connection1, never() ).close(); + + StatementResult result2 = createNodes( 2, session ); + verify( connection1 ).close(); + + assertEquals( 3, result1.list().size() ); + assertEquals( 2, result2.list().size() ); + } + + @Test + public void previousSessionRunResultIsBufferedBeforeStartingNewTransaction() + { + Session session = driver.session(); + + StatementResult result1 = createNodes( 3, session ); + Connection connection1 = connectionPool.lastAcquiredConnectionSpy; + verify( connection1, never() ).close(); + + session.beginTransaction(); + verify( connection1 ).close(); + + assertEquals( 3, result1.list().size() ); + } + + @Test + public void connectionUsedForTransactionReturnedToThePoolWhenTransactionCommitted() + { + Session session = driver.session(); + + Transaction tx = session.beginTransaction(); + + Connection connection1 = connectionPool.lastAcquiredConnectionSpy; + verify( connection1, never() ).close(); + + StatementResult result = createNodes( 5, tx ); + tx.success(); + tx.close(); + + Connection connection2 = connectionPool.lastAcquiredConnectionSpy; + assertSame( connection1, connection2 ); + verify( connection1 ).close(); + + assertEquals( 5, result.list().size() ); + } + + @Test + public void connectionUsedForTransactionReturnedToThePoolWhenTransactionRolledBack() + { + Session session = driver.session(); + + Transaction tx = session.beginTransaction(); + + Connection connection1 = connectionPool.lastAcquiredConnectionSpy; + verify( connection1, never() ).close(); + + StatementResult result = createNodes( 8, tx ); + tx.failure(); + tx.close(); + + Connection connection2 = connectionPool.lastAcquiredConnectionSpy; + assertSame( connection1, connection2 ); + verify( connection1 ).close(); + + assertEquals( 8, result.list().size() ); + } + + @Test + public void connectionUsedForTransactionReturnedToThePoolWhenTransactionFailsToCommitted() throws Exception + { + try ( Session session = driver.session() ) + { + session.run( "CREATE CONSTRAINT ON (book:Book) ASSERT exists(book.isbn)" ); + } + + Session session = driver.session(); + + Transaction tx = session.beginTransaction(); + Connection connection1 = connectionPool.lastAcquiredConnectionSpy; + verify( connection1 ).close(); // connection previously used for constraint creation + + // property existence constraints are verified on commit, try to violate it + tx.run( "CREATE (:Book)" ); + tx.success(); + + try + { + tx.close(); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( ClientException.class ) ); + } + + Connection connection2 = connectionPool.lastAcquiredConnectionSpy; + assertSame( connection1, connection2 ); + // connection should have been closed twice: for constraint creation and for node creation + verify( connection1, times( 2 ) ).close(); + } + + @Test + public void connectionDisposedWhenItHasUnrecoverableError() + { + Session session = driver.session(); + + PooledConnection connection1; + StatementResult result1; + try ( Transaction tx = session.beginTransaction() ) + { + result1 = tx.run( "RETURN 42 AS answer" ); + tx.success(); + connection1 = connectionPool.lastAcquiredConnectionSpy; + when( connection1.hasUnrecoverableErrors() ).thenReturn( true ); + } + + verify( connection1 ).dispose(); + assertEquals( 42, result1.single().get( "answer" ).asInt() ); + + PooledConnection connection2; + StatementResult result2; + try ( Transaction tx = session.beginTransaction() ) + { + result2 = tx.run( "RETURN 4242 AS answer" ); + tx.success(); + connection2 = connectionPool.lastAcquiredConnectionSpy; + assertNotSame( connection1, connection2 ); + } + + verify( connection2, never() ).dispose(); + verify( connection2 ).close(); + assertEquals( 4242, result2.single().get( "answer" ).asInt() ); + } + + private StatementResult createNodesInNewSession( int nodesToCreate ) + { + return createNodes( nodesToCreate, driver.session() ); + } + + private StatementResult createNodes( int nodesToCreate, StatementRunner statementRunner ) + { + return statementRunner.run( "UNWIND range(1, {nodesToCreate}) AS i CREATE (n {index: i}) RETURN n", + parameters( "nodesToCreate", nodesToCreate ) ); + } + + private static class DriverFactoryWithConnector extends DriverFactory + { + MemorizingConnectionPool connectionPool; + + @Override + protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Config config ) + { + ConnectionSettings connectionSettings = new ConnectionSettings( authToken, 1000 ); + PoolSettings poolSettings = new PoolSettings( 10, 0 ); + Connector connector = createConnector( connectionSettings, securityPlan, config.logging() ); + connectionPool = new MemorizingConnectionPool( poolSettings, connector, createClock(), config.logging() ); + return connectionPool; + } + } + + private static class MemorizingConnectionPool extends SocketConnectionPool + { + PooledConnection lastAcquiredConnectionSpy; + + MemorizingConnectionPool( PoolSettings poolSettings, Connector connector, Clock clock, Logging logging ) + { + super( poolSettings, connector, clock, logging ); + } + + @Override + public PooledConnection acquire( BoltServerAddress address ) + { + PooledConnection connection = super.acquire( address ); + // this connection pool returns spies so spies will be returned to the pool + // prevent spying on spies... + if ( !Mockito.mockingDetails( connection ).isSpy() ) + { + connection = spy( connection ); + } + lastAcquiredConnectionSpy = connection; + return connection; + } + } +} diff --git a/driver/src/test/java/org/neo4j/driver/v1/DriverCloseIT.java b/driver/src/test/java/org/neo4j/driver/v1/DriverCloseIT.java index cdf4e5239c..72f5e7917c 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/DriverCloseIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/DriverCloseIT.java @@ -29,7 +29,6 @@ import java.util.logging.Level; import org.neo4j.driver.internal.logging.ConsoleLogging; -import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.driver.v1.util.StubServer; import org.neo4j.driver.v1.util.TestNeo4j; @@ -136,7 +135,7 @@ public void useSessionAfterDriverIsClosed() } catch ( Exception e ) { - assertThat( e, instanceOf( ServiceUnavailableException.class ) ); + assertThat( e, instanceOf( IllegalStateException.class ) ); } } } @@ -195,7 +194,7 @@ public void useSessionAfterDriverIsClosed() throws Exception } catch ( Exception e ) { - assertThat( e, instanceOf( ServiceUnavailableException.class ) ); + assertThat( e, instanceOf( IllegalStateException.class ) ); } assertEquals( 0, readServer.exitStatus() ); diff --git a/driver/src/test/java/org/neo4j/driver/v1/GraphDatabaseTest.java b/driver/src/test/java/org/neo4j/driver/v1/GraphDatabaseTest.java index 22d2645e37..3a1d43562c 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/GraphDatabaseTest.java +++ b/driver/src/test/java/org/neo4j/driver/v1/GraphDatabaseTest.java @@ -20,15 +20,20 @@ import org.junit.Test; +import java.io.File; import java.net.URI; -import org.neo4j.driver.internal.DirectDriver; -import org.neo4j.driver.internal.RoutingDriver; import org.neo4j.driver.v1.util.StubServer; +import static org.hamcrest.Matchers.is; import static org.hamcrest.core.IsEqual.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import static org.neo4j.driver.internal.util.Matchers.clusterDriver; +import static org.neo4j.driver.internal.util.Matchers.directDriver; +import static org.neo4j.driver.v1.Config.EncryptionLevel.REQUIRED; +import static org.neo4j.driver.v1.Config.TrustStrategy.trustOnFirstUse; import static org.neo4j.driver.v1.util.StubServer.INSECURE_CONFIG; public class GraphDatabaseTest @@ -43,8 +48,7 @@ public void boltSchemeShouldInstantiateDirectDriver() Driver driver = GraphDatabase.driver( uri ); // Then - assertThat( driver, instanceOf( DirectDriver.class ) ); - + assertThat( driver, is( directDriver() ) ); } @Test @@ -58,10 +62,31 @@ public void boltPlusDiscoverySchemeShouldInstantiateClusterDriver() throws Excep Driver driver = GraphDatabase.driver( uri, INSECURE_CONFIG ); // Then - assertThat( driver, instanceOf( RoutingDriver.class ) ); + assertThat( driver, is( clusterDriver() ) ); // Finally driver.close(); assertThat( server.exitStatus(), equalTo( 0 ) ); } + + @Test + public void boltPlusDiscoverySchemeShouldNotSupportTrustOnFirstUse() + { + URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); + + Config config = Config.build() + .withEncryptionLevel( REQUIRED ) + .withTrustStrategy( trustOnFirstUse( new File( "./known_hosts" ) ) ) + .toConfig(); + + try + { + GraphDatabase.driver( uri, config ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( IllegalArgumentException.class ) ); + } + } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/CredentialsIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/CredentialsIT.java index f9a69e1064..f3bb5590b7 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/CredentialsIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/CredentialsIT.java @@ -34,11 +34,11 @@ import org.neo4j.driver.v1.util.Neo4jSettings; import org.neo4j.driver.v1.util.TestNeo4j; -import static junit.framework.TestCase.fail; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.fail; import static org.neo4j.driver.v1.AuthTokens.basic; import static org.neo4j.driver.v1.AuthTokens.custom; import static org.neo4j.driver.v1.Values.ofValue; @@ -67,17 +67,17 @@ public void basicCredentialsShouldWork() throws Throwable } } - @SuppressWarnings( "EmptyTryBlock" ) @Test public void shouldGetHelpfulErrorOnInvalidCredentials() throws Throwable { // When - try( Driver driver = GraphDatabase.driver( neo4j.uri(), basic("thisisnotthepassword", password ) ); - Session ignored = driver.session() ) { - //empty + try ( Driver driver = GraphDatabase.driver( neo4j.uri(), basic( "thisisnotthepassword", password ) ); + Session session = driver.session() ) + { + session.run( "RETURN 1" ); fail( "Should fail with an auth error already" ); } - catch( Throwable e ) + catch ( Throwable e ) { assertThat( e, instanceOf( SecurityException.class ) ); assertThat( e.getMessage(), containsString( "The client is unauthorized due to authentication failure." ) ); diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/ErrorIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/ErrorIT.java index 5e637fa593..fd6f434a80 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/ErrorIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/ErrorIT.java @@ -114,15 +114,18 @@ public void shouldAllowNewTransactionAfterRecoverableError() throws Throwable @Test public void shouldExplainConnectionError() throws Throwable { - // Expect - exception.expect( ServiceUnavailableException.class ); - exception.expectMessage( "Unable to connect to localhost:7777, ensure the database is running " + - "and that there is a working network connection to it." ); - - // When - //noinspection EmptyTryBlock + // Given try ( Driver driver = GraphDatabase.driver( "bolt://localhost:7777" ); - Session ignore = driver.session()) {/*empty*/} + Session session = driver.session() ) + { + // Expect + exception.expect( ServiceUnavailableException.class ); + exception.expectMessage( "Unable to connect to localhost:7777, ensure the database is running " + + "and that there is a working network connection to it." ); + + // When + session.run( "RETURN 1" ); + } } @Test @@ -157,8 +160,9 @@ public void shouldGetHelpfulErrorWhenTryingToConnectToHttpPort() throws Throwabl // Given //the http server needs some time to start up Thread.sleep( 2000 ); - try( Driver driver = GraphDatabase.driver( "bolt://localhost:7474", - Config.build().withEncryptionLevel( Config.EncryptionLevel.NONE ).toConfig() ) ) + Config config = Config.build().withEncryptionLevel( Config.EncryptionLevel.NONE ).toConfig(); + try ( Driver driver = GraphDatabase.driver( "bolt://localhost:7474", config ); + Session session = driver.session() ) { // Expect exception.expect( ClientException.class ); @@ -167,8 +171,7 @@ public void shouldGetHelpfulErrorWhenTryingToConnectToHttpPort() throws Throwabl "(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)" ); // When - //noinspection EmptyTryBlock - try(Session ignore = driver.session() ){/*empty*/} + session.run( "RETURN 1" ); } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/ResultStreamIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/ResultStreamIT.java index b7cd2dd5ef..c832cc7036 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/ResultStreamIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/ResultStreamIT.java @@ -23,15 +23,19 @@ import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.StatementResult; +import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.summary.ResultSummary; import org.neo4j.driver.v1.util.TestNeo4jSession; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.neo4j.driver.v1.Values.parameters; public class ResultStreamIT @@ -111,6 +115,7 @@ public void shouldBeAbleToReuseSessionAfterFailure() throws Throwable try { res1.consume(); + fail( "Exception expected" ); } catch ( Exception e ) { @@ -187,4 +192,40 @@ public void shouldDiscardRecordsAfterConsume() throws Throwable assertThat( result.hasNext(), equalTo( false ) ); } + + @Test + public void shouldHasNoElementsAfterFailure() + { + StatementResult result = session.run( "INVALID" ); + + try + { + result.hasNext(); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( ClientException.class ) ); + } + + assertFalse( result.hasNext() ); + } + + @Test + public void shouldBeAnEmptyLitAfterFailure() + { + StatementResult result = session.run( "UNWIND (0, 1) as i RETURN 10 / i" ); + + try + { + result.list(); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( ClientException.class ) ); + } + + assertTrue( result.list().isEmpty() ); + } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/ServerKilledIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/ServerKilledIT.java index 6aaa6ea881..ac02bc64c3 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/ServerKilledIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/ServerKilledIT.java @@ -78,7 +78,7 @@ public void shouldRecoverFromServerRestart() throws Throwable try ( Driver driver = GraphDatabase.driver( Neo4jRunner.DEFAULT_URI, config ) ) { - acquireAndReleaseSessions( 4, driver ); + acquireAndReleaseConnections( 4, driver ); // When neo4j.forceRestart(); @@ -122,7 +122,7 @@ public void shouldDropBrokenOldSessions() throws Throwable try ( Driver driver = driverFactory.newInstance( Neo4jRunner.DEFAULT_URI, null, null, config ) ) { - acquireAndReleaseSessions( 5, driver ); + acquireAndReleaseConnections( 5, driver ); // restart database to invalidate all idle connections in the pool neo4j.forceRestart(); @@ -140,12 +140,13 @@ public void shouldDropBrokenOldSessions() throws Throwable } } - private static void acquireAndReleaseSessions( int count, Driver driver ) + private static void acquireAndReleaseConnections( int count, Driver driver ) { if ( count > 0 ) { Session session = driver.session(); - acquireAndReleaseSessions( count - 1, driver ); + session.run( "RETURN 1" ); + acquireAndReleaseConnections( count - 1, driver ); session.close(); } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java index 53e0f74c68..f688c5b52e 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/SessionIT.java @@ -381,18 +381,4 @@ public void shouldAllowMoreTxAfterSessionResetInTx() } } } - - @Test - public void shouldCloseSessionWhenDriverIsClosed() throws Throwable - { - // Given - Driver driver = GraphDatabase.driver( neo4j.uri() ); - Session session = driver.session(); - - // When - driver.close(); - - // Then - assertFalse( session.isOpen() ); - } } diff --git a/driver/src/test/java/org/neo4j/driver/v1/tck/DriverSecurityComplianceSteps.java b/driver/src/test/java/org/neo4j/driver/v1/tck/DriverSecurityComplianceSteps.java index 94c3a81a02..24f4296e48 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/tck/DriverSecurityComplianceSteps.java +++ b/driver/src/test/java/org/neo4j/driver/v1/tck/DriverSecurityComplianceSteps.java @@ -123,7 +123,10 @@ public void theDatabaseHasChangedWhichCertificateItUses() throws Throwable @Then( "^creating sessions should fail$" ) public void creatingSessionsShouldFail() throws Throwable { - try ( Session session = driver.session() ) {} + try ( Session session = driver.session() ) + { + session.run( "RETURN 1" ); + } catch ( Exception e ) { exception = e; diff --git a/driver/src/test/java/org/neo4j/driver/v1/tck/ErrorReportingSteps.java b/driver/src/test/java/org/neo4j/driver/v1/tck/ErrorReportingSteps.java index 3b96b01615..9a4b95e240 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/tck/ErrorReportingSteps.java +++ b/driver/src/test/java/org/neo4j/driver/v1/tck/ErrorReportingSteps.java @@ -162,9 +162,10 @@ public void iRunANonValidCypherStatementItThrowsAnClientException() throws Throw @When( "^I set up a driver to an incorrect port" ) public void iSetUpADriverToAnIncorrectPort() throws Throwable { - try ( Driver driver = GraphDatabase.driver( "bolt://localhost:7777" ) ) + try ( Driver driver = GraphDatabase.driver( "bolt://localhost:7777" ); + Session session = driver.session() ) { - driver.session(); + session.run( "RETURN 1" ); } catch ( Exception e ) { @@ -260,4 +261,4 @@ public void iStoreASession() throws Throwable { storedSessions.add( smallDriver.session() ); } -} \ No newline at end of file +} diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/Neo4jRunner.java b/driver/src/test/java/org/neo4j/driver/v1/util/Neo4jRunner.java index 98e96ef1d8..16eec75581 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/Neo4jRunner.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/Neo4jRunner.java @@ -92,6 +92,10 @@ public void ensureRunning(Neo4jSettings neo4jSettings) throws IOException, Inter public Driver driver() { + if ( driver == null ) + { + driver = GraphDatabase.driver( DEFAULT_URI ); + } return driver; } @@ -105,7 +109,6 @@ private void startNeo4j() throws IOException { throw new IOException( "Failed to start neo4j server." ); } - driver = GraphDatabase.driver( DEFAULT_URI /* default encryption REQUIRED_NON_LOCAL */ ); } public synchronized void stopNeo4j() throws IOException