Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Decouple session from connection #324

Merged
merged 11 commits into from
Feb 24, 2017
Original file line number Diff line number Diff line change
Expand Up @@ -19,51 +19,39 @@
package org.neo4j.driver.internal;

import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.security.SecurityPlan;
import org.neo4j.driver.internal.spi.ConnectionPool;
import org.neo4j.driver.internal.spi.ConnectionProvider;
import org.neo4j.driver.internal.spi.PooledConnection;
import org.neo4j.driver.v1.AccessMode;
import org.neo4j.driver.v1.Logging;
import org.neo4j.driver.v1.Session;

import static java.lang.String.format;

public class DirectDriver extends BaseDriver
/**
* Simple {@link ConnectionProvider connection provider} that obtains connections form the given pool only for
* the given address.
*/
public class DirectConnectionProvider implements ConnectionProvider
{
private final BoltServerAddress address;
protected final ConnectionPool connections;
private final ConnectionPool pool;

public DirectDriver(
BoltServerAddress address,
ConnectionPool connections,
SecurityPlan securityPlan,
SessionFactory sessionFactory,
Logging logging )
DirectConnectionProvider( BoltServerAddress address, ConnectionPool pool )
{
super( securityPlan, sessionFactory, logging );
this.address = address;
this.connections = connections;
this.pool = pool;
}

@Override
protected Session newSessionWithMode( AccessMode mode )
public PooledConnection acquireConnection( AccessMode mode )
{
return sessionFactory.newInstance( connections.acquire( address ) );
return pool.acquire( address );
}

@Override
protected void closeResources()
public void close() throws Exception
{
try
{
connections.close();
}
catch ( Exception ex )
{
log.error( format( "~~ [ERROR] %s", ex.getMessage() ), ex );
}
pool.close();
}

BoltServerAddress server()
public BoltServerAddress getAddress()
{
return address;
}
Expand Down
74 changes: 50 additions & 24 deletions driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import java.net.URI;
import java.security.GeneralSecurityException;

import org.neo4j.driver.internal.cluster.LoadBalancer;
import org.neo4j.driver.internal.cluster.RoutingSettings;
import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.net.SocketConnector;
import org.neo4j.driver.internal.net.pooling.PoolSettings;
import org.neo4j.driver.internal.net.pooling.SocketConnectionPool;
import org.neo4j.driver.internal.security.SecurityPlan;
import org.neo4j.driver.internal.spi.ConnectionPool;
import org.neo4j.driver.internal.spi.ConnectionProvider;
import org.neo4j.driver.internal.spi.Connector;
import org.neo4j.driver.internal.util.Clock;
import org.neo4j.driver.v1.AuthToken;
Expand All @@ -50,13 +52,10 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
BoltServerAddress address = BoltServerAddress.from( uri );
SecurityPlan securityPlan = createSecurityPlan( address, config );
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, config );
SessionFactory sessionFactory = createSessionFactory( config );

try
{
return createDriver( address, uri.getScheme(), connectionPool, config, routingSettings, securityPlan,
sessionFactory
);
return createDriver( address, uri.getScheme(), connectionPool, config, routingSettings, securityPlan );
}
catch ( Throwable driverError )
{
Expand All @@ -74,42 +73,68 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
}

private Driver createDriver( BoltServerAddress address, String scheme, ConnectionPool connectionPool,
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan,
SessionFactory sessionFactory )
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan )
{
switch ( scheme.toLowerCase() )
{
case "bolt":
return createDirectDriver( address, connectionPool, config, securityPlan, sessionFactory );
return createDirectDriver( address, connectionPool, config, securityPlan );
case "bolt+routing":
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan,
sessionFactory );
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan );
default:
throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) );
}
}

/**
* Creates new {@link DirectDriver}.
* Creates a new driver for "bolt" scheme.
* <p>
* <b>This method is protected only for testing</b>
*/
protected DirectDriver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool,
Config config, SecurityPlan securityPlan, SessionFactory sessionFactory )
protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config,
SecurityPlan securityPlan )
{
return new DirectDriver( address, connectionPool, securityPlan, sessionFactory, config.logging() );
ConnectionProvider connectionProvider = new DirectConnectionProvider( address, connectionPool );
SessionFactory sessionFactory = createSessionFactory( connectionProvider, config );
return createDriver( config, securityPlan, sessionFactory );
}

/**
* Creates new {@link RoutingDriver}.
* Creates new a new driver for "bolt+routing" scheme.
* <p>
* <b>This method is protected only for testing</b>
*/
protected RoutingDriver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, SessionFactory sessionFactory )
protected Driver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan )
{
return new RoutingDriver( routingSettings, address, connectionPool, securityPlan, sessionFactory,
createClock(), config.logging() );
if ( !securityPlan.isRoutingCompatible() )
{
throw new IllegalArgumentException( "The chosen security plan is not compatible with a routing driver" );
}
ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, config, routingSettings );
SessionFactory sessionFactory = createSessionFactory( connectionProvider, config );
return createDriver( config, securityPlan, sessionFactory );
}

/**
* Creates new {@link Driver}.
* <p>
* <b>This method is protected only for testing</b>
*/
protected InternalDriver createDriver( Config config, SecurityPlan securityPlan, SessionFactory sessionFactory )
{
return new InternalDriver( securityPlan, sessionFactory, config.logging() );
}

