From a0a92c9c06e369aaea0a08b6f959b9aca068c096 Mon Sep 17 00:00:00 2001 From: emeroad Date: Wed, 5 Jun 2024 17:04:34 +0900 Subject: [PATCH] [#11050] Replace StopFlag with CompletableFuture.cancel --- .../server/DefaultServerInfoAppender.java | 109 +++++++++++------- .../ApplicationMapBuilderTest.java | 14 +-- 2 files changed, 74 insertions(+), 49 deletions(-) diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/appender/server/DefaultServerInfoAppender.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/appender/server/DefaultServerInfoAppender.java index dabd4723ca4b..dce7530e6a7e 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/appender/server/DefaultServerInfoAppender.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/appender/server/DefaultServerInfoAppender.java @@ -16,14 +16,15 @@ package com.navercorp.pinpoint.web.applicationmap.appender.server; +import com.navercorp.pinpoint.common.server.util.time.Range; import com.navercorp.pinpoint.common.trace.ServiceType; import com.navercorp.pinpoint.web.applicationmap.nodes.Node; import com.navercorp.pinpoint.web.applicationmap.nodes.NodeList; import com.navercorp.pinpoint.web.applicationmap.nodes.ServerGroupList; import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataDuplexMap; -import com.navercorp.pinpoint.common.server.util.time.Range; -import org.apache.logging.log4j.Logger; +import com.navercorp.pinpoint.web.vo.Application; import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.springframework.util.CollectionUtils; import java.time.Instant; @@ -35,7 +36,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; /** @@ -55,70 +55,101 @@ public DefaultServerInfoAppender(ServerGroupListFactory serverGroupListFactory, } @Override - public void appendServerInfo(final Range range, final NodeList source, final LinkDataDuplexMap linkDataDuplexMap, final long timeoutMillis) { + public void appendServerInfo(final Range range, final NodeList source, final LinkDataDuplexMap linkDataDuplexMap, long timeoutMillis) { if (source == null) { return; } + Collection nodes = source.getNodeList(); if (CollectionUtils.isEmpty(nodes)) { return; } - final AtomicBoolean stopSign = new AtomicBoolean(); - final CompletableFuture[] futures = getServerGroupListFutures(range, nodes, linkDataDuplexMap, stopSign); - if (-1 == timeoutMillis) { - // Returns the result value when complete - CompletableFuture.allOf(futures).join(); - } else { - try { - CompletableFuture.allOf(futures).get(timeoutMillis, TimeUnit.MILLISECONDS); - } catch (Exception e) { // InterruptedException, ExecutionException, TimeoutException - stopSign.set(Boolean.TRUE); - String cause = "an error occurred while adding server info"; - if (e instanceof TimeoutException) { - cause += " build timed out. timeout=" + timeoutMillis + "ms"; - } - throw new RuntimeException(cause, e); - } + + final List serverGroupRequest = getServerGroupListFutures(range, nodes, linkDataDuplexMap); + + timeoutMillis = defaultTimeoutMillis(timeoutMillis); + join(serverGroupRequest, timeoutMillis); + + bind(serverGroupRequest); + } + + private long defaultTimeoutMillis(long timeoutMillis) { + if (timeoutMillis == -1) { + return Long.MAX_VALUE; } + return timeoutMillis; } - private CompletableFuture[] getServerGroupListFutures(Range range, Collection nodes, LinkDataDuplexMap linkDataDuplexMap, AtomicBoolean stopSign) { - List> serverGroupListFutures = new ArrayList<>(); + private List getServerGroupListFutures(Range range, Collection nodes, LinkDataDuplexMap linkDataDuplexMap) { + List serverGroupListFutures = new ArrayList<>(); for (Node node : nodes) { if (node.getServiceType().isUnknown()) { // we do not know the server info for unknown nodes continue; } - CompletableFuture serverGroupListFuture = getServerGroupListFuture(range, node, linkDataDuplexMap, stopSign); - serverGroupListFutures.add(serverGroupListFuture); + CompletableFuture serverGroupListFuture = getServerGroupListFuture(range, node, linkDataDuplexMap); + serverGroupListFutures.add(new ServerGroupRequest(node, serverGroupListFuture)); } - return serverGroupListFutures.toArray(new CompletableFuture[0]); + return serverGroupListFutures; } - private CompletableFuture getServerGroupListFuture(Range range, Node node, LinkDataDuplexMap linkDataDuplexMap, AtomicBoolean stopSign) { - CompletableFuture serverGroupListFuture; - ServiceType nodeServiceType = node.getServiceType(); + private record ServerGroupRequest(Node node, CompletableFuture future) { + } + + private CompletableFuture getServerGroupListFuture(Range range, Node node, LinkDataDuplexMap linkDataDuplexMap) { + final Application application = node.getApplication(); + final ServiceType nodeServiceType = application.getServiceType(); if (nodeServiceType.isWas()) { - final Instant to = range.getToInstant(); - serverGroupListFuture = CompletableFuture.supplyAsync(new Supplier() { + return CompletableFuture.supplyAsync(new Supplier<>() { @Override public ServerGroupList get() { - if (Boolean.TRUE == stopSign.get()) { // Stop - return serverGroupListFactory.createEmptyNodeInstanceList(); - } + final Instant to = range.getToInstant(); return serverGroupListFactory.createWasNodeInstanceList(node, to); } }, executor); } else if (nodeServiceType.isTerminal() || nodeServiceType.isAlias()) { // extract information about the terminal node - serverGroupListFuture = CompletableFuture.completedFuture(serverGroupListFactory.createTerminalNodeInstanceList(node, linkDataDuplexMap)); + return CompletableFuture.completedFuture(serverGroupListFactory.createTerminalNodeInstanceList(node, linkDataDuplexMap)); } else if (nodeServiceType.isQueue()) { - serverGroupListFuture = CompletableFuture.completedFuture(serverGroupListFactory.createQueueNodeInstanceList(node, linkDataDuplexMap)); + return CompletableFuture.completedFuture(serverGroupListFactory.createQueueNodeInstanceList(node, linkDataDuplexMap)); } else if (nodeServiceType.isUser()) { - serverGroupListFuture = CompletableFuture.completedFuture(serverGroupListFactory.createUserNodeInstanceList()); - } else { - serverGroupListFuture = CompletableFuture.completedFuture(serverGroupListFactory.createEmptyNodeInstanceList()); + return CompletableFuture.completedFuture(serverGroupListFactory.createUserNodeInstanceList()); + } + return CompletableFuture.completedFuture(serverGroupListFactory.createEmptyNodeInstanceList()); + } + + + private void join(List serverGroupRequests, long timeoutMillis) { + @SuppressWarnings("rawtypes") + final CompletableFuture[] futures = serverGroupRequests.stream() + .map(ServerGroupRequest::future) + .toArray(CompletableFuture[]::new); + + final CompletableFuture all = CompletableFuture.allOf(futures); + try { + all.get(timeoutMillis, TimeUnit.MILLISECONDS); + } catch (Throwable e) { + all.cancel(false); + String cause = "an error occurred while adding server info"; + if (e instanceof TimeoutException) { + cause += " build timed out. timeout=" + timeoutMillis + "ms"; + } + throw new RuntimeException(cause, e); } - return serverGroupListFuture.thenAccept(node::setServerGroupList); } + + private void bind(List serverGroupRequest) { + for (ServerGroupRequest pair : serverGroupRequest) { + Node node = pair.node(); + CompletableFuture future = pair.future(); + try { + ServerGroupList serverGroupList = future.getNow(null); + node.setServerGroupList(serverGroupList); + } catch (Throwable th) { + logger.warn("Failed to get server info for node {}", node); + throw new RuntimeException("Unexpected error", th); + } + } + } + } diff --git a/web/src/test/java/com/navercorp/pinpoint/web/applicationmap/ApplicationMapBuilderTest.java b/web/src/test/java/com/navercorp/pinpoint/web/applicationmap/ApplicationMapBuilderTest.java index 88ab94920104..3fee26fce7a1 100644 --- a/web/src/test/java/com/navercorp/pinpoint/web/applicationmap/ApplicationMapBuilderTest.java +++ b/web/src/test/java/com/navercorp/pinpoint/web/applicationmap/ApplicationMapBuilderTest.java @@ -79,7 +79,7 @@ public class ApplicationMapBuilderTest { private AgentInfoServerGroupListDataSource agentInfoServerGroupListDataSource; - private long buildTimeoutMillis = 1000; + private final long buildTimeoutMillis = 1000; @BeforeEach public void setUp() { @@ -110,7 +110,7 @@ public List answer(InvocationOnMock invocation) { when(mapResponseDao.selectResponseTime(any(Application.class), any(Range.class))).thenAnswer(responseTimeAnswer); when(responseHistograms.getResponseTimeList(any(Application.class))).thenAnswer(responseTimeAnswer); - when(agentInfoService.getAgentsByApplicationName(anyString(), anyLong())).thenAnswer(new Answer>() { + when(agentInfoService.getAgentsByApplicationName(anyString(), anyLong())).thenAnswer(new Answer<>() { @Override public Set answer(InvocationOnMock invocation) throws Throwable { String applicationName = invocation.getArgument(0); @@ -120,7 +120,7 @@ public Set answer(InvocationOnMock invocation) throws Throwable return Set.of(new AgentAndStatus(agentInfo, agentStatus)); } }); - when(agentInfoService.getAgentsByApplicationNameWithoutStatus(anyString(), anyLong())).thenAnswer(new Answer>() { + when(agentInfoService.getAgentsByApplicationNameWithoutStatus(anyString(), anyLong())).thenAnswer(new Answer<>() { @Override public Set answer(InvocationOnMock invocation) throws Throwable { String applicationName = invocation.getArgument(0); @@ -128,7 +128,7 @@ public Set answer(InvocationOnMock invocation) throws Throwable { return Set.of(agentInfo); } }); - when(agentInfoService.getAgentStatus(anyString(), anyLong())).thenAnswer(new Answer() { + when(agentInfoService.getAgentStatus(anyString(), anyLong())).thenAnswer(new Answer<>() { @Override public AgentStatus answer(InvocationOnMock invocation) throws Throwable { String agentId = invocation.getArgument(0); @@ -187,11 +187,9 @@ public void testNoCallData() { .includeServerInfo(serverGroupListFactory) .build(application, buildTimeoutMillis); - assertThat(applicationMap.getNodes()).hasSize(1); assertThat(applicationMap.getNodes()).hasSize(1); assertThat(applicationMap_parallelAppenders.getNodes()).hasSize(1); assertThat(applicationMap.getLinks()).isEmpty(); - assertThat(applicationMap.getLinks()).isEmpty(); assertThat(applicationMap_parallelAppenders.getLinks()).isEmpty(); ApplicationMapVerifier verifier = new ApplicationMapVerifier(applicationMap); @@ -218,11 +216,9 @@ public void testEmptyCallData() { .includeServerInfo(serverGroupListFactory) .build(linkDataDuplexMap, buildTimeoutMillis); - assertThat(applicationMap.getNodes()).isEmpty(); assertThat(applicationMap.getNodes()).isEmpty(); assertThat(applicationMap_parallelAppenders.getNodes()).isEmpty(); - assertThat(applicationMap.getLinks()).isEmpty(); assertThat(applicationMap.getLinks()).isEmpty(); assertThat(applicationMap_parallelAppenders.getLinks()).isEmpty(); @@ -250,11 +246,9 @@ public void testEmptyCallDataSimplfied() { .includeServerInfo(serverGroupListFactory) .build(linkDataDuplexMap, buildTimeoutMillis); - assertThat(applicationMap.getNodes()).isEmpty(); assertThat(applicationMap.getNodes()).isEmpty(); assertThat(applicationMap_parallelAppenders.getNodes()).isEmpty(); - assertThat(applicationMap.getLinks()).isEmpty(); assertThat(applicationMap.getLinks()).isEmpty(); assertThat(applicationMap_parallelAppenders.getLinks()).isEmpty();