Skip to content

Commit

Permalink
Merge pull request #1757 from malakaganga/fix_sftp_conswap
Browse files Browse the repository at this point in the history
Fix sftp connection shuffle
  • Loading branch information
malakaganga authored Feb 4, 2025
2 parents 9b8d626 + f5444c4 commit d1cbb3a
Showing 1 changed file with 45 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ public class ConnectionHandler implements LocalEntryUndeployCallBack {
// defined as <connector_name>:<connection_name>
private final Map<String, Object> connectionMap;
private final Map<String, String> connectionLocalEntryMap;
private final Map<String, ConnectionFactory> connectionFactoryMap;
private final Map<String, Configuration> configurationMap;
private final ConcurrentHashMap<String, LocalEntryUndeployObserver> observerMap = new ConcurrentHashMap();
private SynapseConfiguration synapseConfiguration = null;
private ConnectionFactory connectionFactory = null;
private Configuration configuration = null;

private ReentrantLock lock = new ReentrantLock();
private ReentrantLock poolLock = new ReentrantLock();

static {
handler = new ConnectionHandler();
Expand All @@ -63,6 +64,8 @@ public class ConnectionHandler implements LocalEntryUndeployCallBack {
private ConnectionHandler() {
this.connectionMap = new ConcurrentHashMap<>();
this.connectionLocalEntryMap = new ConcurrentHashMap<>();
this.connectionFactoryMap = new ConcurrentHashMap<>();
this.configurationMap = new ConcurrentHashMap<>();
}

/**
Expand Down Expand Up @@ -124,10 +127,25 @@ public void onLocalEntryUndeploy(String localEntryKey) {
public void createConnection(String connector, String connectionName, ConnectionFactory factory,
Configuration configuration, MessageContext messageContext) {
initializeLocalEntryConnectionMapping(connector, connectionName, messageContext);
this.connectionFactory = factory;
this.configuration = configuration;
ConnectionPool pool = new ConnectionPool(connectionFactory, configuration);
connectionMap.putIfAbsent(getCode(connector, connectionName), pool);
configurationMap.putIfAbsent(getCode(connector, connectionName), configuration);
connectionFactoryMap.putIfAbsent(getCode(connector, connectionName), factory);
String key = getCode(connector, connectionName);
ConnectionPool pool = (ConnectionPool) connectionMap.get(key);

// Double-checked locking for thread safety
if (pool == null) {
poolLock.lock();
try {
pool = (ConnectionPool) connectionMap.get(key); // Second check (inside lock)
if (pool == null) {
log.info("Creating connection pool for " + connectionName);
pool = new ConnectionPool(factory, configuration);
connectionMap.putIfAbsent(key, pool);
}
} finally {
poolLock.unlock(); // Always release lock
}
}
}

/**
Expand All @@ -154,10 +172,25 @@ public void createConnection(String connector, String connectionName, Connection
*/
public void createConnection(String connector, String connectionName, ConnectionFactory factory,
Configuration configuration) {
this.connectionFactory = factory;
this.configuration = configuration;
ConnectionPool pool = new ConnectionPool(connectionFactory, configuration);
connectionMap.putIfAbsent(getCode(connector, connectionName), pool);
configurationMap.putIfAbsent(getCode(connector, connectionName), configuration);
connectionFactoryMap.putIfAbsent(getCode(connector, connectionName), factory);
String key = getCode(connector, connectionName);
ConnectionPool pool = (ConnectionPool) connectionMap.get(key);

// Double-checked locking for thread safety
if (pool == null) {
poolLock.lock();
try {
pool = (ConnectionPool) connectionMap.get(key); // Second check (inside lock)
if (pool == null) {
log.info("Creating connection pool for " + connectionName);
pool = new ConnectionPool(factory, configuration);
connectionMap.putIfAbsent(key, pool);
}
} finally {
poolLock.unlock(); // Always release lock
}
}
}

/**
Expand Down Expand Up @@ -189,7 +222,8 @@ public Connection getConnection(String connector, String connectionName) throws
if (((ConnectionPool) connectionObj).isAgedTimeoutEnabled()) {
closeAgedConnectionPoolGracefully(connectorCode);
if (!connectionMap.containsKey(connectorCode)) {
ConnectionPool pool = new ConnectionPool(connectionFactory, configuration);
ConnectionPool pool = new ConnectionPool(connectionFactoryMap.get(connectorCode),
configurationMap.get(connectorCode));
connectionMap.putIfAbsent(connectorCode, pool);
}
}
Expand Down

0 comments on commit d1cbb3a

Please # to comment.