/**
* Creates new {@link LoadBalancer} for the routing driver.
* <p>
* <b>This method is protected only for testing</b>
*/
protected LoadBalancer createLoadBalancer( BoltServerAddress address, ConnectionPool connectionPool, Config config,
RoutingSettings routingSettings )
{
return new LoadBalancer( routingSettings, connectionPool, createClock(), config.logging(), address );
}

/**
Expand Down Expand Up @@ -150,13 +175,14 @@ protected Connector createConnector( ConnectionSettings connectionSettings, Secu
return new SocketConnector( connectionSettings, securityPlan, logging );
}

private static SessionFactory createSessionFactory( Config config )
/**
* Creates new {@link SessionFactory}.
* <p>
* <b>This method is protected only for testing</b>
*/
protected SessionFactory createSessionFactory( ConnectionProvider connectionProvider, Config config )
{
if ( config.logLeakedSessions() )
{
return new LeakLoggingNetworkSessionFactory( config.logging() );
}
return new NetworkSessionFactory();
return new SessionFactoryImpl( connectionProvider, config, config.logging() );
}

private static SecurityPlan createSecurityPlan( BoltServerAddress address, Config config )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,21 +65,21 @@ private enum State
ROLLED_BACK
}

private final Runnable cleanup;
private final SessionResourcesHandler resourcesHandler;
private final Connection conn;

private String bookmark = null;
private State state = State.ACTIVE;

public ExplicitTransaction( Connection conn, Runnable cleanup )
public ExplicitTransaction( Connection conn, SessionResourcesHandler resourcesHandler )
{
this( conn, cleanup, null );
this( conn, resourcesHandler, null );
}

ExplicitTransaction( Connection conn, Runnable cleanup, String bookmark )
ExplicitTransaction( Connection conn, SessionResourcesHandler resourcesHandler, String bookmark )
{
this.conn = conn;
this.cleanup = cleanup;
this.resourcesHandler = resourcesHandler;
runBeginStatement( conn, bookmark );
}

Expand Down Expand Up @@ -139,7 +139,7 @@ else if ( state == State.MARKED_FAILED || state == State.ACTIVE )
}
finally
{
cleanup.run();
resourcesHandler.onTransactionClosed( this );
}
}

Expand Down Expand Up @@ -185,13 +185,14 @@ public synchronized StatementResult run( Statement statement )

try
{
InternalStatementResult cursor = new InternalStatementResult( conn, this, statement );
InternalStatementResult result =
new InternalStatementResult( conn, SessionResourcesHandler.NO_OP, this, statement );
conn.run( statement.text(),
statement.parameters().asMap( ofValue() ),
cursor.runResponseCollector() );
conn.pullAll( cursor.pullAllResponseCollector() );
result.runResponseCollector() );
conn.pullAll( result.pullAllResponseCollector() );
conn.flush();
return cursor;
return result;
}
catch ( Neo4jException e )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,19 @@
import org.neo4j.driver.v1.Logging;
import org.neo4j.driver.v1.Session;

abstract class BaseDriver implements Driver
import static java.lang.String.format;

public class InternalDriver implements Driver
{
private final static String DRIVER_LOG_NAME = "Driver";

private final SecurityPlan securityPlan;
protected final SessionFactory sessionFactory;
protected final Logger log;
private final SessionFactory sessionFactory;
private final Logger log;

private AtomicBoolean closed = new AtomicBoolean( false );

BaseDriver( SecurityPlan securityPlan, SessionFactory sessionFactory, Logging logging )
InternalDriver( SecurityPlan securityPlan, SessionFactory sessionFactory, Logging logging )
{
this.securityPlan = securityPlan;
this.sessionFactory = sessionFactory;
Expand All @@ -61,8 +63,8 @@ public final Session session()
public final Session session( AccessMode mode )
{
assertOpen();
Session session = newSessionWithMode( mode );
if( closed.get() )
Session session = sessionFactory.newInstance( mode );
if ( closed.get() )
{
// the driver is already closed and we either 1. obtain this session from the old session pool
// or 2. we obtain this session from a new session pool
Expand All @@ -77,15 +79,35 @@ public final Session session( AccessMode mode )
@Override
public final void close()
{
if ( closed.compareAndSet(false, true) )
if ( closed.compareAndSet( false, true ) )
{
closeResources();
}
}

protected abstract Session newSessionWithMode( AccessMode mode );
/**
* Get the underlying session factory.
* <p>
* <b>This method is only for testing</b>
*
* @return the session factory used by this driver.
*/
public final SessionFactory getSessionFactory()
{
return sessionFactory;
}

protected abstract void closeResources();
private void closeResources()
{
try
{
sessionFactory.close();
}
catch ( Exception ex )
{
log.error( format( "~~ [ERROR] %s", ex.getMessage() ), ex );
}
}

private void assertOpen()
{
Expand All @@ -95,7 +117,7 @@ private void assertOpen()
}
}

private IllegalStateException driverCloseException()
private static RuntimeException driverCloseException()
{
return new IllegalStateException( "This driver instance has already been closed" );
}
Expand Down
Loading