Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Fix sftp connection shuffle #1757

Merged
merged 1 commit into from
Feb 4, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ public class ConnectionHandler implements LocalEntryUndeployCallBack {
// defined as <connector_name>:<connection_name>
private final Map<String, Object> connectionMap;
private final Map<String, String> connectionLocalEntryMap;
private final Map<String, ConnectionFactory> connectionFactoryMap;
private final Map<String, Configuration> configurationMap;
private final ConcurrentHashMap<String, LocalEntryUndeployObserver> observerMap = new ConcurrentHashMap();
private SynapseConfiguration synapseConfiguration = null;
private ConnectionFactory connectionFactory = null;
private Configuration configuration = null;

private ReentrantLock lock = new ReentrantLock();
private ReentrantLock poolLock = new ReentrantLock();

static {
handler = new ConnectionHandler();
Expand All @@ -63,6 +64,8 @@ public class ConnectionHandler implements LocalEntryUndeployCallBack {
private ConnectionHandler() {
this.connectionMap = new ConcurrentHashMap<>();
this.connectionLocalEntryMap = new ConcurrentHashMap<>();
this.connectionFactoryMap = new ConcurrentHashMap<>();
this.configurationMap = new ConcurrentHashMap<>();
}

/**
Expand Down Expand Up @@ -124,10 +127,25 @@ public void onLocalEntryUndeploy(String localEntryKey) {
public void createConnection(String connector, String connectionName, ConnectionFactory factory,
Configuration configuration, MessageContext messageContext) {
initializeLocalEntryConnectionMapping(connector, connectionName, messageContext);
this.connectionFactory = factory;
this.configuration = configuration;
ConnectionPool pool = new ConnectionPool(connectionFactory, configuration);
connectionMap.putIfAbsent(getCode(connector, connectionName), pool);
configurationMap.putIfAbsent(getCode(connector, connectionName), configuration);
connectionFactoryMap.putIfAbsent(getCode(connector, connectionName), factory);
String key = getCode(connector, connectionName);
ConnectionPool pool = (ConnectionPool) connectionMap.get(key);

// Double-checked locking for thread safety
if (pool == null) {
poolLock.lock();
try {
pool = (ConnectionPool) connectionMap.get(key); // Second check (inside lock)
if (pool == null) {
log.info("Creating connection pool for " + connectionName);
pool = new ConnectionPool(factory, configuration);
connectionMap.putIfAbsent(key, pool);
}
} finally {
poolLock.unlock(); // Always release lock
}
}
}

/**
Expand All @@ -154,10 +172,25 @@ public void createConnection(String connector, String connectionName, Connection
*/
public void createConnection(String connector, String connectionName, ConnectionFactory factory,
Configuration configuration) {
this.connectionFactory = factory;
this.configuration = configuration;
ConnectionPool pool = new ConnectionPool(connectionFactory, configuration);
connectionMap.putIfAbsent(getCode(connector, connectionName), pool);
configurationMap.putIfAbsent(getCode(connector, connectionName), configuration);
connectionFactoryMap.putIfAbsent(getCode(connector, connectionName), factory);
String key = getCode(connector, connectionName);
ConnectionPool pool = (ConnectionPool) connectionMap.get(key);

// Double-checked locking for thread safety
if (pool == null) {
poolLock.lock();
try {
pool = (ConnectionPool) connectionMap.get(key); // Second check (inside lock)
if (pool == null) {
log.info("Creating connection pool for " + connectionName);
pool = new ConnectionPool(factory, configuration);
connectionMap.putIfAbsent(key, pool);
}
} finally {
poolLock.unlock(); // Always release lock
}
}
}

/**
Expand Down Expand Up @@ -189,7 +222,8 @@ public Connection getConnection(String connector, String connectionName) throws
if (((ConnectionPool) connectionObj).isAgedTimeoutEnabled()) {
closeAgedConnectionPoolGracefully(connectorCode);
if (!connectionMap.containsKey(connectorCode)) {
ConnectionPool pool = new ConnectionPool(connectionFactory, configuration);
ConnectionPool pool = new ConnectionPool(connectionFactoryMap.get(connectorCode),
configurationMap.get(connectorCode));
connectionMap.putIfAbsent(connectorCode, pool);
}
}
Expand Down