Skip to content

Commit

Permalink
[#11050] Replace StopFlag with CompletableFuture.cancel
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Jun 5, 2024
1 parent eac901b commit f3f1b76
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@

package com.navercorp.pinpoint.web.applicationmap.appender.server;

import com.navercorp.pinpoint.common.server.util.time.Range;
import com.navercorp.pinpoint.common.trace.ServiceType;
import com.navercorp.pinpoint.web.applicationmap.nodes.Node;
import com.navercorp.pinpoint.web.applicationmap.nodes.NodeList;
import com.navercorp.pinpoint.web.applicationmap.nodes.ServerGroupList;
import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataDuplexMap;
import com.navercorp.pinpoint.common.server.util.time.Range;
import org.apache.logging.log4j.Logger;
import com.navercorp.pinpoint.web.vo.Application;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.util.CollectionUtils;

import java.time.Instant;
Expand All @@ -35,7 +36,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/**
Expand All @@ -55,70 +55,101 @@ public DefaultServerInfoAppender(ServerGroupListFactory serverGroupListFactory,
}

@Override
public void appendServerInfo(final Range range, final NodeList source, final LinkDataDuplexMap linkDataDuplexMap, final long timeoutMillis) {
public void appendServerInfo(final Range range, final NodeList source, final LinkDataDuplexMap linkDataDuplexMap, long timeoutMillis) {
if (source == null) {
return;
}

Collection<Node> nodes = source.getNodeList();
if (CollectionUtils.isEmpty(nodes)) {
return;
}
final AtomicBoolean stopSign = new AtomicBoolean();
final CompletableFuture[] futures = getServerGroupListFutures(range, nodes, linkDataDuplexMap, stopSign);
if (-1 == timeoutMillis) {
// Returns the result value when complete
CompletableFuture.allOf(futures).join();
} else {
try {
CompletableFuture.allOf(futures).get(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (Exception e) { // InterruptedException, ExecutionException, TimeoutException
stopSign.set(Boolean.TRUE);
String cause = "an error occurred while adding server info";
if (e instanceof TimeoutException) {
cause += " build timed out. timeout=" + timeoutMillis + "ms";
}
throw new RuntimeException(cause, e);
}

final List<ServerGroupRequest> serverGroupRequest = getServerGroupListFutures(range, nodes, linkDataDuplexMap);

timeoutMillis = defaultTimeoutMillis(timeoutMillis);
join(serverGroupRequest, timeoutMillis);

bind(serverGroupRequest);
}

private long defaultTimeoutMillis(long timeoutMillis) {
if (timeoutMillis == -1) {
return Long.MAX_VALUE;
}
return timeoutMillis;
}

private CompletableFuture[] getServerGroupListFutures(Range range, Collection<Node> nodes, LinkDataDuplexMap linkDataDuplexMap, AtomicBoolean stopSign) {
List<CompletableFuture<Void>> serverGroupListFutures = new ArrayList<>();
private List<ServerGroupRequest> getServerGroupListFutures(Range range, Collection<Node> nodes, LinkDataDuplexMap linkDataDuplexMap) {
List<ServerGroupRequest> serverGroupListFutures = new ArrayList<>();
for (Node node : nodes) {
if (node.getServiceType().isUnknown()) {
// we do not know the server info for unknown nodes
continue;
}
CompletableFuture<Void> serverGroupListFuture = getServerGroupListFuture(range, node, linkDataDuplexMap, stopSign);
serverGroupListFutures.add(serverGroupListFuture);
CompletableFuture<ServerGroupList> serverGroupListFuture = getServerGroupListFuture(range, node, linkDataDuplexMap);
serverGroupListFutures.add(new ServerGroupRequest(node, serverGroupListFuture));
}
return serverGroupListFutures.toArray(new CompletableFuture[0]);
return serverGroupListFutures;
}

private CompletableFuture<Void> getServerGroupListFuture(Range range, Node node, LinkDataDuplexMap linkDataDuplexMap, AtomicBoolean stopSign) {
CompletableFuture<ServerGroupList> serverGroupListFuture;
ServiceType nodeServiceType = node.getServiceType();
private record ServerGroupRequest(Node node, CompletableFuture<ServerGroupList> future) {
}

private CompletableFuture<ServerGroupList> getServerGroupListFuture(Range range, Node node, LinkDataDuplexMap linkDataDuplexMap) {
final Application application = node.getApplication();
final ServiceType nodeServiceType = application.getServiceType();
if (nodeServiceType.isWas()) {
final Instant to = range.getToInstant();
serverGroupListFuture = CompletableFuture.supplyAsync(new Supplier<ServerGroupList>() {
return CompletableFuture.supplyAsync(new Supplier<>() {
@Override
public ServerGroupList get() {
if (Boolean.TRUE == stopSign.get()) { // Stop
return serverGroupListFactory.createEmptyNodeInstanceList();
}
final Instant to = range.getToInstant();
return serverGroupListFactory.createWasNodeInstanceList(node, to);
}
}, executor);
} else if (nodeServiceType.isTerminal() || nodeServiceType.isAlias()) {
// extract information about the terminal node
serverGroupListFuture = CompletableFuture.completedFuture(serverGroupListFactory.createTerminalNodeInstanceList(node, linkDataDuplexMap));
return CompletableFuture.completedFuture(serverGroupListFactory.createTerminalNodeInstanceList(node, linkDataDuplexMap));
} else if (nodeServiceType.isQueue()) {
serverGroupListFuture = CompletableFuture.completedFuture(serverGroupListFactory.createQueueNodeInstanceList(node, linkDataDuplexMap));
return CompletableFuture.completedFuture(serverGroupListFactory.createQueueNodeInstanceList(node, linkDataDuplexMap));
} else if (nodeServiceType.isUser()) {
serverGroupListFuture = CompletableFuture.completedFuture(serverGroupListFactory.createUserNodeInstanceList());
} else {
serverGroupListFuture = CompletableFuture.completedFuture(serverGroupListFactory.createEmptyNodeInstanceList());
return CompletableFuture.completedFuture(serverGroupListFactory.createUserNodeInstanceList());
}
return CompletableFuture.completedFuture(serverGroupListFactory.createEmptyNodeInstanceList());
}


private void join(List<ServerGroupRequest> serverGroupRequests, long timeoutMillis) {
@SuppressWarnings("rawtypes")
final CompletableFuture[] futures = serverGroupRequests.stream()
.map(ServerGroupRequest::future)
.toArray(CompletableFuture[]::new);

final CompletableFuture<Void> all = CompletableFuture.allOf(futures);
try {
all.get(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (Throwable e) {
all.cancel(false);
String cause = "an error occurred while adding server info";
if (e instanceof TimeoutException) {
cause += " build timed out. timeout=" + timeoutMillis + "ms";
}
throw new RuntimeException(cause, e);
}
return serverGroupListFuture.thenAccept(node::setServerGroupList);
}

private void bind(List<ServerGroupRequest> serverGroupRequest) {
for (ServerGroupRequest pair : serverGroupRequest) {
Node node = pair.node();
CompletableFuture<ServerGroupList> future = pair.future();
try {
ServerGroupList serverGroupList = future.getNow(null);
node.setServerGroupList(serverGroupList);
} catch (Throwable th) {
logger.warn("Failed to get server info for node {}", node);
throw new RuntimeException("Unexpected error", th);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public class ApplicationMapBuilderTest {

private AgentInfoServerGroupListDataSource agentInfoServerGroupListDataSource;

private long buildTimeoutMillis = 1000;
private final long buildTimeoutMillis = 1000;

@BeforeEach
public void setUp() {
Expand Down Expand Up @@ -110,7 +110,7 @@ public List<ResponseTime> answer(InvocationOnMock invocation) {
when(mapResponseDao.selectResponseTime(any(Application.class), any(Range.class))).thenAnswer(responseTimeAnswer);
when(responseHistograms.getResponseTimeList(any(Application.class))).thenAnswer(responseTimeAnswer);

when(agentInfoService.getAgentsByApplicationName(anyString(), anyLong())).thenAnswer(new Answer<Set<AgentAndStatus>>() {
when(agentInfoService.getAgentsByApplicationName(anyString(), anyLong())).thenAnswer(new Answer<>() {
@Override
public Set<AgentAndStatus> answer(InvocationOnMock invocation) throws Throwable {
String applicationName = invocation.getArgument(0);
Expand All @@ -120,15 +120,15 @@ public Set<AgentAndStatus> answer(InvocationOnMock invocation) throws Throwable
return Set.of(new AgentAndStatus(agentInfo, agentStatus));
}
});
when(agentInfoService.getAgentsByApplicationNameWithoutStatus(anyString(), anyLong())).thenAnswer(new Answer<Set<AgentInfo>>() {
when(agentInfoService.getAgentsByApplicationNameWithoutStatus(anyString(), anyLong())).thenAnswer(new Answer<>() {
@Override
public Set<AgentInfo> answer(InvocationOnMock invocation) throws Throwable {
String applicationName = invocation.getArgument(0);
AgentInfo agentInfo = ApplicationMapBuilderTestHelper.createAgentInfoFromApplicationName(applicationName);
return Set.of(agentInfo);
}
});
when(agentInfoService.getAgentStatus(anyString(), anyLong())).thenAnswer(new Answer<AgentStatus>() {
when(agentInfoService.getAgentStatus(anyString(), anyLong())).thenAnswer(new Answer<>() {
@Override
public AgentStatus answer(InvocationOnMock invocation) throws Throwable {
String agentId = invocation.getArgument(0);
Expand Down Expand Up @@ -187,11 +187,9 @@ public void testNoCallData() {
.includeServerInfo(serverGroupListFactory)
.build(application, buildTimeoutMillis);

assertThat(applicationMap.getNodes()).hasSize(1);
assertThat(applicationMap.getNodes()).hasSize(1);
assertThat(applicationMap_parallelAppenders.getNodes()).hasSize(1);
assertThat(applicationMap.getLinks()).isEmpty();
assertThat(applicationMap.getLinks()).isEmpty();
assertThat(applicationMap_parallelAppenders.getLinks()).isEmpty();

ApplicationMapVerifier verifier = new ApplicationMapVerifier(applicationMap);
Expand All @@ -218,11 +216,9 @@ public void testEmptyCallData() {
.includeServerInfo(serverGroupListFactory)
.build(linkDataDuplexMap, buildTimeoutMillis);

assertThat(applicationMap.getNodes()).isEmpty();
assertThat(applicationMap.getNodes()).isEmpty();
assertThat(applicationMap_parallelAppenders.getNodes()).isEmpty();

assertThat(applicationMap.getLinks()).isEmpty();
assertThat(applicationMap.getLinks()).isEmpty();
assertThat(applicationMap_parallelAppenders.getLinks()).isEmpty();

Expand Down Expand Up @@ -250,11 +246,9 @@ public void testEmptyCallDataSimplfied() {
.includeServerInfo(serverGroupListFactory)
.build(linkDataDuplexMap, buildTimeoutMillis);

assertThat(applicationMap.getNodes()).isEmpty();
assertThat(applicationMap.getNodes()).isEmpty();
assertThat(applicationMap_parallelAppenders.getNodes()).isEmpty();

assertThat(applicationMap.getLinks()).isEmpty();
assertThat(applicationMap.getLinks()).isEmpty();
assertThat(applicationMap_parallelAppenders.getLinks()).isEmpty();

Expand Down

0 comments on commit f3f1b76

Please # to comment.