Skip to content

Commit fe971de

Browse files
committed
[SPARK-44241][CORE] Mistakenly setting io.connectionTimeout/connectionCreationTimeout to zero or a negative value causes incessant executor constructions/destructions
### What changes were proposed in this pull request? This PR treats a negative io.connectionTimeout/connectionCreationTimeout as zero. Zero here means: - connectionCreationTimeout=0: an unlimited connect timeout (`CONNECT_TIMEOUT_MILLIS`) for connection establishment - connectionTimeout=0: the `IdleStateHandler` that triggers `IdleStateEvent` is disabled. ### Why are the changes needed? 1. This PR fixes a bug when connectionCreationTimeout is 0: Netty treats this value as unlimited, but ChannelFuture.await(0) fails immediately and inappropriately. 2. This PR fixes a bug when connectionCreationTimeout is less than 0, which causes meaningless transport client reconnections and endless executor reconstructions. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New unit tests. Closes #41785 from yaooqinn/SPARK-44241. Authored-by: Kent Yao <yao@apache.org> Signed-off-by: Kent Yao <yao@apache.org> (cherry picked from commit 38645fa) Signed-off-by: Kent Yao <yao@apache.org>
1 parent ad29290 commit fe971de

File tree

3 files changed

+44
-9
lines changed

3 files changed

+44
-9
lines changed

common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java

+13-3
Original file line numberDiff line numberDiff line change
@@ -245,12 +245,13 @@ TransportClient createClient(InetSocketAddress address)
245245
logger.debug("Creating new connection to {}", address);
246246

247247
Bootstrap bootstrap = new Bootstrap();
248+
int connCreateTimeout = conf.connectionCreationTimeoutMs();
248249
bootstrap.group(workerGroup)
249250
.channel(socketChannelClass)
250251
// Disable Nagle's Algorithm since we don't want packets to wait
251252
.option(ChannelOption.TCP_NODELAY, true)
252253
.option(ChannelOption.SO_KEEPALIVE, true)
253-
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionCreationTimeoutMs())
254+
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connCreateTimeout)
254255
.option(ChannelOption.ALLOCATOR, pooledAllocator);
255256

256257
if (conf.receiveBuf() > 0) {
@@ -276,10 +277,19 @@ public void initChannel(SocketChannel ch) {
276277
// Connect to the remote server
277278
long preConnect = System.nanoTime();
278279
ChannelFuture cf = bootstrap.connect(address);
279-
if (!cf.await(conf.connectionCreationTimeoutMs())) {
280+
281+
if (connCreateTimeout <= 0) {
282+
cf.awaitUninterruptibly();
283+
assert cf.isDone();
284+
if (cf.isCancelled()) {
285+
throw new IOException(String.format("Connecting to %s cancelled", address));
286+
} else if (!cf.isSuccess()) {
287+
throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
288+
}
289+
} else if (!cf.await(connCreateTimeout)) {
280290
throw new IOException(
281291
String.format("Connecting to %s timed out (%s ms)",
282-
address, conf.connectionCreationTimeoutMs()));
292+
address, connCreateTimeout));
283293
} else if (cf.cause() != null) {
284294
throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
285295
}

common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -103,15 +103,15 @@ public int connectionTimeoutMs() {
103103
conf.get("spark.network.timeout", "120s"));
104104
long defaultTimeoutMs = JavaUtils.timeStringAsSec(
105105
conf.get(SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY, defaultNetworkTimeoutS + "s")) * 1000;
106-
return (int) defaultTimeoutMs;
106+
return defaultTimeoutMs < 0 ? 0 : (int) defaultTimeoutMs;
107107
}
108108

109109
/** Connect creation timeout in milliseconds. Default 30 secs. */
110110
public int connectionCreationTimeoutMs() {
111111
long connectionTimeoutS = TimeUnit.MILLISECONDS.toSeconds(connectionTimeoutMs());
112112
long defaultTimeoutMs = JavaUtils.timeStringAsSec(
113113
conf.get(SPARK_NETWORK_IO_CONNECTIONCREATIONTIMEOUT_KEY, connectionTimeoutS + "s")) * 1000;
114-
return (int) defaultTimeoutMs;
114+
return defaultTimeoutMs < 0 ? 0 : (int) defaultTimeoutMs;
115115
}
116116

117117
/** Number of concurrent connections between two nodes for fetching data. */

common/network-common/src/test/java/org/apache/spark/network/client/TransportClientFactorySuite.java

+29-4
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,6 @@
3131
import org.junit.Before;
3232
import org.junit.Test;
3333

34-
import static org.junit.Assert.assertFalse;
35-
import static org.junit.Assert.assertNotSame;
36-
import static org.junit.Assert.assertTrue;
37-
3834
import org.apache.spark.network.TestUtils;
3935
import org.apache.spark.network.TransportContext;
4036
import org.apache.spark.network.server.NoOpRpcHandler;
@@ -45,6 +41,8 @@
4541
import org.apache.spark.network.util.JavaUtils;
4642
import org.apache.spark.network.util.TransportConf;
4743

44+
import static org.junit.Assert.*;
45+
4846
public class TransportClientFactorySuite {
4947
private TransportConf conf;
5048
private TransportContext context;
@@ -237,4 +235,31 @@ public void fastFailConnectionInTimeWindow() {
237235
Assert.assertThrows("fail this connection directly", IOException.class,
238236
() -> factory.createClient(TestUtils.getLocalHost(), unreachablePort, true));
239237
}
238+
239+
@Test
240+
public void unlimitedConnectionAndCreationTimeouts() throws IOException, InterruptedException {
241+
Map<String, String> configMap = new HashMap<>();
242+
configMap.put("spark.shuffle.io.connectionTimeout", "-1");
243+
configMap.put("spark.shuffle.io.connectionCreationTimeout", "-1");
244+
TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(configMap));
245+
RpcHandler rpcHandler = new NoOpRpcHandler();
246+
try (TransportContext ctx = new TransportContext(conf, rpcHandler, true);
247+
TransportClientFactory factory = ctx.createClientFactory()){
248+
TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort());
249+
assertTrue(c1.isActive());
250+
long expiredTime = System.currentTimeMillis() + 5000;
251+
while (c1.isActive() && System.currentTimeMillis() < expiredTime) {
252+
Thread.sleep(10);
253+
}
254+
assertTrue(c1.isActive());
255+
// When connectionCreationTimeout is unlimited, the connection shall be able to
256+
// fail when the server is not reachable.
257+
TransportServer server = ctx.createServer();
258+
int unreachablePort = server.getPort();
259+
JavaUtils.closeQuietly(server);
260+
IOException exception = Assert.assertThrows(IOException.class,
261+
() -> factory.createClient(TestUtils.getLocalHost(), unreachablePort, true));
262+
assertNotEquals(exception.getCause(), null);
263+
}
264+
}
240265
}

0 commit comments

Comments
 (0)