From c860dd3228ffb45d07edcece2c2a2681f9be838f Mon Sep 17 00:00:00 2001 From: Evgeny Chugunnyy Date: Fri, 22 Dec 2023 15:34:56 +0300 Subject: [PATCH] Probably better fix for conda s3 bug. `action` must be in async, since it reads from Stream. --- .../asto/streams/StorageValuePipeline.java | 52 ++++++++++++++----- 1 file changed, 40 insertions(+), 12 deletions(-) diff --git a/asto-core/src/main/java/com/artipie/asto/streams/StorageValuePipeline.java b/asto-core/src/main/java/com/artipie/asto/streams/StorageValuePipeline.java index a316a933..099d7f22 100644 --- a/asto-core/src/main/java/com/artipie/asto/streams/StorageValuePipeline.java +++ b/asto-core/src/main/java/com/artipie/asto/streams/StorageValuePipeline.java @@ -9,7 +9,6 @@ import com.artipie.asto.Content; import com.artipie.asto.Key; import com.artipie.asto.Storage; -import com.artipie.asto.misc.UncheckedIOConsumer; import com.artipie.asto.misc.UncheckedIOSupplier; import com.artipie.asto.misc.UncheckedRunnable; import io.reactivex.Flowable; @@ -33,6 +32,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.BiFunction; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; @@ -113,6 +113,7 @@ public CompletionStage process( * output stream. * @return Completion action with the result * @throws ArtipieIOException On Error + * @checkstyle ExecutableStatementCountCheck (100 lines) */ public CompletionStage processWithResult( final BiFunction, OutputStream, R> action @@ -135,18 +136,45 @@ public CompletionStage processWithResult( } return stage; } - ).thenCompose( - optional -> { - try (PublishingOutputStream output = new PublishingOutputStream()) { - res.set(action.apply(optional, output)); - return this.asto.save(this.write, new Content.From(output.publisher())); - } catch (final IOException err) { - throw new ArtipieIOException(err); - } finally { - optional.ifPresent(new UncheckedIOConsumer<>(InputStream::close)); - } + ) + .thenCompose( + input -> { + final PublishingOutputStream output = new PublishingOutputStream(); + return CompletableFuture.runAsync(() -> res.set(action.apply(input, output))) + .thenApply(unused -> new ImmutablePair<>(input, output)); } - ).thenApply(nothing -> res.get()); + ) + .thenCompose( + streams -> this.asto.save( + this.write, new Content.From(streams.getRight().publisher()) + ).thenApply(unused -> streams) + ) + .handle( + (streams, throwable) -> { + Throwable last = throwable; + try { + if (streams.getLeft().isPresent()) { + streams.getLeft().get().close(); + } + } catch (final IOException ex) { + if (last != null) { + ex.addSuppressed(last); + } + last = ex; + } + try { + streams.getRight().close(); + } catch (final IOException ex) { + if (last != null) { + ex.addSuppressed(last); + } + last = ex; + } + if (last != null) { + throw new ArtipieIOException(last); + } + return res.get(); + }); } /**