Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Fix retrying of SocketTimeoutExceptions in HttpConnector #9008

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,6 @@ URLConnection connect(
throw e;
} catch (IllegalArgumentException e) {
throw new UnrecoverableHttpException(e.getMessage());
} catch (SocketTimeoutException e) {
// SocketTimeoutExceptions are InterruptedExceptions; however they do not signify
// an external interruption, but simply a failed download due to some server timing
// out. So rethrow them as ordinary IOExceptions.
throw new IOException(e.getMessage(), e);
} catch (IOException e) {
if (connection != null) {
// If we got here, it means we might not have consumed the entire payload of the
Expand All @@ -214,7 +209,12 @@ URLConnection connect(
throw e;
}
if (++retries == MAX_RETRIES) {
if (!(e instanceof SocketTimeoutException)) {
if (e instanceof SocketTimeoutException) {
// SocketTimeoutExceptions are InterruptedIOExceptions; however they do not signify
// an external interruption, but simply a failed download due to some server timing
// out. So rethrow them as ordinary IOExceptions.
e = new IOException(e.getMessage(), e);
} else {
eventHandler
.handle(Event.progress(format("Error connecting to %s: %s", url, e.getMessage())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.net.Proxy;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.net.URLConnection;
import java.util.Locale;
Expand All @@ -53,6 +54,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
Expand All @@ -79,10 +81,14 @@ public class HttpConnectorTest {
private final ExecutorService executor = Executors.newFixedThreadPool(2);
private final ManualClock clock = new ManualClock();
private final ManualSleeper sleeper = new ManualSleeper(clock);
/**
* Scale timeouts down to make tests fast.
*/
private final float timeoutScaling = 0.05f;
private final EventHandler eventHandler = mock(EventHandler.class);
private final ProxyHelper proxyHelper = mock(ProxyHelper.class);
private final HttpConnector connector =
new HttpConnector(Locale.US, eventHandler, proxyHelper, sleeper);
new HttpConnector(Locale.US, eventHandler, proxyHelper, sleeper, timeoutScaling);

@Before
public void before() throws Exception {
Expand Down Expand Up @@ -202,6 +208,148 @@ public Object call() throws Exception {
}
}

@Test
public void connectionRefused_retries() throws Exception {
final int port;

// Start and immediately stop server socket to get a free port.
try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getByName(null))) {
port = server.getLocalPort();
}

final AtomicReference<ServerSocket> server = new AtomicReference<>();

try {
// Schedule server socket to be started only after retry to simulate connection retry.
sleeper.scheduleRunnable(() -> {
try {
server.set(new ServerSocket(port, 1, InetAddress.getByName(null)));
} catch (IOException e) {
throw new RuntimeException(e);
}

@SuppressWarnings("unused")
Future<?> possiblyIgnoredError =
executor.submit(() -> {

while (!executor.isShutdown()) {
try (Socket socket = server.get().accept()) {
readHttpRequest(socket.getInputStream());
sendLines(
socket,
"HTTP/1.1 200 OK",
"Date: Fri, 31 Dec 1999 23:59:59 GMT",
"Connection: close",
"Content-Type: text/plain",
"Content-Length: 5",
"",
"hello");
}
}

return null;
});
}, 1);

try (Reader payload =
new InputStreamReader(
connector.connect(
new URL(String.format("http://localhost:%d", port)),
ImmutableMap.<String, String>of())
.getInputStream(),
ISO_8859_1)) {
assertThat(CharStreams.toString(payload)).isEqualTo("hello");
}
} finally {
ServerSocket serverSocket = server.get();

if (serverSocket != null) {
serverSocket.close();
}
}
}

@Test
public void socketTimeout_retries() throws Exception {
try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getByName(null))) {
@SuppressWarnings("unused")
Future<?> possiblyIgnoredError =
executor.submit(() -> {
try (Socket socket = server.accept()) {
// Do nothing to cause SocketTimeoutException on client side.
}

// Schedule proper HTTP response once client retries.
sleeper.scheduleRunnable(() -> {
executor.submit(() -> {
while (!executor.isShutdown()) {
try (Socket socket = server.accept()) {
readHttpRequest(socket.getInputStream());
sendLines(
socket,
"HTTP/1.1 200 OK",
"Date: Fri, 31 Dec 1999 23:59:59 GMT",
"Connection: close",
"Content-Type: text/plain",
"Content-Length: 5",
"",
"hello");
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});

}, 1);
return null;
});

try (Reader payload =
new InputStreamReader(
connector.connect(
new URL(String.format("http://localhost:%d", server.getLocalPort())),
ImmutableMap.<String, String>of())
.getInputStream(),
ISO_8859_1)) {
assertThat(CharStreams.toString(payload)).isEqualTo("hello");
assertThat(clock.currentTimeMillis()).isEqualTo(1);
}
}
}

/**
* It is important part of {@link HttpConnector} contract to not throw raw {@link SocketTimeoutException}
* because it extends {@link java.io.InterruptedIOException}
* and {@link HttpConnectorMultiplexer} relies on {@link java.io.InterruptedIOException}
* to only be thrown when actual interruption happened.
*/
@Test
public void socketTimeout_throwsIOExceptionInsteadOfSocketTimeoutException() throws Exception {
try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getByName(null))) {
@SuppressWarnings("unused")
Future<?> possiblyIgnoredError =
executor.submit(() -> {
try (Socket socket = server.accept()) {
// Do nothing to cause SocketTimeoutException on client side.
}
return null;
});

try (Reader payload =
new InputStreamReader(
connector.connect(
new URL(String.format("http://localhost:%d", server.getLocalPort())),
ImmutableMap.<String, String>of())
.getInputStream(),
ISO_8859_1)) {
fail("Should have thrown");
} catch (IOException expected) {
assertThat(expected).hasCauseThat().isInstanceOf(SocketTimeoutException.class);
assertThat(expected).hasCauseThat().hasMessageThat().isEqualTo("connect timed out");
}
}
}

@Test
public void permanentError_doesNotRetryAndThrowsIOException() throws Exception {
try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getByName(null))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,19 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.devtools.build.lib.util.Pair;
import com.google.devtools.build.lib.util.Sleeper;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Fake sleeper for testing. */
/**
* Fake sleeper for testing.
*/
public final class ManualSleeper implements Sleeper {

private final ManualClock clock;
private final List<Pair<Long, Runnable>> scheduledRunnables = new ArrayList<>(0);

public ManualSleeper(ManualClock clock) {
this.clock = checkNotNull(clock);
Expand All @@ -31,6 +38,31 @@ public ManualSleeper(ManualClock clock) {
@Override
public void sleepMillis(long milliseconds) throws InterruptedException {
checkArgument(milliseconds >= 0, "sleeper can't time travel");
clock.advanceMillis(milliseconds);
final long resultedCurrentTimeMillis = clock.advanceMillis(milliseconds);

Iterator<Pair<Long, Runnable>> iterator = scheduledRunnables.iterator();

// Run those scheduled Runnables who's time has come.
while (iterator.hasNext()) {
Pair<Long, Runnable> scheduledRunnable = iterator.next();

if (resultedCurrentTimeMillis >= scheduledRunnable.first) {
iterator.remove();
scheduledRunnable.second.run();
}
}
}

/**
* Schedules a given {@link Runnable} to run when this Sleeper's clock has been adjusted with
* {@link #sleepMillis(long)} by {@code delayMilliseconds} or greater.
*
* @param runnable runnable to run, must not throw exceptions.
* @param delayMilliseconds delay in milliseconds from current value of {@link ManualClock} used
* by this {@link ManualSleeper}.
*/
public void scheduleRunnable(Runnable runnable, long delayMilliseconds) {
checkArgument(delayMilliseconds >= 0, "sleeper can't time travel");
scheduledRunnables.add(new Pair<>(clock.currentTimeMillis() + delayMilliseconds, runnable));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package com.google.devtools.build.lib.testutil;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.fail;

import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class ManualSleeperTest {

private final ManualClock clock = new ManualClock();
private final ManualSleeper sleeper = new ManualSleeper(clock);

@Test
public void sleepMillis_0_ok() throws InterruptedException {
sleeper.sleepMillis(0);
assertThat(clock.currentTimeMillis()).isEqualTo(0);
}

@Test
public void sleepMillis_100_ok() throws InterruptedException {
sleeper.sleepMillis(100);
assertThat(clock.currentTimeMillis()).isEqualTo(100);
}

@Test
public void sleepMillis_minus1_throws() throws InterruptedException {
try {
sleeper.sleepMillis(-1);
fail("Should have thrown");
} catch (IllegalArgumentException expected) {
assertThat(expected).hasMessageThat().isEqualTo("sleeper can't time travel");
}
}

@Test
public void scheduleRunnable_0_doesNotRunItImmediately() {
AtomicInteger counter = new AtomicInteger();
sleeper.scheduleRunnable(counter::incrementAndGet, 0);

assertThat(counter.get()).isEqualTo(0);
}

@Test
public void scheduleRunnable_100_doesNotRunItImmediately() {
AtomicInteger counter = new AtomicInteger();
sleeper.scheduleRunnable(counter::incrementAndGet, 100);

assertThat(counter.get()).isEqualTo(0);
}

@Test
public void scheduleRunnable_minus1_throws() {
AtomicInteger counter = new AtomicInteger();

try {
sleeper.scheduleRunnable(counter::incrementAndGet, -1);
fail("Should have thrown");
} catch (IllegalArgumentException expected) {
assertThat(expected).hasMessageThat().isEqualTo("sleeper can't time travel");
assertThat(counter.get()).isEqualTo(0);
}
}

@Test
public void scheduleRunnable_0_runsAfterSleep0() throws InterruptedException {
AtomicInteger counter = new AtomicInteger();
sleeper.scheduleRunnable(counter::incrementAndGet, 0);

sleeper.sleepMillis(0);

assertThat(counter.get()).isEqualTo(1);
}

@Test
public void scheduleRunnable_0_runsAfterSleep0_doesNotRunSecondTime()
throws InterruptedException {
AtomicInteger counter = new AtomicInteger();
sleeper.scheduleRunnable(counter::incrementAndGet, 0);

sleeper.sleepMillis(0);
sleeper.sleepMillis(0);

assertThat(counter.get()).isEqualTo(1);
}

@Test
public void scheduleRunnable_100_runsAfterSleepExactly100() throws InterruptedException {
AtomicInteger counter = new AtomicInteger();
sleeper.scheduleRunnable(counter::incrementAndGet, 100);

sleeper.sleepMillis(50);
assertThat(counter.get()).isEqualTo(0);

sleeper.sleepMillis(50);
assertThat(counter.get()).isEqualTo(1);
}

@Test
public void scheduleRunnable_100_runsAfterSleepOver100() throws InterruptedException {
AtomicInteger counter = new AtomicInteger();
sleeper.scheduleRunnable(counter::incrementAndGet, 100);

sleeper.sleepMillis(50);
assertThat(counter.get()).isEqualTo(0);

sleeper.sleepMillis(150);
assertThat(counter.get()).isEqualTo(1);
}

@Test
public void scheduleRunnable_100_doesNotRunAgain() throws InterruptedException {
AtomicInteger counter = new AtomicInteger();
sleeper.scheduleRunnable(counter::incrementAndGet, 100);

sleeper.sleepMillis(150);
assertThat(counter.get()).isEqualTo(1);

sleeper.sleepMillis(100);
assertThat(counter.get()).isEqualTo(1);
}
}