Skip to content

Commit

Permalink
[CELEBORN-1804] Shuffle environment metrics of RemoteShuffleEnvironme…
Browse files Browse the repository at this point in the history
…nt should use Shuffle.Remote metric group
  • Loading branch information
SteNicholas committed Dec 26, 2024
1 parent fde6365 commit 5029052
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.celeborn.plugin.flink;

import static org.apache.celeborn.plugin.flink.metric.RemoteShuffleMetricFactory.createShuffleIOOwnerMetricGroup;
import static org.apache.celeborn.plugin.flink.utils.Utils.checkNotNull;
import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.*;
Expand Down Expand Up @@ -119,13 +120,13 @@ public boolean updatePartitionInfo(ExecutionAttemptID consumerID, PartitionInfo

/**
 * Builds the {@link ShuffleIOOwnerContext} for a shuffle IO owner.
 *
 * <p>The owner-level metric group is obtained from {@code createShuffleIOOwnerMetricGroup}
 * (statically imported from {@code RemoteShuffleMetricFactory}); the output and input metric
 * groups handed to the context are sub-groups of it, named by {@code METRIC_GROUP_OUTPUT} and
 * {@code METRIC_GROUP_INPUT}. {@code parentGroup} itself is passed to the context unchanged.
 *
 * @param ownerName name of the owner; passed through {@code checkNotNull}
 * @param executionAttemptID execution attempt of the owner; passed through {@code checkNotNull}
 * @param parentGroup metric group under which the shuffle groups are created; passed through
 *     {@code checkNotNull}
 * @return the newly created context
 */
public ShuffleIOOwnerContext createShuffleIOOwnerContext(
String ownerName, ExecutionAttemptID executionAttemptID, MetricGroup parentGroup) {
MetricGroup nettyGroup = createShuffleIOOwnerMetricGroup(checkNotNull(parentGroup));
MetricGroup remoteGroup = createShuffleIOOwnerMetricGroup(checkNotNull(parentGroup));
return new ShuffleIOOwnerContext(
checkNotNull(ownerName),
checkNotNull(executionAttemptID),
parentGroup,
nettyGroup.addGroup(METRIC_GROUP_OUTPUT),
nettyGroup.addGroup(METRIC_GROUP_INPUT));
remoteGroup.addGroup(METRIC_GROUP_OUTPUT),
remoteGroup.addGroup(METRIC_GROUP_INPUT));
}

public Collection<ResultPartitionID> getPartitionsOccupyingLocalResources() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.celeborn.plugin.flink;

import static org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory.registerShuffleMetrics;
import static org.apache.celeborn.plugin.flink.metric.RemoteShuffleMetricFactory.registerShuffleMetrics;

import java.time.Duration;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.plugin.flink.metric;

import static org.apache.flink.util.Preconditions.checkNotNull;

import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;

/**
 * Static helpers that create the {@code Shuffle.Remote} metric group and register the remote
 * shuffle service gauges in it. Not instantiable.
 */
public class RemoteShuffleMetricFactory {

  // Group path used by every metric created here: <parent>.Shuffle.Remote

  private static final String METRIC_GROUP_SHUFFLE = "Shuffle";
  private static final String METRIC_GROUP_REMOTE = "Remote";

  // Gauge names registered directly under Shuffle.Remote

  private static final String METRIC_TOTAL_MEMORY_SEGMENT = "TotalMemorySegments";
  private static final String METRIC_TOTAL_MEMORY = "TotalMemory";

  private static final String METRIC_AVAILABLE_MEMORY_SEGMENT = "AvailableMemorySegments";
  private static final String METRIC_AVAILABLE_MEMORY = "AvailableMemory";

  private static final String METRIC_USED_MEMORY_SEGMENT = "UsedMemorySegments";
  private static final String METRIC_USED_MEMORY = "UsedMemory";

  private RemoteShuffleMetricFactory() {}

  /**
   * Registers the total/available/used memory and memory-segment gauges of the given buffer pool
   * under {@code Shuffle.Remote} of the given metric group.
   *
   * @param metricGroup parent group that receives the {@code Shuffle.Remote} sub-groups; must not
   *     be null
   * @param networkBufferPool buffer pool whose getters back the gauges; must not be null
   */
  public static void registerShuffleMetrics(
      MetricGroup metricGroup, NetworkBufferPool networkBufferPool) {
    checkNotNull(metricGroup);
    checkNotNull(networkBufferPool);

    MetricGroup remoteGroup = createShuffleIOOwnerMetricGroup(metricGroup);

    remoteGroup.gauge(
        METRIC_TOTAL_MEMORY_SEGMENT, networkBufferPool::getTotalNumberOfMemorySegments);
    remoteGroup.gauge(METRIC_TOTAL_MEMORY, networkBufferPool::getTotalMemory);

    remoteGroup.gauge(
        METRIC_AVAILABLE_MEMORY_SEGMENT, networkBufferPool::getNumberOfAvailableMemorySegments);
    remoteGroup.gauge(METRIC_AVAILABLE_MEMORY, networkBufferPool::getAvailableMemory);

    remoteGroup.gauge(
        METRIC_USED_MEMORY_SEGMENT, networkBufferPool::getNumberOfUsedMemorySegments);
    remoteGroup.gauge(METRIC_USED_MEMORY, networkBufferPool::getUsedMemory);
  }

  /**
   * Returns the {@code Shuffle.Remote} sub-group of the given parent group.
   *
   * @param parentGroup group to which the {@code Shuffle} and then {@code Remote} groups are added
   * @return the {@code Remote} group nested inside the {@code Shuffle} group
   */
  public static MetricGroup createShuffleIOOwnerMetricGroup(MetricGroup parentGroup) {
    MetricGroup shuffleGroup = parentGroup.addGroup(METRIC_GROUP_SHUFFLE);
    return shuffleGroup.addGroup(METRIC_GROUP_REMOTE);
  }
}

0 comments on commit 5029052

Please sign in to comment.