Skip to content

Commit

Permalink
[MRESOLVER-321] Make collection and visiting interruptable (#380)
Browse files Browse the repository at this point in the history
Currently the two collector implementations are not interrupt-able, do not sense interruption directly, only by some side-effect like IO. Moreover, the BF new collector may enter a "busy loop" as we seen (it was due bug, but nothing prevents us to have more bugs). Make main loop in both collector detect thread interruption and use global (per-collection) Args to carry state of the, interruption effectively the whole ST or MT collection.

Same stands for dependency visitor: it may also enter busy loop in case of cycles, hence, it should be made also  interrupt-able. Visitor OTOH should not reset the interrupted flag, it should just stop when it is set, and let the flag be handled at higher level (for example in collector).

---

https://issues.apache.org/jira/browse/MRESOLVER-321
  • Loading branch information
cstamas authored Nov 27, 2023
1 parent b6bb29c commit 4801d20
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,9 @@ public void setData(Object key, Object value) {
}

public boolean accept(DependencyVisitor visitor) {
if (Thread.currentThread().isInterrupted()) {
throw new RuntimeException(new InterruptedException("Thread interrupted"));
}
if (visitor.visitEnter(this)) {
for (DependencyNode child : children) {
if (!child.accept(visitor)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.eclipse.aether.graph;

import java.util.concurrent.atomic.AtomicReference;

import org.eclipse.aether.artifact.DefaultArtifact;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

/**
*/
public class DefaultDependencyNodeTest {
@Test
void testVisitorInterrupt() throws Exception {
DefaultDependencyNode node =
new DefaultDependencyNode(new Dependency(new DefaultArtifact("gid:aid:ver"), "compile"));
// we just use dummy visitor, as it is not visiting that matters
DependencyVisitor visitor = new DependencyVisitor() {
@Override
public boolean visitEnter(DependencyNode node) {
return true;
}

@Override
public boolean visitLeave(DependencyNode node) {
return true;
}
};
AtomicReference<Exception> thrown = new AtomicReference<>(null);
Thread t = new Thread(() -> {
Thread.currentThread().interrupt();
try {
node.accept(visitor);
fail("Should fail");
} catch (Exception e) {
thrown.set(e);
}
});
t.start();
t.join();

assertTrue(thrown.get() instanceof RuntimeException, String.valueOf(thrown.get()));
assertTrue(thrown.get().getCause() instanceof InterruptedException, String.valueOf(thrown.get()));
assertTrue(t.isInterrupted());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ protected abstract void doCollectDependencies(
List<RemoteRepository> repositories,
List<Dependency> dependencies,
List<Dependency> managedDependencies,
Results results);
Results results)
throws DependencyCollectionException;

protected RepositorySystemSession optimizeSession(RepositorySystemSession session) {
DefaultRepositorySystemSession optimized = new DefaultRepositorySystemSession(session);
Expand Down Expand Up @@ -461,6 +462,10 @@ public Results(CollectResult result, RepositorySystemSession session) {
maxCycles = ConfigUtils.getInteger(session, DEFAULT_MAX_CYCLES, CONFIG_PROP_MAX_CYCLES);
}

public CollectResult getResult() {
return result;
}

public String getErrorPath() {
return errorPath;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -48,6 +49,7 @@
import org.eclipse.aether.artifact.ArtifactType;
import org.eclipse.aether.artifact.DefaultArtifact;
import org.eclipse.aether.collection.CollectRequest;
import org.eclipse.aether.collection.DependencyCollectionException;
import org.eclipse.aether.collection.DependencyManager;
import org.eclipse.aether.collection.DependencySelector;
import org.eclipse.aether.collection.DependencyTraverser;
Expand Down Expand Up @@ -146,7 +148,8 @@ protected void doCollectDependencies(
List<RemoteRepository> repositories,
List<Dependency> dependencies,
List<Dependency> managedDependencies,
Results results) {
Results results)
throws DependencyCollectionException {
boolean useSkip = ConfigUtils.getBoolean(session, DEFAULT_SKIPPER, CONFIG_PROP_SKIPPER);
int nThreads = ExecutorUtils.threadCount(session, DEFAULT_THREADS, CONFIG_PROP_THREADS);
logger.debug("Using thread pool with {} threads to resolve descriptors.", nThreads);
Expand Down Expand Up @@ -200,6 +203,11 @@ protected void doCollectDependencies(
processDependency(
args, results, args.dependencyProcessingQueue.remove(), Collections.emptyList(), false);
}

if (args.interruptedException.get() != null) {
throw new DependencyCollectionException(
results.getResult(), "Collection interrupted", args.interruptedException.get());
}
}
}

Expand All @@ -210,6 +218,12 @@ private void processDependency(
DependencyProcessingContext context,
List<Artifact> relocations,
boolean disableVersionManagement) {
if (Thread.interrupted()) {
args.interruptedException.set(new InterruptedException());
}
if (args.interruptedException.get() != null) {
return;
}
Dependency dependency = context.dependency;
PremanagedDependency preManaged = context.premanagedDependency;

Expand Down Expand Up @@ -603,6 +617,8 @@ static class Args {

final ParallelDescriptorResolver resolver;

final AtomicReference<InterruptedException> interruptedException;

Args(
RepositorySystemSession session,
DataPool pool,
Expand All @@ -620,6 +636,7 @@ static class Args {
this.versionContext = versionContext;
this.skipper = skipper;
this.resolver = resolver;
this.interruptedException = new AtomicReference<>(null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@

import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.eclipse.aether.RepositorySystemSession;
import org.eclipse.aether.RequestTrace;
import org.eclipse.aether.artifact.Artifact;
import org.eclipse.aether.collection.CollectRequest;
import org.eclipse.aether.collection.DependencyCollectionException;
import org.eclipse.aether.collection.DependencyManager;
import org.eclipse.aether.collection.DependencySelector;
import org.eclipse.aether.collection.DependencyTraverser;
Expand Down Expand Up @@ -88,7 +90,8 @@ protected void doCollectDependencies(
List<RemoteRepository> repositories,
List<Dependency> dependencies,
List<Dependency> managedDependencies,
Results results) {
Results results)
throws DependencyCollectionException {
NodeStack nodes = new NodeStack();
nodes.push(node);

Expand All @@ -110,6 +113,11 @@ protected void doCollectDependencies(
? session.getDependencyTraverser().deriveChildTraverser(context)
: null,
session.getVersionFilter() != null ? session.getVersionFilter().deriveChildFilter(context) : null);

if (args.interruptedException.get() != null) {
throw new DependencyCollectionException(
results.getResult(), "Collection interrupted", args.interruptedException.get());
}
}

@SuppressWarnings("checkstyle:parameternumber")
Expand All @@ -123,6 +131,12 @@ private void process(
DependencyManager depManager,
DependencyTraverser depTraverser,
VersionFilter verFilter) {
if (Thread.interrupted()) {
args.interruptedException.set(new InterruptedException());
}
if (args.interruptedException.get() != null) {
return;
}
for (Dependency dependency : dependencies) {
processDependency(
args, trace, results, repositories, depSelector, depManager, depTraverser, verFilter, dependency);
Expand Down Expand Up @@ -401,6 +415,8 @@ static class Args {

final CollectRequest request;

final AtomicReference<InterruptedException> interruptedException;

Args(
RepositorySystemSession session,
DataPool pool,
Expand All @@ -416,6 +432,7 @@ static class Args {
this.nodes = nodes;
this.collectionContext = collectionContext;
this.versionContext = versionContext;
this.interruptedException = new AtomicReference<>(null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import org.eclipse.aether.DefaultRepositorySystemSession;
import org.eclipse.aether.RepositorySystemSession;
Expand Down Expand Up @@ -158,6 +159,25 @@ protected DependencyNode path(DependencyNode root, int... coords) {
}
}

@Test
void testInterruption() throws Exception {
Dependency dependency = newDep("gid:aid:ext:ver", "compile");
CollectRequest request = new CollectRequest(dependency, singletonList(repository));
AtomicReference<Object> cause = new AtomicReference<>(null);
Thread t = new Thread(() -> {
Thread.currentThread().interrupt();
try {
collector.collectDependencies(session, request);
fail("We should throw");
} catch (DependencyCollectionException e) {
cause.set(e.getCause());
}
});
t.start();
t.join();
assertTrue(cause.get() instanceof InterruptedException, String.valueOf(cause.get()));
}

@Test
void testSimpleCollection() throws DependencyCollectionException {
Dependency dependency = newDep("gid:aid:ext:ver", "compile");
Expand Down

0 comments on commit 4801d20

Please # to comment.