|
46 | 46 | import org.apache.hadoop.classification.VisibleForTesting;
|
47 | 47 | import org.apache.hadoop.classification.InterfaceAudience;
|
48 | 48 | import org.apache.hadoop.fs.StorageType;
|
| 49 | +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; |
49 | 50 | import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
|
50 | 51 | import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
|
51 | 52 | import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
@@ -528,9 +529,8 @@ boolean doWaitForRestart() {
|
528 | 529 | // are congested
|
529 | 530 | private final List<DatanodeInfo> congestedNodes = new ArrayList<>();
|
530 | 531 | private final Map<DatanodeInfo, Integer> slowNodeMap = new HashMap<>();
|
531 |
| - private static final int CONGESTION_BACKOFF_MEAN_TIME_IN_MS = 5000; |
532 |
| - private static final int CONGESTION_BACK_OFF_MAX_TIME_IN_MS = |
533 |
| - CONGESTION_BACKOFF_MEAN_TIME_IN_MS * 10; |
| 532 | + private int congestionBackOffMeanTimeInMs; |
| 533 | + private int congestionBackOffMaxTimeInMs; |
534 | 534 | private int lastCongestionBackoffTime;
|
535 | 535 | private int maxPipelineRecoveryRetries;
|
536 | 536 | private int markSlowNodeAsBadNodeThreshold;
|
@@ -564,6 +564,35 @@ private DataStreamer(HdfsFileStatus stat, ExtendedBlock block,
|
564 | 564 | this.addBlockFlags = flags;
|
565 | 565 | this.maxPipelineRecoveryRetries = conf.getMaxPipelineRecoveryRetries();
|
566 | 566 | this.markSlowNodeAsBadNodeThreshold = conf.getMarkSlowNodeAsBadNodeThreshold();
|
| 567 | + congestionBackOffMeanTimeInMs = dfsClient.getConfiguration().getInt( |
| 568 | + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME, |
| 569 | + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME_DEFAULT); |
| 570 | + congestionBackOffMaxTimeInMs = dfsClient.getConfiguration().getInt( |
| 571 | + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME, |
| 572 | + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME_DEFAULT); |
| 573 | + if (congestionBackOffMeanTimeInMs <= 0) { |
| 574 | + LOG.warn("Configuration: {} is not appropriate, using default value: {}", |
| 575 | + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME, |
| 576 | + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME_DEFAULT); |
| 577 | + } |
| 578 | + if (congestionBackOffMaxTimeInMs <= 0) { |
| 579 | + LOG.warn("Configuration: {} is not appropriate, using default value: {}", |
| 580 | + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME, |
| 581 | + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME_DEFAULT); |
| 582 | + } |
| 583 | + if (congestionBackOffMaxTimeInMs < congestionBackOffMeanTimeInMs) { |
| 584 | + LOG.warn("Configuration: {} cannot be less than {}, using their default values.", |
| 585 | + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME, |
| 586 | + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME); |
| 587 | + } |
| 588 | + if (congestionBackOffMeanTimeInMs <= 0 || congestionBackOffMaxTimeInMs <= 0 || |
| 589 | + congestionBackOffMaxTimeInMs < congestionBackOffMeanTimeInMs) { |
| 590 | + congestionBackOffMeanTimeInMs = |
| 591 | + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME_DEFAULT; |
| 592 | + congestionBackOffMaxTimeInMs = |
| 593 | + HdfsClientConfigKeys.DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME_DEFAULT; |
| 594 | + } |
| 595 | + |
567 | 596 | }
|
568 | 597 |
|
569 | 598 | /**
|
@@ -1998,10 +2027,10 @@ private void backOffIfNecessary() throws InterruptedException {
|
1998 | 2027 | sb.append(' ').append(i);
|
1999 | 2028 | }
|
2000 | 2029 | int range = Math.abs(lastCongestionBackoffTime * 3 -
|
2001 |
| - CONGESTION_BACKOFF_MEAN_TIME_IN_MS); |
| 2030 | + congestionBackOffMeanTimeInMs); |
2002 | 2031 | int base = Math.min(lastCongestionBackoffTime * 3,
|
2003 |
| - CONGESTION_BACKOFF_MEAN_TIME_IN_MS); |
2004 |
| - t = Math.min(CONGESTION_BACK_OFF_MAX_TIME_IN_MS, |
| 2032 | + congestionBackOffMeanTimeInMs); |
| 2033 | + t = Math.min(congestionBackOffMaxTimeInMs, |
2005 | 2034 | (int)(base + Math.random() * range));
|
2006 | 2035 | lastCongestionBackoffTime = t;
|
2007 | 2036 | sb.append(" are congested. Backing off for ").append(t).append(" ms");
|
|
0 commit comments