Skip to content

Commit

Permalink
HDFS-17732. addExpectedReplicasToPending should add EC block expected…
Browse files Browse the repository at this point in the history
…Storages in PendingReconstructionBlocks
  • Loading branch information
LiuGuH committed Feb 20, 2025
1 parent 5067082 commit b21d58f
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1252,30 +1252,21 @@ public boolean commitOrCompleteLastBlock(BlockCollection bc,
/**
* If IBR is not sent from expected locations yet, add the datanodes to
* pendingReconstruction in order to keep RedundancyMonitor from scheduling
* the block. In case of erasure coding blocks, adds only in case there
* isn't any missing node.
* the block.
*/
public void addExpectedReplicasToPending(BlockInfo blk) {
boolean addForStriped = false;
DatanodeStorageInfo[] expectedStorages =
blk.getUnderConstructionFeature().getExpectedStorageLocations();
if (blk.isStriped()) {
BlockInfoStriped blkStriped = (BlockInfoStriped) blk;
addForStriped =
blkStriped.getRealTotalBlockNum() == expectedStorages.length;
}
if (!blk.isStriped() || addForStriped) {
if (expectedStorages.length - blk.numNodes() > 0) {
ArrayList<DatanodeStorageInfo> pendingNodes = new ArrayList<>();
for (DatanodeStorageInfo storage : expectedStorages) {
DatanodeDescriptor dnd = storage.getDatanodeDescriptor();
if (blk.findStorageInfo(dnd) == null) {
pendingNodes.add(storage);
}
if (expectedStorages.length - blk.numNodes() > 0) {
ArrayList<DatanodeStorageInfo> pendingNodes = new ArrayList<>();
for (DatanodeStorageInfo storage : expectedStorages) {
DatanodeDescriptor dnd = storage.getDatanodeDescriptor();
if (blk.findStorageInfo(dnd) == null) {
pendingNodes.add(storage);
}
pendingReconstruction.increment(blk,
pendingNodes.toArray(new DatanodeStorageInfo[pendingNodes.size()]));
}
pendingReconstruction.increment(blk,
pendingNodes.toArray(new DatanodeStorageInfo[pendingNodes.size()]));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import static org.apache.hadoop.fs.CommonConfigurationKeys.FS_CLIENT_TOPOLOGY_RESOLUTION_ENABLED;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsAdmin.TRASH_PERMISSION;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -107,8 +109,12 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
Expand Down Expand Up @@ -2780,4 +2786,64 @@ public void testAllRackFailureDuringPipelineSetup() throws Exception {
}
}

/**
 * Checks that expected storages of an erasure-coded block that have not yet
 * sent their IBR are tracked in pendingReconstruction.
 *
 * The test pauses IBRs on two of three datanodes, writes 1 MiB into a file
 * under an XOR-2-1-1024k directory, and expects one stored replica plus two
 * pending ones. After IBRs are resumed it expects one pending entry left,
 * which must disappear once the pending-reconstruction timeout fires, while
 * the block keeps exactly two stored replicas.
 */
@Test(timeout = 60000)
public void testECAddExpectedReplicasToPending() throws Exception {
  HdfsConfiguration conf = new HdfsConfiguration();
  conf.setInt(DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY, 0);
  conf.setInt(DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, 10);
  conf.setInt(DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 3);
  try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
      .numDataNodes(3).build()) {
    cluster.waitActive();
    final DistributedFileSystem dfs = cluster.getFileSystem();
    Path dir = new Path("/dir");
    dfs.mkdirs(dir);
    dfs.enableErasureCodingPolicy("XOR-2-1-1024k");
    dfs.setErasureCodingPolicy(dir, "XOR-2-1-1024k");

    try (FSDataOutputStream str = dfs.create(new Path("/dir/file"))) {
      // Hold back the IBRs of dn0 and dn1 so that only dn2 reports its block.
      DataNodeTestUtils.pauseIBR(cluster.getDataNodes().get(0));
      DataNodeTestUtils.pauseIBR(cluster.getDataNodes().get(1));
      Thread.sleep(1000);
      for (int i = 0; i < 1024 * 1024; i++) {
        str.write(i);
      }
      // Wait for dn2 IBR.
      Thread.sleep(2000);
    }

    LocatedBlocks locatedBlocks =
        dfs.getClient().getBlockLocations("/dir/file", 1024 * 1024);
    LocatedBlock firstBlock = locatedBlocks.getLocatedBlocks().get(0);
    BlockManager blockManager = cluster.getNamesystem().getBlockManager();
    BlockInfoStriped blockInfo = (BlockInfoStriped) blockManager
        .getStoredBlock(firstBlock.getBlock().getLocalBlock());
    assertEquals(1, blockInfo.numNodes());
    int pendingNum = BlockManagerTestUtil
        .getNumReplicasInPendingReconstruction(blockManager, blockInfo);
    assertEquals(2, pendingNum);

    DataNodeTestUtils.resumeIBR(cluster.getDataNodes().get(0));
    DataNodeTestUtils.resumeIBR(cluster.getDataNodes().get(1));
    // Wait for dn0 dn1 IBR.
    Thread.sleep(2000);
    pendingNum = BlockManagerTestUtil
        .getNumReplicasInPendingReconstruction(blockManager, blockInfo);
    assertEquals(1, pendingNum);

    // Wait for pendingReconstruction timeout because the block group only
    // has two replicas. Poll with a short sleep instead of spinning so the
    // test thread does not hog a core; the @Test timeout bounds the wait.
    while (blockManager.getNumTimedOutPendingReconstructions() == 0) {
      Thread.sleep(100);
    }
    pendingNum = BlockManagerTestUtil
        .getNumReplicasInPendingReconstruction(blockManager, blockInfo);
    assertEquals(0, pendingNum);
    blockInfo = (BlockInfoStriped) blockManager
        .getStoredBlock(firstBlock.getBlock().getLocalBlock());
    assertEquals(2, blockInfo.numNodes());

    // Wait for pendingReconstruction timeout blocks to needReconstruction
    // (two redundancy intervals of 3 seconds each).
    Thread.sleep(3000 * 2);
    pendingNum = BlockManagerTestUtil
        .getNumReplicasInPendingReconstruction(blockManager, blockInfo);
    assertEquals(0, pendingNum);
    blockInfo = (BlockInfoStriped) blockManager
        .getStoredBlock(firstBlock.getBlock().getLocalBlock());
    assertEquals(2, blockInfo.numNodes());
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ public static void wakeupPendingReconstructionTimerThread(
blockManager.pendingReconstruction.getTimerThread().interrupt();
}

/**
 * Test hook that asks the given block manager's pendingReconstruction
 * structure how many replicas it reports for a block.
 *
 * @param blockManager block manager whose pendingReconstruction is queried
 * @param blockInfo block to look up
 * @return the value of {@code pendingReconstruction.getNumReplicas(blockInfo)}
 */
public static int getNumReplicasInPendingReconstruction(final BlockManager blockManager,
    BlockInfo blockInfo) {
  final int pendingReplicaCount =
      blockManager.pendingReconstruction.getNumReplicas(blockInfo);
  return pendingReplicaCount;
}

public static HeartbeatManager getHeartbeatManager(
final BlockManager blockManager) {
return blockManager.getDatanodeManager().getHeartbeatManager();
Expand Down

0 comments on commit b21d58f

Please sign in to comment.