Skip to content

Commit 3f87cc5

Browse files
committed
HADOOP-18873. ABFS: AbfsOutputStream doesn't close DataBlocks object. (#6010)
AbfsOutputStream now closes the DataBlock object created for the upload. Contributed by: Pranav Saxena
1 parent ed7aecf commit 3f87cc5

File tree

5 files changed

+69
-5
lines changed

5 files changed

+69
-5
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ public String getKeyToBufferDir() {
329329
*/
330330
public static abstract class DataBlock implements Closeable {
331331

332-
enum DestState {Writing, Upload, Closed}
332+
public enum DestState {Writing, Upload, Closed}
333333

334334
private volatile DestState state = Writing;
335335
private final long index;
@@ -375,7 +375,7 @@ protected final void verifyState(DestState expected)
375375
*
376376
* @return the current state.
377377
*/
378-
final DestState getState() {
378+
public final DestState getState() {
379379
return state;
380380
}
381381

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ public FSDataOutputStream create(final Path f,
330330
try {
331331
TracingContext tracingContext = new TracingContext(clientCorrelationId,
332332
fileSystemId, FSOperationType.CREATE, overwrite, tracingHeaderFormat, listener);
333-
OutputStream outputStream = abfsStore.createFile(qualifiedPath, statistics, overwrite,
333+
OutputStream outputStream = getAbfsStore().createFile(qualifiedPath, statistics, overwrite,
334334
permission == null ? FsPermission.getFileDefault() : permission,
335335
FsPermission.getUMask(getConf()), tracingContext);
336336
statIncrement(FILES_CREATED);

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -707,7 +707,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
707707
.withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount())
708708
.withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue())
709709
.withLease(lease)
710-
.withBlockFactory(blockFactory)
710+
.withBlockFactory(getBlockFactory())
711711
.withBlockOutputActiveBlocks(blockOutputActiveBlocks)
712712
.withClient(client)
713713
.withPosition(position)
@@ -1940,6 +1940,11 @@ void setClient(AbfsClient client) {
19401940
this.client = client;
19411941
}
19421942

1943+
@VisibleForTesting
1944+
DataBlocks.BlockFactory getBlockFactory() {
1945+
return blockFactory;
1946+
}
1947+
19431948
@VisibleForTesting
19441949
void setNamespaceEnabled(Trilean isNamespaceEnabled){
19451950
this.isNamespaceEnabled = isNamespaceEnabled;

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,7 @@ private void uploadBlockAsync(DataBlocks.DataBlock blockToUpload,
344344
outputStreamStatistics.uploadSuccessful(bytesLength);
345345
return null;
346346
} finally {
347-
IOUtils.close(blockUploadData);
347+
IOUtils.close(blockUploadData, blockToUpload);
348348
}
349349
});
350350
writeOperations.add(new WriteOperation(job, offset, bytesLength));

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,31 @@
2020

2121
import java.io.FileNotFoundException;
2222
import java.io.IOException;
23+
import java.io.OutputStream;
24+
import java.util.HashSet;
2325
import java.util.Random;
26+
import java.util.Set;
2427

28+
import org.assertj.core.api.Assertions;
2529
import org.junit.Test;
30+
import org.mockito.Mockito;
2631

32+
import org.apache.hadoop.conf.Configuration;
2733
import org.apache.hadoop.fs.FSDataOutputStream;
34+
import org.apache.hadoop.fs.FileSystem;
2835
import org.apache.hadoop.fs.Path;
2936
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
3037
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
3138
import org.apache.hadoop.fs.contract.ContractTestUtils;
39+
import org.apache.hadoop.fs.store.BlockUploadStatistics;
40+
import org.apache.hadoop.fs.store.DataBlocks;
41+
42+
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER;
43+
import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_ARRAY;
44+
import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_DISK;
45+
import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BYTEBUFFER;
46+
import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Closed;
47+
import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Writing;
3248

3349
/**
3450
* Test append operations.
@@ -90,4 +106,47 @@ public void testTracingForAppend() throws IOException {
90106
fs.getFileSystemId(), FSOperationType.APPEND, false, 0));
91107
fs.append(testPath, 10);
92108
}
109+
110+
@Test
111+
public void testCloseOfDataBlockOnAppendComplete() throws Exception {
112+
Set<String> blockBufferTypes = new HashSet<>();
113+
blockBufferTypes.add(DATA_BLOCKS_BUFFER_DISK);
114+
blockBufferTypes.add(DATA_BLOCKS_BYTEBUFFER);
115+
blockBufferTypes.add(DATA_BLOCKS_BUFFER_ARRAY);
116+
for (String blockBufferType : blockBufferTypes) {
117+
Configuration configuration = new Configuration(getRawConfiguration());
118+
configuration.set(DATA_BLOCKS_BUFFER, blockBufferType);
119+
AzureBlobFileSystem fs = Mockito.spy(
120+
(AzureBlobFileSystem) FileSystem.newInstance(configuration));
121+
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
122+
Mockito.doReturn(store).when(fs).getAbfsStore();
123+
DataBlocks.DataBlock[] dataBlock = new DataBlocks.DataBlock[1];
124+
Mockito.doAnswer(getBlobFactoryInvocation -> {
125+
DataBlocks.BlockFactory factory = Mockito.spy(
126+
(DataBlocks.BlockFactory) getBlobFactoryInvocation.callRealMethod());
127+
Mockito.doAnswer(factoryCreateInvocation -> {
128+
dataBlock[0] = Mockito.spy(
129+
(DataBlocks.DataBlock) factoryCreateInvocation.callRealMethod());
130+
return dataBlock[0];
131+
})
132+
.when(factory)
133+
.create(Mockito.anyLong(), Mockito.anyInt(), Mockito.any(
134+
BlockUploadStatistics.class));
135+
return factory;
136+
}).when(store).getBlockFactory();
137+
try (OutputStream os = fs.create(
138+
new Path(getMethodName() + "_" + blockBufferType))) {
139+
os.write(new byte[1]);
140+
Assertions.assertThat(dataBlock[0].getState())
141+
.describedAs(
142+
"On write of data in outputStream, state should become Writing")
143+
.isEqualTo(Writing);
144+
os.close();
145+
Mockito.verify(dataBlock[0], Mockito.times(1)).close();
146+
Assertions.assertThat(dataBlock[0].getState())
147+
.describedAs("On close of outputStream, state should become Closed")
148+
.isEqualTo(Closed);
149+
}
150+
}
151+
}
93152
}

0 commit comments

Comments
 (0)