Skip to content

HADOOP-18873. ABFS: AbfsOutputStream doesn't close DataBlocks object. [branch-3.3] #6105

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ public String getKeyToBufferDir() {
*/
public static abstract class DataBlock implements Closeable {

enum DestState {Writing, Upload, Closed}
public enum DestState {Writing, Upload, Closed}

private volatile DestState state = Writing;
private final long index;
Expand Down Expand Up @@ -375,7 +375,7 @@ protected final void verifyState(DestState expected)
*
* @return the current state.
*/
final DestState getState() {
public final DestState getState() {
return state;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ public FSDataOutputStream create(final Path f,
try {
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.CREATE, overwrite, tracingHeaderFormat, listener);
OutputStream outputStream = abfsStore.createFile(qualifiedPath, statistics, overwrite,
OutputStream outputStream = getAbfsStore().createFile(qualifiedPath, statistics, overwrite,
permission == null ? FsPermission.getFileDefault() : permission,
FsPermission.getUMask(getConf()), tracingContext);
statIncrement(FILES_CREATED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
.withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount())
.withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue())
.withLease(lease)
.withBlockFactory(blockFactory)
.withBlockFactory(getBlockFactory())
.withBlockOutputActiveBlocks(blockOutputActiveBlocks)
.withClient(client)
.withPosition(position)
Expand Down Expand Up @@ -1940,6 +1940,11 @@ void setClient(AbfsClient client) {
this.client = client;
}

/**
 * Returns the {@link DataBlocks.BlockFactory} this store uses to create
 * data blocks for output streams.
 * Package-private and exposed only so tests can substitute a Mockito spy
 * for the factory.
 *
 * @return the block factory in use by this store.
 */
@VisibleForTesting
DataBlocks.BlockFactory getBlockFactory() {
return this.blockFactory;
}

@VisibleForTesting
void setNamespaceEnabled(Trilean isNamespaceEnabled){
this.isNamespaceEnabled = isNamespaceEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ private void uploadBlockAsync(DataBlocks.DataBlock blockToUpload,
outputStreamStatistics.uploadSuccessful(bytesLength);
return null;
} finally {
IOUtils.close(blockUploadData);
IOUtils.close(blockUploadData, blockToUpload);
}
});
writeOperations.add(new WriteOperation(job, offset, bytesLength));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,31 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.mockito.Mockito;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.store.BlockUploadStatistics;
import org.apache.hadoop.fs.store.DataBlocks;

import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER;
import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_ARRAY;
import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_DISK;
import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BYTEBUFFER;
import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Closed;
import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Writing;

/**
* Test append operations.
Expand Down Expand Up @@ -90,4 +106,47 @@ public void testTracingForAppend() throws IOException {
fs.getFileSystemId(), FSOperationType.APPEND, false, 0));
fs.append(testPath, 10);
}

/**
 * Verifies that closing an output stream closes the underlying
 * {@link DataBlocks.DataBlock} for every supported block-buffer type
 * (disk, byte buffer, array) and that the block's state transitions
 * from {@code Writing} to {@code Closed}.
 */
@Test
public void testCloseOfDataBlockOnAppendComplete() throws Exception {
// Run the scenario once per block-buffer implementation.
Set<String> blockBufferTypes = new HashSet<>();
blockBufferTypes.add(DATA_BLOCKS_BUFFER_DISK);
blockBufferTypes.add(DATA_BLOCKS_BYTEBUFFER);
blockBufferTypes.add(DATA_BLOCKS_BUFFER_ARRAY);
for (String blockBufferType : blockBufferTypes) {
// Fresh filesystem per buffer type, configured to use that buffer.
Configuration configuration = new Configuration(getRawConfiguration());
configuration.set(DATA_BLOCKS_BUFFER, blockBufferType);
AzureBlobFileSystem fs = Mockito.spy(
(AzureBlobFileSystem) FileSystem.newInstance(configuration));
// Spy the store so getBlockFactory() can be intercepted below.
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
Mockito.doReturn(store).when(fs).getAbfsStore();
// Single-element array so the lambda below can capture the created block.
DataBlocks.DataBlock[] dataBlock = new DataBlocks.DataBlock[1];
// Layered spies: when the store hands out its block factory, wrap it in a
// spy whose create(...) in turn wraps the real DataBlock in a spy and
// records it in dataBlock[0] so close() can be verified later.
Mockito.doAnswer(getBlobFactoryInvocation -> {
DataBlocks.BlockFactory factory = Mockito.spy(
(DataBlocks.BlockFactory) getBlobFactoryInvocation.callRealMethod());
Mockito.doAnswer(factoryCreateInvocation -> {
dataBlock[0] = Mockito.spy(
(DataBlocks.DataBlock) factoryCreateInvocation.callRealMethod());
return dataBlock[0];
})
.when(factory)
.create(Mockito.anyLong(), Mockito.anyInt(), Mockito.any(
BlockUploadStatistics.class));
return factory;
}).when(store).getBlockFactory();
// Write one byte, then close; the block must be closed exactly once.
try (OutputStream os = fs.create(
new Path(getMethodName() + "_" + blockBufferType))) {
os.write(new byte[1]);
// After a write the captured block should be in the Writing state.
Assertions.assertThat(dataBlock[0].getState())
.describedAs(
"On write of data in outputStream, state should become Writing")
.isEqualTo(Writing);
os.close();
// Close must propagate to the DataBlock (HADOOP-18873) and flip its state.
Mockito.verify(dataBlock[0], Mockito.times(1)).close();
Assertions.assertThat(dataBlock[0].getState())
.describedAs("On close of outputStream, state should become Closed")
.isEqualTo(Closed);
}
}
}
}