Skip to content

Commit

Permalink
inParallel() throws InterruptedException
Browse files Browse the repository at this point in the history
  • Loading branch information
fluentfuture committed Dec 22, 2024
1 parent 42e217a commit ebfdb2c
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 29 deletions.
42 changes: 35 additions & 7 deletions mug/src/main/java/com/google/mu/util/concurrent/Parallelizer.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -435,18 +436,20 @@ public void parallelizeUninterruptibly(Stream<? extends Runnable> tasks) {
}

/**
* Returns a {@link Collector} that runs {@code concurrentFunction} in parallel using this {@code
* Parallelizer} and returns the inputs and outputs in a {@link BiStream}, in encounter order of
* the input elements.
* Applies {@code concurrentFunction} on each element of {@code inputs} in parallel, using this
* {@code Parallelizer} and returns the inputs and outputs in a {@link BiStream}, in encounter
* order of the input elements.
*
* <p>For example: <pre>{@code
* <p>For example:
*
* <pre>{@code
* ImmutableListMultimap<String, Asset> resourceAssets =
* resources.stream()
* .collect(parallelizer.inParallel(this::listAssets))
* parallelizer.inParallel(resources, this::listAssets)
* .collect(flatteningToImmutableListMultimap(List::stream));
* }</pre>
*
* <p>In Java 20 using structured concurrency, it can be implemented equivalently as in:
* <p>In Java 22 using structured concurrency, it can be implemented equivalently as in:
*
* <pre>{@code
* ImmutableListMultimap<String, Asset> resourceAssets;
* try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Expand All @@ -464,6 +467,31 @@ public void parallelizeUninterruptibly(Stream<? extends Runnable> tasks) {
*
* @param concurrentFunction a function that's safe to be run concurrently, and is usually
* IO-intensive (such as an outgoing RPC or reading distributed storage).
* @throws InterruptedException if the thread is interrupted while blocking for parallel results.
*
* @since 8.3
*/
public <I, O> BiStream<I, O> inParallel(
Collection<? extends I> inputs, Function<? super I, ? extends O> concurrentFunction)
throws InterruptedException {
List<I> list = new ArrayList<>(inputs);
requireNonNull(concurrentFunction);
List<O> outputs = new ArrayList<>(list.size());
outputs.addAll(Collections.nCopies(list.size(), (O) null));
parallelize(
IntStream.range(0, list.size()).boxed(),
i -> outputs.set(i, concurrentFunction.apply(list.get(i))));
return BiStream.zip(list, outputs);
}

/**
* Returns a {@link Collector} that runs {@code concurrentFunction} in parallel using this {@code
* Parallelizer} and returns the inputs and outputs in a {@link BiStream}, in encounter order of
* the input elements.
*
* @deprecated Use {@link #inParallel(Iterable, Function)} instead. This method doesn't propagate
* cancellation from the main thread to the concurrent threads, thus isn't composable in a
* structured concurrency tree of concurrent operations.
*
* @since 6.5
*/
Expand Down
13 changes: 6 additions & 7 deletions mug/src/test/java/com/google/mu/util/concurrent/FanoutTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import static org.junit.Assert.assertThrows;

import java.util.Map;
import java.util.stream.Stream;

import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -212,23 +211,23 @@ public void withMaxConcurrency_negativeConcurrencyDisallowed() {
}

@Test
public void withMaxConcurrency_inputSizeGreaterThanMaxConcurrency() {
public void withMaxConcurrency_inputSizeGreaterThanMaxConcurrency() throws InterruptedException {
Map<Integer, String> results =
Stream.of(1, 2, 3, 4, 5).collect(withMaxConcurrency(3).inParallel(Object::toString)).toMap();
withMaxConcurrency(3).inParallel(asList(1, 2, 3, 4, 5), Object::toString).toMap();
assertThat(results).containsExactly(1, "1", 2, "2", 3, "3", 4, "4", 5, "5").inOrder();
}

@Test
public void withMaxConcurrency_inputSizeSmallerThanMaxConcurrency() {
public void withMaxConcurrency_inputSizeSmallerThanMaxConcurrency() throws InterruptedException {
Map<Integer, String> results =
Stream.of(1, 2).collect(withMaxConcurrency(3).inParallel(Object::toString)).toMap();
withMaxConcurrency(3).inParallel(asList(1, 2), Object::toString).toMap();
assertThat(results).containsExactly(1, "1", 2, "2").inOrder();
}

@Test
public void withMaxConcurrency_inputSizeEqualToMaxConcurrency() {
public void withMaxConcurrency_inputSizeEqualToMaxConcurrency() throws InterruptedException {
Map<Integer, String> results =
Stream.of(1, 2, 3).collect(withMaxConcurrency(3).inParallel(Object::toString)).toMap();
withMaxConcurrency(3).inParallel(asList(1, 2, 3), Object::toString).toMap();
assertThat(results).containsExactly(1, "1", 2, "2", 3, "3").inOrder();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,21 +75,15 @@ public static class CollectorTest {
threadPool.shutdownNow();
}

@Test public void testInParallel_emptyInputStream() {
@Test public void testInParallel_emptyInputStream() throws InterruptedException{
Parallelizer parallelizer = new Parallelizer(threadPool, 3);
assertThat(
Stream.empty()
.collect(parallelizer.inParallel(Object::toString))
.toMap())
assertThat(parallelizer.inParallel(asList(), Object::toString).toMap())
.isEmpty();
}

@Test public void testInParallel_fromSequentialStream() {
@Test public void testInParallel_fromSequentialStream() throws InterruptedException {
Parallelizer parallelizer = new Parallelizer(threadPool, 3);
assertThat(
Stream.of(1, 2, 3)
.collect(parallelizer.inParallel(Object::toString))
.toMap())
assertThat(parallelizer.inParallel(asList(1, 2, 3), Object::toString).toMap())
.containsExactly(1, "1", 2, "2", 3, "3")
.inOrder();
}
Expand All @@ -105,16 +99,16 @@ public static class CollectorTest {
.inOrder();
}

@Test public void testInParallel_failure() {
@Test public void testInParallel_failure() throws InterruptedException {
Parallelizer parallelizer = new Parallelizer(threadPool, 3);
RuntimeException thrown = assertThrows(
RuntimeException.class,
() -> Stream.of(1, 2, 3)
.collect(
parallelizer.inParallel(i -> {
() -> parallelizer.inParallel(
asList(1, 2, 3),
i -> {
Preconditions.checkState(i < 3);
return i.toString();
})));
}));
assertThat(thrown).hasCauseThat().isInstanceOf(IllegalStateException.class);
}

Expand Down

0 comments on commit ebfdb2c

Please # to comment.