diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleEnvironment.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleEnvironment.java index bf344c3357f..ed665c27c28 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleEnvironment.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleEnvironment.java @@ -17,6 +17,7 @@ package org.apache.celeborn.plugin.flink; +import static org.apache.celeborn.plugin.flink.metric.RemoteShuffleMetricFactory.createShuffleIOOwnerMetricGroup; import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull; import static org.apache.celeborn.plugin.flink.utils.Utils.checkState; import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.*; @@ -119,13 +120,13 @@ public boolean updatePartitionInfo(ExecutionAttemptID consumerID, PartitionInfo public ShuffleIOOwnerContext createShuffleIOOwnerContext( String ownerName, ExecutionAttemptID executionAttemptID, MetricGroup parentGroup) { - MetricGroup nettyGroup = createShuffleIOOwnerMetricGroup(checkNotNull(parentGroup)); + MetricGroup remoteGroup = createShuffleIOOwnerMetricGroup(checkNotNull(parentGroup)); return new ShuffleIOOwnerContext( checkNotNull(ownerName), checkNotNull(executionAttemptID), parentGroup, - nettyGroup.addGroup(METRIC_GROUP_OUTPUT), - nettyGroup.addGroup(METRIC_GROUP_INPUT)); + remoteGroup.addGroup(METRIC_GROUP_OUTPUT), + remoteGroup.addGroup(METRIC_GROUP_INPUT)); } public Collection getPartitionsOccupyingLocalResources() { diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleServiceFactory.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleServiceFactory.java index de1d7320080..8927de331b4 100644 --- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleServiceFactory.java +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleServiceFactory.java @@ -18,7 +18,7 @@ package org.apache.celeborn.plugin.flink; -import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.registerShuffleMetrics; +import static org.apache.celeborn.plugin.flink.metric.RemoteShuffleMetricFactory.registerShuffleMetrics; import java.time.Duration; diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/RemoteShuffleMetricFactory.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/RemoteShuffleMetricFactory.java new file mode 100644 index 00000000000..5a6da3590f6 --- /dev/null +++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/metric/RemoteShuffleMetricFactory.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.plugin.flink.metric; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; + +/** Factory for remote shuffle service metrics. */ +public class RemoteShuffleMetricFactory { + + // shuffle environment level metrics: Shuffle.Remote.* + + private static final String METRIC_TOTAL_MEMORY_SEGMENT = "TotalMemorySegments"; + private static final String METRIC_TOTAL_MEMORY = "TotalMemory"; + + private static final String METRIC_AVAILABLE_MEMORY_SEGMENT = "AvailableMemorySegments"; + private static final String METRIC_AVAILABLE_MEMORY = "AvailableMemory"; + + private static final String METRIC_USED_MEMORY_SEGMENT = "UsedMemorySegments"; + private static final String METRIC_USED_MEMORY = "UsedMemory"; + + // task level metric group structure: Shuffle.Remote..Buffers + + private static final String METRIC_GROUP_SHUFFLE = "Shuffle"; + private static final String METRIC_GROUP_REMOTE = "Remote"; + + private RemoteShuffleMetricFactory() {} + + public static void registerShuffleMetrics( + MetricGroup metricGroup, NetworkBufferPool networkBufferPool) { + checkNotNull(metricGroup); + checkNotNull(networkBufferPool); + + //noinspection deprecation + internalRegisterShuffleMetrics(metricGroup, networkBufferPool); + } + + private static void internalRegisterShuffleMetrics( + MetricGroup parentMetricGroup, NetworkBufferPool networkBufferPool) { + MetricGroup shuffleGroup = parentMetricGroup.addGroup(METRIC_GROUP_SHUFFLE); + MetricGroup networkGroup = shuffleGroup.addGroup(METRIC_GROUP_REMOTE); + + networkGroup.gauge( + METRIC_TOTAL_MEMORY_SEGMENT, networkBufferPool::getTotalNumberOfMemorySegments); + networkGroup.gauge(METRIC_TOTAL_MEMORY, networkBufferPool::getTotalMemory); + + networkGroup.gauge( + METRIC_AVAILABLE_MEMORY_SEGMENT, networkBufferPool::getNumberOfAvailableMemorySegments); + networkGroup.gauge(METRIC_AVAILABLE_MEMORY, networkBufferPool::getAvailableMemory); + + networkGroup.gauge( + METRIC_USED_MEMORY_SEGMENT, networkBufferPool::getNumberOfUsedMemorySegments); + networkGroup.gauge(METRIC_USED_MEMORY, networkBufferPool::getUsedMemory); + } + + public static MetricGroup createShuffleIOOwnerMetricGroup(MetricGroup parentGroup) { + return parentGroup.addGroup(METRIC_GROUP_SHUFFLE).addGroup(METRIC_GROUP_REMOTE); + } +}