Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[#11050] Replace StopFlag with CompletableFuture.cancel #11125

Merged
merged 1 commit into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@

package com.navercorp.pinpoint.web.applicationmap.appender.server;

import com.navercorp.pinpoint.common.server.util.time.Range;
import com.navercorp.pinpoint.common.trace.ServiceType;
import com.navercorp.pinpoint.web.applicationmap.nodes.Node;
import com.navercorp.pinpoint.web.applicationmap.nodes.NodeList;
import com.navercorp.pinpoint.web.applicationmap.nodes.ServerGroupList;
import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataDuplexMap;
import com.navercorp.pinpoint.common.server.util.time.Range;
import org.apache.logging.log4j.Logger;
import com.navercorp.pinpoint.web.vo.Application;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.util.CollectionUtils;

import java.time.Instant;
Expand All @@ -35,7 +36,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/**
Expand All @@ -55,70 +55,101 @@ public DefaultServerInfoAppender(ServerGroupListFactory serverGroupListFactory,
}

@Override
public void appendServerInfo(final Range range, final NodeList source, final LinkDataDuplexMap linkDataDuplexMap, final long timeoutMillis) {
public void appendServerInfo(final Range range, final NodeList source, final LinkDataDuplexMap linkDataDuplexMap, long timeoutMillis) {
if (source == null) {
return;
}

Collection<Node> nodes = source.getNodeList();
if (CollectionUtils.isEmpty(nodes)) {
return;
}
final AtomicBoolean stopSign = new AtomicBoolean();
final CompletableFuture[] futures = getServerGroupListFutures(range, nodes, linkDataDuplexMap, stopSign);
if (-1 == timeoutMillis) {
// Returns the result value when complete
CompletableFuture.allOf(futures).join();
} else {
try {
CompletableFuture.allOf(futures).get(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (Exception e) { // InterruptedException, ExecutionException, TimeoutException
stopSign.set(Boolean.TRUE);
String cause = "an error occurred while adding server info";
if (e instanceof TimeoutException) {
cause += " build timed out. timeout=" + timeoutMillis + "ms";
}
throw new RuntimeException(cause, e);
}

final List<ServerGroupRequest> serverGroupRequest = getServerGroupListFutures(range, nodes, linkDataDuplexMap);

timeoutMillis = defaultTimeoutMillis(timeoutMillis);
join(serverGroupRequest, timeoutMillis);

bind(serverGroupRequest);
}

private long defaultTimeoutMillis(long timeoutMillis) {
if (timeoutMillis == -1) {
return Long.MAX_VALUE;
}
return timeoutMillis;
}

private CompletableFuture[] getServerGroupListFutures(Range range, Collection<Node> nodes, LinkDataDuplexMap linkDataDuplexMap, AtomicBoolean stopSign) {
List<CompletableFuture<Void>> serverGroupListFutures = new ArrayList<>();
private List<ServerGroupRequest> getServerGroupListFutures(Range range, Collection<Node> nodes, LinkDataDuplexMap linkDataDuplexMap) {
List<ServerGroupRequest> serverGroupListFutures = new ArrayList<>();
for (Node node : nodes) {
if (node.getServiceType().isUnknown()) {
// we do not know the server info for unknown nodes
continue;
}
CompletableFuture<Void> serverGroupListFuture = getServerGroupListFuture(range, node, linkDataDuplexMap, stopSign);
serverGroupListFutures.add(serverGroupListFuture);
CompletableFuture<ServerGroupList> serverGroupListFuture = getServerGroupListFuture(range, node, linkDataDuplexMap);
serverGroupListFutures.add(new ServerGroupRequest(node, serverGroupListFuture));
}
return serverGroupListFutures.toArray(new CompletableFuture[0]);
return serverGroupListFutures;
}

private CompletableFuture<Void> getServerGroupListFuture(Range range, Node node, LinkDataDuplexMap linkDataDuplexMap, AtomicBoolean stopSign) {
CompletableFuture<ServerGroupList> serverGroupListFuture;
ServiceType nodeServiceType = node.getServiceType();
private record ServerGroupRequest(Node node, CompletableFuture<ServerGroupList> future) {
}

private CompletableFuture<ServerGroupList> getServerGroupListFuture(Range range, Node node, LinkDataDuplexMap linkDataDuplexMap) {
final Application application = node.getApplication();
final ServiceType nodeServiceType = application.getServiceType();
if (nodeServiceType.isWas()) {
final Instant to = range.getToInstant();
serverGroupListFuture = CompletableFuture.supplyAsync(new Supplier<ServerGroupList>() {
return CompletableFuture.supplyAsync(new Supplier<>() {
@Override
public ServerGroupList get() {
if (Boolean.TRUE == stopSign.get()) { // Stop
return serverGroupListFactory.createEmptyNodeInstanceList();
}
final Instant to = range.getToInstant();
return serverGroupListFactory.createWasNodeInstanceList(node, to);
}
}, executor);
} else if (nodeServiceType.isTerminal() || nodeServiceType.isAlias()) {
// extract information about the terminal node
serverGroupListFuture = CompletableFuture.completedFuture(serverGroupListFactory.createTerminalNodeInstanceList(node, linkDataDuplexMap));
return CompletableFuture.completedFuture(serverGroupListFactory.createTerminalNodeInstanceList(node, linkDataDuplexMap));
} else if (nodeServiceType.isQueue()) {
serverGroupListFuture = CompletableFuture.completedFuture(serverGroupListFactory.createQueueNodeInstanceList(node, linkDataDuplexMap));
return CompletableFuture.completedFuture(serverGroupListFactory.createQueueNodeInstanceList(node, linkDataDuplexMap));
} else if (nodeServiceType.isUser()) {
serverGroupListFuture = CompletableFuture.completedFuture(serverGroupListFactory.createUserNodeInstanceList());
} else {
serverGroupListFuture = CompletableFuture.completedFuture(serverGroupListFactory.createEmptyNodeInstanceList());
return CompletableFuture.completedFuture(serverGroupListFactory.createUserNodeInstanceList());
}
return CompletableFuture.completedFuture(serverGroupListFactory.createEmptyNodeInstanceList());
}


private void join(List<ServerGroupRequest> serverGroupRequests, long timeoutMillis) {
@SuppressWarnings("rawtypes")
final CompletableFuture[] futures = serverGroupRequests.stream()
.map(ServerGroupRequest::future)
.toArray(CompletableFuture[]::new);

final CompletableFuture<Void> all = CompletableFuture.allOf(futures);
try {
all.get(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (Throwable e) {
all.cancel(false);
String cause = "an error occurred while adding server info";
if (e instanceof TimeoutException) {
cause += " build timed out. timeout=" + timeoutMillis + "ms";
}
throw new RuntimeException(cause, e);
}
return serverGroupListFuture.thenAccept(node::setServerGroupList);
}

private void bind(List<ServerGroupRequest> serverGroupRequest) {
for (ServerGroupRequest pair : serverGroupRequest) {
Node node = pair.node();
CompletableFuture<ServerGroupList> future = pair.future();
try {
ServerGroupList serverGroupList = future.getNow(null);
node.setServerGroupList(serverGroupList);
} catch (Throwable th) {
logger.warn("Failed to get server info for node {}", node);
throw new RuntimeException("Unexpected error", th);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public class ApplicationMapBuilderTest {

private AgentInfoServerGroupListDataSource agentInfoServerGroupListDataSource;

private long buildTimeoutMillis = 1000;
private final long buildTimeoutMillis = 1000;

@BeforeEach
public void setUp() {
Expand Down Expand Up @@ -110,7 +110,7 @@ public List<ResponseTime> answer(InvocationOnMock invocation) {
when(mapResponseDao.selectResponseTime(any(Application.class), any(Range.class))).thenAnswer(responseTimeAnswer);
when(responseHistograms.getResponseTimeList(any(Application.class))).thenAnswer(responseTimeAnswer);

when(agentInfoService.getAgentsByApplicationName(anyString(), anyLong())).thenAnswer(new Answer<Set<AgentAndStatus>>() {
when(agentInfoService.getAgentsByApplicationName(anyString(), anyLong())).thenAnswer(new Answer<>() {
@Override
public Set<AgentAndStatus> answer(InvocationOnMock invocation) throws Throwable {
String applicationName = invocation.getArgument(0);
Expand All @@ -120,15 +120,15 @@ public Set<AgentAndStatus> answer(InvocationOnMock invocation) throws Throwable
return Set.of(new AgentAndStatus(agentInfo, agentStatus));
}
});
when(agentInfoService.getAgentsByApplicationNameWithoutStatus(anyString(), anyLong())).thenAnswer(new Answer<Set<AgentInfo>>() {
when(agentInfoService.getAgentsByApplicationNameWithoutStatus(anyString(), anyLong())).thenAnswer(new Answer<>() {
@Override
public Set<AgentInfo> answer(InvocationOnMock invocation) throws Throwable {
String applicationName = invocation.getArgument(0);
AgentInfo agentInfo = ApplicationMapBuilderTestHelper.createAgentInfoFromApplicationName(applicationName);
return Set.of(agentInfo);
}
});
when(agentInfoService.getAgentStatus(anyString(), anyLong())).thenAnswer(new Answer<AgentStatus>() {
when(agentInfoService.getAgentStatus(anyString(), anyLong())).thenAnswer(new Answer<>() {
@Override
public AgentStatus answer(InvocationOnMock invocation) throws Throwable {
String agentId = invocation.getArgument(0);
Expand Down Expand Up @@ -187,11 +187,9 @@ public void testNoCallData() {
.includeServerInfo(serverGroupListFactory)
.build(application, buildTimeoutMillis);

assertThat(applicationMap.getNodes()).hasSize(1);
assertThat(applicationMap.getNodes()).hasSize(1);
assertThat(applicationMap_parallelAppenders.getNodes()).hasSize(1);
assertThat(applicationMap.getLinks()).isEmpty();
assertThat(applicationMap.getLinks()).isEmpty();
assertThat(applicationMap_parallelAppenders.getLinks()).isEmpty();

ApplicationMapVerifier verifier = new ApplicationMapVerifier(applicationMap);
Expand All @@ -218,11 +216,9 @@ public void testEmptyCallData() {
.includeServerInfo(serverGroupListFactory)
.build(linkDataDuplexMap, buildTimeoutMillis);

assertThat(applicationMap.getNodes()).isEmpty();
assertThat(applicationMap.getNodes()).isEmpty();
assertThat(applicationMap_parallelAppenders.getNodes()).isEmpty();

assertThat(applicationMap.getLinks()).isEmpty();
assertThat(applicationMap.getLinks()).isEmpty();
assertThat(applicationMap_parallelAppenders.getLinks()).isEmpty();

Expand Down Expand Up @@ -250,11 +246,9 @@ public void testEmptyCallDataSimplfied() {
.includeServerInfo(serverGroupListFactory)
.build(linkDataDuplexMap, buildTimeoutMillis);

assertThat(applicationMap.getNodes()).isEmpty();
assertThat(applicationMap.getNodes()).isEmpty();
assertThat(applicationMap_parallelAppenders.getNodes()).isEmpty();

assertThat(applicationMap.getLinks()).isEmpty();
assertThat(applicationMap.getLinks()).isEmpty();
assertThat(applicationMap_parallelAppenders.getLinks()).isEmpty();

Expand Down