Skip to content

Commit

Permalink
[#11050] Replace StopFlag with CompletableFuture.cancel
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Jun 5, 2024
1 parent eac901b commit 9e066c0
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@

package com.navercorp.pinpoint.web.applicationmap.appender.server;

import com.navercorp.pinpoint.common.server.util.time.Range;
import com.navercorp.pinpoint.common.trace.ServiceType;
import com.navercorp.pinpoint.web.applicationmap.nodes.Node;
import com.navercorp.pinpoint.web.applicationmap.nodes.NodeList;
import com.navercorp.pinpoint.web.applicationmap.nodes.ServerGroupList;
import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataDuplexMap;
import com.navercorp.pinpoint.common.server.util.time.Range;
import org.apache.logging.log4j.Logger;
import com.navercorp.pinpoint.web.vo.Application;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.util.CollectionUtils;

import java.time.Instant;
Expand All @@ -35,7 +36,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/**
Expand All @@ -55,70 +55,96 @@ public DefaultServerInfoAppender(ServerGroupListFactory serverGroupListFactory,
}

@Override
public void appendServerInfo(final Range range, final NodeList source, final LinkDataDuplexMap linkDataDuplexMap, final long timeoutMillis) {
public void appendServerInfo(final Range range, final NodeList source, final LinkDataDuplexMap linkDataDuplexMap, long timeoutMillis) {
if (source == null) {
return;
}

Collection<Node> nodes = source.getNodeList();
if (CollectionUtils.isEmpty(nodes)) {
return;
}
final AtomicBoolean stopSign = new AtomicBoolean();
final CompletableFuture[] futures = getServerGroupListFutures(range, nodes, linkDataDuplexMap, stopSign);
if (-1 == timeoutMillis) {
// Returns the result value when complete
CompletableFuture.allOf(futures).join();
} else {
try {
CompletableFuture.allOf(futures).get(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (Exception e) { // InterruptedException, ExecutionException, TimeoutException
stopSign.set(Boolean.TRUE);
String cause = "an error occurred while adding server info";
if (e instanceof TimeoutException) {
cause += " build timed out. timeout=" + timeoutMillis + "ms";
}
throw new RuntimeException(cause, e);

final CompletableFuture<ServerGroupList>[] futures = getServerGroupListFutures(range, nodes, linkDataDuplexMap);

timeoutMillis = defaultTimeoutMillis(timeoutMillis);
join(futures, timeoutMillis);

int index = 0;
for (Node node : nodes) {
if (skipServerInfo(node)) {
continue;
}
final CompletableFuture<ServerGroupList> future = futures[index++];
if (future.isCompletedExceptionally()) {
logger.warn("Failed to get server info for node {}", node);
node.setServerGroupList(serverGroupListFactory.createEmptyNodeInstanceList());

Check warning on line 81 in web/src/main/java/com/navercorp/pinpoint/web/applicationmap/appender/server/DefaultServerInfoAppender.java

View check run for this annotation

Codecov / codecov/patch

web/src/main/java/com/navercorp/pinpoint/web/applicationmap/appender/server/DefaultServerInfoAppender.java#L80-L81

Added lines #L80 - L81 were not covered by tests
} else {
ServerGroupList serverGroupList = future.getNow(null);
logger.trace("ServerGroupList: {}", serverGroupList);
node.setServerGroupList(serverGroupList);
}
}
}

private long defaultTimeoutMillis(long timeoutMillis) {
if (timeoutMillis == -1) {
return Long.MAX_VALUE;
}
return timeoutMillis;
}

private void join(CompletableFuture<ServerGroupList>[] futures, long timeoutMillis) {
final CompletableFuture<Void> all = CompletableFuture.allOf(futures);
try {
all.get(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (Throwable e) {
all.cancel(false);
String cause = "an error occurred while adding server info";

Check warning on line 103 in web/src/main/java/com/navercorp/pinpoint/web/applicationmap/appender/server/DefaultServerInfoAppender.java

View check run for this annotation

Codecov / codecov/patch

web/src/main/java/com/navercorp/pinpoint/web/applicationmap/appender/server/DefaultServerInfoAppender.java#L101-L103

Added lines #L101 - L103 were not covered by tests
if (e instanceof TimeoutException) {
cause += " build timed out. timeout=" + timeoutMillis + "ms";

Check warning on line 105 in web/src/main/java/com/navercorp/pinpoint/web/applicationmap/appender/server/DefaultServerInfoAppender.java

View check run for this annotation

Codecov / codecov/patch

web/src/main/java/com/navercorp/pinpoint/web/applicationmap/appender/server/DefaultServerInfoAppender.java#L105

Added line #L105 was not covered by tests
}
throw new RuntimeException(cause, e);

Check warning on line 107 in web/src/main/java/com/navercorp/pinpoint/web/applicationmap/appender/server/DefaultServerInfoAppender.java

View check run for this annotation

Codecov / codecov/patch

web/src/main/java/com/navercorp/pinpoint/web/applicationmap/appender/server/DefaultServerInfoAppender.java#L107

Added line #L107 was not covered by tests
}
}

private CompletableFuture[] getServerGroupListFutures(Range range, Collection<Node> nodes, LinkDataDuplexMap linkDataDuplexMap, AtomicBoolean stopSign) {
List<CompletableFuture<Void>> serverGroupListFutures = new ArrayList<>();
@SuppressWarnings("unchecked")
private CompletableFuture<ServerGroupList>[] getServerGroupListFutures(Range range, Collection<Node> nodes, LinkDataDuplexMap linkDataDuplexMap) {
List<CompletableFuture<ServerGroupList>> serverGroupListFutures = new ArrayList<>();
for (Node node : nodes) {
if (node.getServiceType().isUnknown()) {
// we do not know the server info for unknown nodes
if (skipServerInfo(node)) {
continue;
}
CompletableFuture<Void> serverGroupListFuture = getServerGroupListFuture(range, node, linkDataDuplexMap, stopSign);
CompletableFuture<ServerGroupList> serverGroupListFuture = getServerGroupListFuture(range, node, linkDataDuplexMap);
serverGroupListFutures.add(serverGroupListFuture);
}
return serverGroupListFutures.toArray(new CompletableFuture[0]);
}

private CompletableFuture<Void> getServerGroupListFuture(Range range, Node node, LinkDataDuplexMap linkDataDuplexMap, AtomicBoolean stopSign) {
CompletableFuture<ServerGroupList> serverGroupListFuture;
ServiceType nodeServiceType = node.getServiceType();
private boolean skipServerInfo(Node node) {
// we do not know the server info for unknown nodes
return node.getServiceType().isUnknown();
}

private CompletableFuture<ServerGroupList> getServerGroupListFuture(Range range, Node node, LinkDataDuplexMap linkDataDuplexMap) {
final Application application = node.getApplication();
final ServiceType nodeServiceType = application.getServiceType();
if (nodeServiceType.isWas()) {
final Instant to = range.getToInstant();
serverGroupListFuture = CompletableFuture.supplyAsync(new Supplier<ServerGroupList>() {
return CompletableFuture.supplyAsync(new Supplier<>() {
@Override
public ServerGroupList get() {
if (Boolean.TRUE == stopSign.get()) { // Stop
return serverGroupListFactory.createEmptyNodeInstanceList();
}
final Instant to = range.getToInstant();
return serverGroupListFactory.createWasNodeInstanceList(node, to);
}
}, executor);
} else if (nodeServiceType.isTerminal() || nodeServiceType.isAlias()) {
// extract information about the terminal node
serverGroupListFuture = CompletableFuture.completedFuture(serverGroupListFactory.createTerminalNodeInstanceList(node, linkDataDuplexMap));
return CompletableFuture.completedFuture(serverGroupListFactory.createTerminalNodeInstanceList(node, linkDataDuplexMap));
} else if (nodeServiceType.isQueue()) {
serverGroupListFuture = CompletableFuture.completedFuture(serverGroupListFactory.createQueueNodeInstanceList(node, linkDataDuplexMap));
return CompletableFuture.completedFuture(serverGroupListFactory.createQueueNodeInstanceList(node, linkDataDuplexMap));

Check warning on line 144 in web/src/main/java/com/navercorp/pinpoint/web/applicationmap/appender/server/DefaultServerInfoAppender.java

View check run for this annotation

Codecov / codecov/patch

web/src/main/java/com/navercorp/pinpoint/web/applicationmap/appender/server/DefaultServerInfoAppender.java#L144

Added line #L144 was not covered by tests
} else if (nodeServiceType.isUser()) {
serverGroupListFuture = CompletableFuture.completedFuture(serverGroupListFactory.createUserNodeInstanceList());
} else {
serverGroupListFuture = CompletableFuture.completedFuture(serverGroupListFactory.createEmptyNodeInstanceList());
return CompletableFuture.completedFuture(serverGroupListFactory.createUserNodeInstanceList());
}
return serverGroupListFuture.thenAccept(node::setServerGroupList);
return CompletableFuture.completedFuture(serverGroupListFactory.createEmptyNodeInstanceList());

Check warning on line 148 in web/src/main/java/com/navercorp/pinpoint/web/applicationmap/appender/server/DefaultServerInfoAppender.java

View check run for this annotation

Codecov / codecov/patch

web/src/main/java/com/navercorp/pinpoint/web/applicationmap/appender/server/DefaultServerInfoAppender.java#L148

Added line #L148 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public class ApplicationMapBuilderTest {

private AgentInfoServerGroupListDataSource agentInfoServerGroupListDataSource;

private long buildTimeoutMillis = 1000;
private final long buildTimeoutMillis = 1000;

@BeforeEach
public void setUp() {
Expand Down Expand Up @@ -110,7 +110,7 @@ public List<ResponseTime> answer(InvocationOnMock invocation) {
when(mapResponseDao.selectResponseTime(any(Application.class), any(Range.class))).thenAnswer(responseTimeAnswer);
when(responseHistograms.getResponseTimeList(any(Application.class))).thenAnswer(responseTimeAnswer);

when(agentInfoService.getAgentsByApplicationName(anyString(), anyLong())).thenAnswer(new Answer<Set<AgentAndStatus>>() {
when(agentInfoService.getAgentsByApplicationName(anyString(), anyLong())).thenAnswer(new Answer<>() {
@Override
public Set<AgentAndStatus> answer(InvocationOnMock invocation) throws Throwable {
String applicationName = invocation.getArgument(0);
Expand All @@ -120,15 +120,15 @@ public Set<AgentAndStatus> answer(InvocationOnMock invocation) throws Throwable
return Set.of(new AgentAndStatus(agentInfo, agentStatus));
}
});
when(agentInfoService.getAgentsByApplicationNameWithoutStatus(anyString(), anyLong())).thenAnswer(new Answer<Set<AgentInfo>>() {
when(agentInfoService.getAgentsByApplicationNameWithoutStatus(anyString(), anyLong())).thenAnswer(new Answer<>() {
@Override
public Set<AgentInfo> answer(InvocationOnMock invocation) throws Throwable {
String applicationName = invocation.getArgument(0);
AgentInfo agentInfo = ApplicationMapBuilderTestHelper.createAgentInfoFromApplicationName(applicationName);
return Set.of(agentInfo);
}
});
when(agentInfoService.getAgentStatus(anyString(), anyLong())).thenAnswer(new Answer<AgentStatus>() {
when(agentInfoService.getAgentStatus(anyString(), anyLong())).thenAnswer(new Answer<>() {
@Override
public AgentStatus answer(InvocationOnMock invocation) throws Throwable {
String agentId = invocation.getArgument(0);
Expand Down Expand Up @@ -187,11 +187,9 @@ public void testNoCallData() {
.includeServerInfo(serverGroupListFactory)
.build(application, buildTimeoutMillis);

assertThat(applicationMap.getNodes()).hasSize(1);
assertThat(applicationMap.getNodes()).hasSize(1);
assertThat(applicationMap_parallelAppenders.getNodes()).hasSize(1);
assertThat(applicationMap.getLinks()).isEmpty();
assertThat(applicationMap.getLinks()).isEmpty();
assertThat(applicationMap_parallelAppenders.getLinks()).isEmpty();

ApplicationMapVerifier verifier = new ApplicationMapVerifier(applicationMap);
Expand All @@ -218,11 +216,9 @@ public void testEmptyCallData() {
.includeServerInfo(serverGroupListFactory)
.build(linkDataDuplexMap, buildTimeoutMillis);

assertThat(applicationMap.getNodes()).isEmpty();
assertThat(applicationMap.getNodes()).isEmpty();
assertThat(applicationMap_parallelAppenders.getNodes()).isEmpty();

assertThat(applicationMap.getLinks()).isEmpty();
assertThat(applicationMap.getLinks()).isEmpty();
assertThat(applicationMap_parallelAppenders.getLinks()).isEmpty();

Expand Down Expand Up @@ -250,11 +246,9 @@ public void testEmptyCallDataSimplfied() {
.includeServerInfo(serverGroupListFactory)
.build(linkDataDuplexMap, buildTimeoutMillis);

assertThat(applicationMap.getNodes()).isEmpty();
assertThat(applicationMap.getNodes()).isEmpty();
assertThat(applicationMap_parallelAppenders.getNodes()).isEmpty();

assertThat(applicationMap.getLinks()).isEmpty();
assertThat(applicationMap.getLinks()).isEmpty();
assertThat(applicationMap_parallelAppenders.getLinks()).isEmpty();

Expand Down

0 comments on commit 9e066c0

Please # to comment.