diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index d1f02c47e90b8..71aa7a690ff80 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1252,30 +1252,21 @@ public boolean commitOrCompleteLastBlock(BlockCollection bc,
   /**
    * If IBR is not sent from expected locations yet, add the datanodes to
    * pendingReconstruction in order to keep RedundancyMonitor from scheduling
-   * the block. In case of erasure coding blocks, adds only in case there
-   * isn't any missing node.
+   * the block.
    */
   public void addExpectedReplicasToPending(BlockInfo blk) {
-    boolean addForStriped = false;
     DatanodeStorageInfo[] expectedStorages =
         blk.getUnderConstructionFeature().getExpectedStorageLocations();
-    if (blk.isStriped()) {
-      BlockInfoStriped blkStriped = (BlockInfoStriped) blk;
-      addForStriped =
-          blkStriped.getRealTotalBlockNum() == expectedStorages.length;
-    }
-    if (!blk.isStriped() || addForStriped) {
-      if (expectedStorages.length - blk.numNodes() > 0) {
-        ArrayList<DatanodeStorageInfo> pendingNodes = new ArrayList<>();
-        for (DatanodeStorageInfo storage : expectedStorages) {
-          DatanodeDescriptor dnd = storage.getDatanodeDescriptor();
-          if (blk.findStorageInfo(dnd) == null) {
-            pendingNodes.add(storage);
-          }
+    if (expectedStorages.length - blk.numNodes() > 0) {
+      ArrayList<DatanodeStorageInfo> pendingNodes = new ArrayList<>();
+      for (DatanodeStorageInfo storage : expectedStorages) {
+        DatanodeDescriptor dnd = storage.getDatanodeDescriptor();
+        if (blk.findStorageInfo(dnd) == null) {
+          pendingNodes.add(storage);
         }
-        pendingReconstruction.increment(blk,
-            pendingNodes.toArray(new DatanodeStorageInfo[pendingNodes.size()]));
       }
+      pendingReconstruction.increment(blk,
+          pendingNodes.toArray(new DatanodeStorageInfo[pendingNodes.size()]));
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
index 669224818f07c..176e72efcb714 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
@@ -20,6 +20,8 @@
 
 import static org.apache.hadoop.fs.CommonConfigurationKeys.FS_CLIENT_TOPOLOGY_RESOLUTION_ENABLED;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsAdmin.TRASH_PERMISSION;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CONTEXT;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -107,8 +109,12 @@
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
 import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRackFaultTolerant;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -2780,4 +2786,66 @@ public void testAllRackFailureDuringPipelineSetup() throws Exception {
     }
   }
 
+  @Test(timeout = 60000)
+  public void testECAddExpectedReplicasToPending() throws Exception {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setInt(DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY, 0);
+    conf.setInt(DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, 10);
+    conf.setInt(DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 3);
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(3).build()) {
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      Path dir = new Path("/dir");
+      dfs.mkdirs(dir);
+      dfs.enableErasureCodingPolicy("XOR-2-1-1024k");
+      dfs.setErasureCodingPolicy(dir, "XOR-2-1-1024k");
+
+      try (FSDataOutputStream str = dfs.create(new Path("/dir/file"));) {
+        DataNodeTestUtils.pauseIBR(cluster.getDataNodes().get(0));
+        DataNodeTestUtils.pauseIBR(cluster.getDataNodes().get(1));
+        Thread.sleep(1000);
+        for (int i = 0; i < 1024 * 1024; i++) {
+          str.write(i);
+        }
+        // Wait for dn2 IBR.
+        Thread.sleep(2000);
+      }
+
+      LocatedBlocks locatedBlocks = dfs.getClient().getBlockLocations("/dir/file", 1024 * 1024);
+      BlockManager blockManager = cluster.getNamesystem().getBlockManager();
+      BlockInfoStriped blockInfo = (BlockInfoStriped) blockManager.getStoredBlock(locatedBlocks.getLocatedBlocks().get(0).getBlock().getLocalBlock());
+      assertEquals(1, blockInfo.numNodes());
+      int pendingNum = BlockManagerTestUtil.getNumReplicasInPendingReconstruction(blockManager, blockInfo);
+      assertEquals(2, pendingNum);
+
+      DataNodeTestUtils.resumeIBR(cluster.getDataNodes().get(0));
+      DataNodeTestUtils.resumeIBR(cluster.getDataNodes().get(1));
+      // Wait for dn0 dn1 IBR.
+      Thread.sleep(2000);
+      pendingNum = BlockManagerTestUtil.getNumReplicasInPendingReconstruction(blockManager, blockInfo);
+      assertEquals(1, pendingNum);
+
+      long timeoutNum = 0;
+      // Wait for pendingReconstruction timeout because the block group only has two replicas.
+      while (timeoutNum == 0) {
+        // Back off between polls instead of spinning on the block manager.
+        Thread.sleep(100);
+        timeoutNum = blockManager.getNumTimedOutPendingReconstructions();
+      }
+      pendingNum = BlockManagerTestUtil.getNumReplicasInPendingReconstruction(blockManager, blockInfo);
+      assertEquals(0, pendingNum);
+      blockInfo = (BlockInfoStriped) blockManager.getStoredBlock(locatedBlocks.getLocatedBlocks().get(0).getBlock().getLocalBlock());
+      assertEquals(2, blockInfo.numNodes());
+
+
+      // Wait for pendingReconstruction timeout blocks to needReconstruction
+      Thread.sleep(3000 * 2);
+      pendingNum = BlockManagerTestUtil.getNumReplicasInPendingReconstruction(blockManager, blockInfo);
+      assertEquals(0, pendingNum);
+      blockInfo = (BlockInfoStriped) blockManager.getStoredBlock(locatedBlocks.getLocatedBlocks().get(0).getBlock().getLocalBlock());
+      assertEquals(2, blockInfo.numNodes());
+    }
+  }
+
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
index 430366306c2ba..24099f33160a5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
@@ -172,6 +172,15 @@ public static void wakeupPendingReconstructionTimerThread(
     blockManager.pendingReconstruction.getTimerThread().interrupt();
   }
 
+  /**
+   * @return the replica count that the block manager's pendingReconstruction
+   *         reports for the given block.
+   */
+  public static int getNumReplicasInPendingReconstruction(final BlockManager blockManager,
+      BlockInfo blockInfo) {
+    return blockManager.pendingReconstruction.getNumReplicas(blockInfo);
+  }
+
   public static HeartbeatManager getHeartbeatManager(
       final BlockManager blockManager) {
     return blockManager.getDatanodeManager().getHeartbeatManager();