From 03af351b146f60d7bf31ef7b8343a8741964ac02 Mon Sep 17 00:00:00 2001 From: Oleksandr Krutko Date: Fri, 13 Sep 2024 06:08:01 +0300 Subject: [PATCH] Fixes #929 - Implement a utility class to save large downloads to a file. Introduced `PathResponseListener`, and related test cases. Signed-off-by: Oleksandr Krutko --- .../jetty/client/PathResponseListener.java | 139 ++++++ .../client/PathResponseListenerTest.java | 397 ++++++++++++++++++ 2 files changed, 536 insertions(+) create mode 100644 jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/PathResponseListener.java create mode 100644 jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/PathResponseListenerTest.java diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/PathResponseListener.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/PathResponseListener.java new file mode 100644 index 000000000000..d6c623ddbdfc --- /dev/null +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/PathResponseListener.java @@ -0,0 +1,139 @@ +// +// ======================================================================== +// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.client; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.concurrent.CompletableFuture; + +import org.eclipse.jetty.client.Response.Listener; +import org.eclipse.jetty.http.HttpStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of {@link Response.ContentListener} that produces an {@link Path} + * that allows applications to save a file from a response {@link Response} + * like {@code curl -o file.bin} does. + *

+ * Typical usage is: + *

{@code httpClient.newRequest(host, port)
+ * .send(new PathResponseListener(Path.of("/tmp/file.bin"));
+ * 
+ *  var request = httpClient.newRequest(host, port);
+ *  CompletableFuture completable = PathResponseListener.write(request, Path.of("/tmp/file.bin"), rewriteExistingFile);
+ *  }
+ */ +public class PathResponseListener extends CompletableFuture implements Listener +{ + private static final Logger LOG = LoggerFactory.getLogger(InputStreamResponseListener.class); + + private final Path path; + private final FileChannel fileChannel; + + public PathResponseListener(Path path, boolean overwrite) throws IOException + { + this.path = path; + + // Throws the exception if file can't be overwritten + // otherwise truncate it. + if (Files.exists(path) && !overwrite) + { + throw new FileAlreadyExistsException("File can't be overwritten"); + } + + fileChannel = FileChannel.open(this.path, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING); + } + + @Override + public void onHeaders(Response response) + { + if (response.getStatus() != HttpStatus.OK_200) + { + response.abort(new HttpResponseException(String.format("HTTP status code of response %d", response.getStatus()), response)); + } + } + + @Override + public void onContent(Response response, ByteBuffer content) + { + try + { + var bytesWritten = fileChannel.write(content); + if (LOG.isDebugEnabled()) + LOG.debug("%d bytes written", bytesWritten); + } + catch (IOException e) + { + response.abort(e); + } + } + + @Override + public void onComplete(Result result) + { + try + { + if (result.isFailed()) + { + if (LOG.isDebugEnabled()) + LOG.debug("Result failure", result.getFailure()); + completeExceptionally(result.getFailure()); + return; + } + + this.complete(this.path); + } + finally + { + try + { + fileChannel.close(); + } + catch (IOException e) + { + e.printStackTrace(); + } + } + + } + + /** + * Writes a file into {@link Path}. + * + * @param request to a server + * @param path to write a file + * @param overwrite true overwrites a file, otherwise fails + * @return {@code CompletableFuture} + */ + public static CompletableFuture write(Request request, Path path, boolean overwrite) + { + PathResponseListener l = null; + try + { + l = new PathResponseListener(path, overwrite); + request.send(l); + } + catch (Throwable e) + { + l.completeExceptionally(e); + } + return l; + } +} diff --git a/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/PathResponseListenerTest.java b/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/PathResponseListenerTest.java new file mode 100644 index 000000000000..8ef3da78b973 --- /dev/null +++ b/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/PathResponseListenerTest.java @@ -0,0 +1,397 @@ +// +// ======================================================================== +// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.client; + +import java.io.BufferedInputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URL; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.security.MessageDigest; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.client.transport.HttpClientTransportOverHTTP; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.handler.ResourceHandler; +import org.eclipse.jetty.util.resource.ResourceFactory; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class PathResponseListenerTest +{ + private Server server; + private ServerConnector connector; + private MessageDigest origDigest; + private MessageDigest respDigest; + + private static final Path ORIGIN_ZERO_FILE = Path.of(System.getProperty("user.dir"), "origin_zero"); + private static final Path ORIGIN_SMALL_FILE = Path.of(System.getProperty("user.dir"), "origin_small"); + private static final Path ORIGIN_LARGE_FILE = Path.of(System.getProperty("user.dir"), "origin_large"); + + private static final Path RESPONSE_ZERO_FILE = Path.of(System.getProperty("user.dir"), "response_zero"); + private static final Path RESPONSE_SMALL_FILE = Path.of(System.getProperty("user.dir"), "response_small"); + private static final Path RESPONSE_LARGE_FILE = Path.of(System.getProperty("user.dir"), "response_large"); + + private void configureTestEnvironment() throws Exception + { + origDigest = MessageDigest.getInstance("MD5"); + respDigest = MessageDigest.getInstance("MD5"); + QueuedThreadPool serverThreads = new QueuedThreadPool(); + serverThreads.setName("server"); + server = new Server(serverThreads); + ResourceHandler resourceHandler = new ResourceHandler(); + resourceHandler.setBaseResource(ResourceFactory.of(resourceHandler).newResource(System.getProperty("user.dir"))); + resourceHandler.setDirAllowed(false); + connector = new ServerConnector(server); + server.addConnector(connector); + server.setHandler(resourceHandler); + } + + private void deleteFiles(Path... paths) + { + for (Path p : paths) + { + try + { + Files.deleteIfExists(p); + } + catch (IOException e) + { + e.printStackTrace(); + } + } + } + + private void createZeroFile() throws IOException + { + try (OutputStream zeroFileWriter = Files.newOutputStream(ORIGIN_ZERO_FILE, StandardOpenOption.CREATE_NEW)) + { + zeroFileWriter.write(ByteBuffer.allocate(0).array()); + } + catch (IOException e) + { + throw e; + } + } + + private void createSmallFile() throws IOException + { + try (OutputStream smallFileWriter = Files.newOutputStream(ORIGIN_SMALL_FILE, StandardOpenOption.CREATE_NEW)) + { + ByteBuffer buff = ByteBuffer.allocate(1024); + Random random = new Random(); + int writeBytes = 0; + while (writeBytes < 1024) + { + random.nextBytes(buff.array()); + buff.flip(); + smallFileWriter.write(buff.array()); + buff.clear(); + writeBytes++; + } + } + catch (IOException e) + { + throw e; + } + } + + private void createLargeFile() throws IOException + { + try (OutputStream largeFileWriter = Files.newOutputStream(ORIGIN_LARGE_FILE, StandardOpenOption.CREATE_NEW)) + { + ByteBuffer buff = ByteBuffer.allocate(1048576); + Random random = new Random(); + int writeBytes = 0; + while (writeBytes < 1024) + { + random.nextBytes(buff.array()); + buff.flip(); + largeFileWriter.write(buff.array()); + buff.clear(); + writeBytes++; + } + } + catch (IOException e) + { + throw e; + } + } + + @BeforeEach + public void startServer() throws Exception + { + configureTestEnvironment(); + server.start(); + } + + @AfterEach + public void stopServer() throws Exception + { + server.stop(); + + // Reuse message digest + origDigest.reset(); + respDigest.reset(); + } + + @Test + public void testZeroFileDownload() throws Exception + { + try (HttpClient client = new HttpClient(new HttpClientTransportOverHTTP(1))) + { + deleteFiles(ORIGIN_ZERO_FILE, RESPONSE_ZERO_FILE); + createZeroFile(); + + client.start(); + + URL url = new URL("http", "localhost", connector.getLocalPort(), "/" + ORIGIN_ZERO_FILE.getFileName().toString()); + + PathResponseListener listener = new PathResponseListener(RESPONSE_ZERO_FILE, true); + Request request = client.newRequest(url.toURI().toString()); + request.send(listener); + Path path = listener.get(600, TimeUnit.SECONDS); + + assertTrue(Files.exists(path)); + + File originFile = new File(ORIGIN_ZERO_FILE.toUri()); + File responseFile = new File(RESPONSE_ZERO_FILE.toUri()); + + assertTrue(originFile.exists() && responseFile.exists()); + assertTrue(originFile.length() == 0 && responseFile.length() == 0); + } + catch (Exception e) + { + throw e; + } + finally + { + deleteFiles(ORIGIN_ZERO_FILE, RESPONSE_ZERO_FILE); + } + } + + @Test + public void testSmallFileDownload() throws Exception + { + try (HttpClient client = new HttpClient(new HttpClientTransportOverHTTP(1));) + { + deleteFiles(ORIGIN_SMALL_FILE, RESPONSE_SMALL_FILE); + createSmallFile(); + + client.start(); + + URL url = new URL("http", "localhost", connector.getLocalPort(), "/" + ORIGIN_SMALL_FILE.getFileName().toString()); + + PathResponseListener listener = new PathResponseListener(RESPONSE_SMALL_FILE, true); + Request request = client.newRequest(url.toURI().toString()); + request.send(listener); + Path path = listener.get(); + + assertTrue(Files.exists(path)); + + try (InputStream responseFile = Files.newInputStream(RESPONSE_SMALL_FILE, StandardOpenOption.READ); + InputStream originFile = Files.newInputStream(ORIGIN_SMALL_FILE, StandardOpenOption.READ)) + { + origDigest.update(originFile.readAllBytes()); + respDigest.update(responseFile.readAllBytes()); + + assertTrue(MessageDigest.isEqual(origDigest.digest(), respDigest.digest())); + } + } + catch (Exception e) + { + throw e; + } + finally + { + deleteFiles(ORIGIN_SMALL_FILE, RESPONSE_SMALL_FILE); + } + } + + @Test + public void testLargeFileDownload() throws Exception + { + try (HttpClient client = new HttpClient(new HttpClientTransportOverHTTP(1));) + { + deleteFiles(ORIGIN_LARGE_FILE, RESPONSE_LARGE_FILE); + createLargeFile(); + + client.start(); + + URL url = new URL("http", "localhost", connector.getLocalPort(), "/" + ORIGIN_LARGE_FILE.getFileName().toString()); + + PathResponseListener listener = new PathResponseListener(RESPONSE_LARGE_FILE, true); + Request request = client.newRequest(url.toURI().toString()); + request.send(listener); + Path path = listener.get(); + + assertTrue(Files.exists(path)); + + try (InputStream responseFile = Files.newInputStream(RESPONSE_LARGE_FILE, StandardOpenOption.READ); + InputStream originFile = Files.newInputStream(ORIGIN_LARGE_FILE, StandardOpenOption.READ)) + { + origDigest.update(originFile.readAllBytes()); + respDigest.update(responseFile.readAllBytes()); + + assertTrue(MessageDigest.isEqual(origDigest.digest(), respDigest.digest())); + } + } + catch (Exception e) + { + throw e; + } + finally + { + deleteFiles(ORIGIN_LARGE_FILE, RESPONSE_LARGE_FILE); + } + } + + @Test + public void testZeroFileDownloadCompletable() throws Exception + { + try (HttpClient client = new HttpClient(new HttpClientTransportOverHTTP(1));) + { + deleteFiles(ORIGIN_ZERO_FILE, RESPONSE_ZERO_FILE); + createZeroFile(); + + client.start(); + + URL url = new URL("http", "localhost", connector.getLocalPort(), "/" + ORIGIN_ZERO_FILE.getFileName().toString()); + + Request request = client.newRequest(url.toURI().toString()); + + CompletableFuture completable = PathResponseListener.write(request, RESPONSE_ZERO_FILE, true); + completable.thenAccept(path -> + { + try (InputStream responseFile = new BufferedInputStream(Files.newInputStream(path, StandardOpenOption.READ)); + InputStream originFile = new BufferedInputStream(Files.newInputStream(ORIGIN_ZERO_FILE, StandardOpenOption.READ))) + { + origDigest.update(originFile.readAllBytes()); + respDigest.update(responseFile.readAllBytes()); + + assertTrue(MessageDigest.isEqual(origDigest.digest(), respDigest.digest())); + } + catch (IOException e) + { + e.printStackTrace(); + } + }); + completable.get(); + } + catch (Exception e) + { + throw e; + } + finally + { + deleteFiles(ORIGIN_ZERO_FILE, RESPONSE_ZERO_FILE); + } + } + + @Test + public void testSmallFileDownloadCompletable() throws Exception + { + try (HttpClient client = new HttpClient(new HttpClientTransportOverHTTP(1));) + { + deleteFiles(ORIGIN_SMALL_FILE, RESPONSE_SMALL_FILE); + createSmallFile(); + + client.start(); + + URL url = new URL("http", "localhost", connector.getLocalPort(), "/" + ORIGIN_SMALL_FILE.getFileName().toString()); + + Request request = client.newRequest(url.toURI().toString()); + CompletableFuture completable = PathResponseListener.write(request, RESPONSE_SMALL_FILE, true); + + completable.thenAccept(path -> + { + try (InputStream responseFile = new BufferedInputStream(Files.newInputStream(path, StandardOpenOption.READ)); + InputStream originFile = new BufferedInputStream(Files.newInputStream(ORIGIN_SMALL_FILE, StandardOpenOption.READ))) + { + origDigest.update(originFile.readAllBytes()); + respDigest.update(responseFile.readAllBytes()); + + assertTrue(MessageDigest.isEqual(origDigest.digest(), respDigest.digest())); + } + catch (IOException e) + { + e.printStackTrace(); + } + }); + completable.get(); + } + catch (Exception e) + { + throw e; + } + finally + { + deleteFiles(ORIGIN_SMALL_FILE, RESPONSE_SMALL_FILE); + } + } + + @Test + public void testLargeFileDownloadCompletable() throws Exception + { + try (HttpClient client = new HttpClient(new HttpClientTransportOverHTTP(1));) + { + deleteFiles(ORIGIN_LARGE_FILE, RESPONSE_LARGE_FILE); + createLargeFile(); + + client.start(); + + URL url = new URL("http", "localhost", connector.getLocalPort(), "/" + ORIGIN_LARGE_FILE.getFileName().toString()); + + Request request = client.newRequest(url.toURI().toString()); + CompletableFuture completable = PathResponseListener.write(request, RESPONSE_LARGE_FILE, true); + + completable.thenAccept(path -> + { + try (BufferedInputStream responseFile = new BufferedInputStream(Files.newInputStream(path, StandardOpenOption.READ)); + BufferedInputStream originFile = new BufferedInputStream(Files.newInputStream(ORIGIN_LARGE_FILE, StandardOpenOption.READ));) + { + origDigest.update(originFile.readAllBytes()); + respDigest.update(responseFile.readAllBytes()); + + assertTrue(MessageDigest.isEqual(origDigest.digest(), respDigest.digest())); + } + catch (IOException e) + { + e.printStackTrace(); + } + }); + completable.get(); + } + catch (Exception e) + { + throw e; + } + finally + { + deleteFiles(ORIGIN_LARGE_FILE, RESPONSE_LARGE_FILE); + } + } +}