Skip to content

HDFS-17242. Make congestion backoff time configurable. #6227

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
Expand Down Expand Up @@ -528,9 +529,8 @@ boolean doWaitForRestart() {
// are congested
private final List<DatanodeInfo> congestedNodes = new ArrayList<>();
private final Map<DatanodeInfo, Integer> slowNodeMap = new HashMap<>();
private static final int CONGESTION_BACKOFF_MEAN_TIME_IN_MS = 5000;
private static final int CONGESTION_BACK_OFF_MAX_TIME_IN_MS =
CONGESTION_BACKOFF_MEAN_TIME_IN_MS * 10;
private int congestionBackOffMeanTimeInMs;
private int congestionBackOffMaxTimeInMs;
private int lastCongestionBackoffTime;
private int maxPipelineRecoveryRetries;
private int markSlowNodeAsBadNodeThreshold;
Expand Down Expand Up @@ -564,6 +564,35 @@ private DataStreamer(HdfsFileStatus stat, ExtendedBlock block,
this.addBlockFlags = flags;
this.maxPipelineRecoveryRetries = conf.getMaxPipelineRecoveryRetries();
this.markSlowNodeAsBadNodeThreshold = conf.getMarkSlowNodeAsBadNodeThreshold();
congestionBackOffMeanTimeInMs = dfsClient.getConfiguration().getInt(
HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME,
HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME_DEFAULT);
congestionBackOffMaxTimeInMs = dfsClient.getConfiguration().getInt(
HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME,
HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME_DEFAULT);
if (congestionBackOffMeanTimeInMs <= 0) {
LOG.warn("Configuration: {} is not appropriate, using default value: {}",
HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME,
HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME_DEFAULT);
}
if (congestionBackOffMaxTimeInMs <= 0) {
LOG.warn("Configuration: {} is not appropriate, using default value: {}",
HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME,
HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME_DEFAULT);
}
if (congestionBackOffMaxTimeInMs < congestionBackOffMeanTimeInMs) {
LOG.warn("Configuration: {} can not less than {}, using their default values.",
HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME,
HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME);
}
if (congestionBackOffMeanTimeInMs <= 0 || congestionBackOffMaxTimeInMs <= 0 ||
congestionBackOffMaxTimeInMs < congestionBackOffMeanTimeInMs) {
Comment on lines +588 to +589
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need the redundant wrapping if condition. We can go directly to the individual if statements and, after checking each condition, reset to the defaults inside that same if block.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, sir, I don't quite follow you here. Do you mean removing if (congestionBackOffMeanTimeInMs <= 0) and its LOG.warn?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean there is no need to have the parent if check:

 if (congestionBackOffMeanTimeInMs <= 0 || congestionBackOffMaxTimeInMs <= 0 ||
        congestionBackOffMaxTimeInMs < congestionBackOffMeanTimeInMs) {

We can have all three individual if checks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. I have fixed it. Thanks a lot, sir.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks a bit weird to me. Isn't the following cleaner?

if (meanTime <= 0) {
  log.WARN("mean time invalid. use default");
  meanTime = meanTimeDefault;
}

if (maxTime <= 0) {
  log.WARN("max time invalid. use default");
  maxTime = maxTimeDefault;
}

if (meanTime > maxTime) {
  log.WARN("mean/max time invalid. use default for both");
  meanTime = meanTimeDefault;
  maxTime = maxTimeDefault;
}

congestionBackOffMeanTimeInMs =
HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME_DEFAULT;
congestionBackOffMaxTimeInMs =
HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME_DEFAULT;
}

}

/**
Expand Down Expand Up @@ -1998,10 +2027,10 @@ private void backOffIfNecessary() throws InterruptedException {
sb.append(' ').append(i);
}
int range = Math.abs(lastCongestionBackoffTime * 3 -
CONGESTION_BACKOFF_MEAN_TIME_IN_MS);
congestionBackOffMeanTimeInMs);
int base = Math.min(lastCongestionBackoffTime * 3,
CONGESTION_BACKOFF_MEAN_TIME_IN_MS);
t = Math.min(CONGESTION_BACK_OFF_MAX_TIME_IN_MS,
congestionBackOffMeanTimeInMs);
t = Math.min(congestionBackOffMaxTimeInMs,
(int)(base + Math.random() * range));
lastCongestionBackoffTime = t;
sb.append(" are congested. Backing off for ").append(t).append(" ms");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,15 @@ public interface HdfsClientConfigKeys {
"dfs.client.output.stream.uniq.default.key";
String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT = "DEFAULT";

// Configuration key for the mean time, in milliseconds, used to compute the
// client's congestion backoff sleep time.
String DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME =
"dfs.client.congestion.backoff.mean.time";
// Default mean backoff time: 5000 ms.
int DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME_DEFAULT = 5000;

// Configuration key for the maximum time, in milliseconds, that bounds the
// client's congestion backoff sleep time.
String DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME =
"dfs.client.congestion.backoff.max.time";
// Default maximum backoff time: ten times the default mean time (50000 ms).
int DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME_DEFAULT =
DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME_DEFAULT * 10;

/**
* These are deprecated config keys to client code.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6559,6 +6559,22 @@
If the namespace is DEFAULT, it's best to change this conf to other value.
</description>
</property>
<property>
<name>dfs.client.congestion.backoff.mean.time</name>
<value>5000</value>
<description>
The mean time in milliseconds which is used to compute
client congestion backoff sleep time.
</description>
</property>
<property>
<name>dfs.client.congestion.backoff.max.time</name>
<value>50000</value>
<description>
The maximum time in milliseconds, used as the upper limit
on the client's congestion backoff sleep time.
</description>
</property>
<property>
<name>dfs.client.rbf.observer.read.enable</name>
<value>false</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ private void runAdjustChunkBoundary(
public void testCongestionBackoff() throws IOException {
DfsClientConf dfsClientConf = mock(DfsClientConf.class);
DFSClient client = mock(DFSClient.class);
Configuration conf = mock(Configuration.class);
when(client.getConfiguration()).thenReturn(conf);
when(client.getConf()).thenReturn(dfsClientConf);
when(client.getTracer()).thenReturn(FsTracer.get(new Configuration()));
client.clientRunning = true;
Expand Down Expand Up @@ -306,6 +308,8 @@ public void testCongestionBackoff() throws IOException {
public void testCongestionAckDelay() {
DfsClientConf dfsClientConf = mock(DfsClientConf.class);
DFSClient client = mock(DFSClient.class);
Configuration conf = mock(Configuration.class);
when(client.getConfiguration()).thenReturn(conf);
when(client.getConf()).thenReturn(dfsClientConf);
when(client.getTracer()).thenReturn(FsTracer.get(new Configuration()));
client.clientRunning = true;
Expand All @@ -325,7 +329,7 @@ public void testCongestionAckDelay() {
ArrayList<DatanodeInfo> congestedNodes = (ArrayList<DatanodeInfo>)
Whitebox.getInternalState(stream, "congestedNodes");
int backOffMaxTime = (int)
Whitebox.getInternalState(stream, "CONGESTION_BACK_OFF_MAX_TIME_IN_MS");
Whitebox.getInternalState(stream, "congestionBackOffMaxTimeInMs");
DFSPacket[] packet = new DFSPacket[100];
AtomicBoolean isDelay = new AtomicBoolean(true);

Expand Down