Skip to content

Commit

Permalink
Integrate PutIfNotExist functionality into S3A
Browse files Browse the repository at this point in the history
  • Loading branch information
diljot grewal committed Aug 23, 2024
1 parent 6be0463 commit 19cc916
Show file tree
Hide file tree
Showing 10 changed files with 166 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1390,6 +1390,13 @@ private Constants() {
*/
public static final String FS_S3A_CREATE_PERFORMANCE = "fs.s3a.create.performance";

/**
* Flag for commit if none match.
* This can be set in the {code createFile()} builder.
* Value {@value}.
*/
public static final String FS_S3A_CREATE_IF_NONE_MATCH = "fs.s3a.create.header.If-None-Match";

/**
* Default value for create performance in an S3A FS.
* Value {@value}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -99,6 +100,8 @@ class S3ABlockOutputStream extends OutputStream implements
private static final String E_NOT_SYNCABLE =
"S3A streams are not Syncable. See HADOOP-17597.";

public static final String IF_NONE_MATCH_HEADER = "If-None-Match";

/** Object being uploaded. */
private final String key;

Expand Down Expand Up @@ -596,7 +599,7 @@ private long putObject() throws IOException {
final S3ADataBlocks.DataBlock block = getActiveBlock();
long size = block.dataSize();
final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
final PutObjectRequest putObjectRequest = uploadData.hasFile() ?
PutObjectRequest putObjectRequest = uploadData.hasFile() ?
writeOperationHelper.createPutObjectRequest(
key,
uploadData.getFile().length(),
Expand All @@ -608,6 +611,16 @@ private long putObject() throws IOException {
builder.putOptions,
false);

PutObjectRequest.Builder maybeModifiedPutIfAbsentRequest = putObjectRequest.toBuilder();
Map<String, String> optionHeaders = builder.putOptions.getHeaders();

if (optionHeaders != null && optionHeaders.containsKey(IF_NONE_MATCH_HEADER)) {
maybeModifiedPutIfAbsentRequest.overrideConfiguration(
override -> override.putHeader(IF_NONE_MATCH_HEADER, optionHeaders.get(IF_NONE_MATCH_HEADER)));
}

final PutObjectRequest finalizedRequest = maybeModifiedPutIfAbsentRequest.build();

BlockUploadProgress progressCallback =
new BlockUploadProgress(block, progressListener, now());
statistics.blockUploadQueued(size);
Expand All @@ -617,7 +630,7 @@ private long putObject() throws IOException {
// the putObject call automatically closes the input
// stream afterwards.
PutObjectResponse response =
writeOperationHelper.putObject(putObjectRequest, builder.putOptions, uploadData,
writeOperationHelper.putObject(finalizedRequest, builder.putOptions, uploadData,
uploadData.hasFile(), statistics);
progressCallback.progressChanged(REQUEST_BYTE_TRANSFER_EVENT);
return response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,7 @@ private CompleteMultipartUploadResponse finalizeMultipartUpload(
retrying,
() -> {
final CompleteMultipartUploadRequest.Builder requestBuilder =
getRequestFactory().newCompleteMultipartUploadRequestBuilder(
destKey, uploadId, partETags);
getRequestFactory().newCompleteMultipartUploadRequestBuilder(destKey, uploadId, partETags, putOptions);
return writeOperationHelperCallbacks.completeMultipartUpload(requestBuilder.build());
});
owner.finishedWrite(destKey, length, uploadResult.eTag(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,16 @@ CreateMultipartUploadRequest.Builder newMultipartUploadRequestBuilder(

/**
* Complete a multipart upload.
* @param destKey destination object key
* @param uploadId ID of initiated upload
* @param partETags ordered list of etags
*
* @param destKey destination object key
* @param uploadId ID of initiated upload
* @param partETags ordered list of etags
* @return the request builder.
*/
CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestBuilder(
String destKey,
String uploadId,
List<CompletedPart> partETags);
List<CompletedPart> partETags, PutObjectOptions putOptions);

/**
* Create a HEAD object request builder.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,11 @@ public static final class CreateFileOptions {
private final Map<String, String> headers;

/**
* @param flags creation flags
* @param recursive create parent dirs?
* @param flags creation flags
* @param recursive create parent dirs?
* @param performance performance flag
* @param headers nullable header map.
* @param
* @param headers nullable header map.
*/
public CreateFileOptions(
final EnumSet<CreateFlag> flags,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static org.apache.hadoop.fs.s3a.Constants.DIRECTORY_OPERATIONS_PURGE_UPLOADS;
import static org.apache.hadoop.fs.s3a.Constants.ENABLE_MULTI_DELETE;
import static org.apache.hadoop.fs.s3a.Constants.FIPS_ENDPOINT;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_IF_NONE_MATCH;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_PERFORMANCE_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_AWS_V2;
Expand Down Expand Up @@ -260,7 +261,7 @@ private InternalConstants() {
*/
public static final Set<String> CREATE_FILE_KEYS =
Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(FS_S3A_CREATE_PERFORMANCE)));
new HashSet<>(Arrays.asList(FS_S3A_CREATE_PERFORMANCE, FS_S3A_CREATE_IF_NONE_MATCH)));

/**
* Dynamic Path capabilities to be evaluated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;

import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_IF_NONE_MATCH;
import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.UNKNOWN_ALGORITHM;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
import static org.apache.hadoop.util.Preconditions.checkArgument;
Expand Down Expand Up @@ -517,12 +518,22 @@ public CreateMultipartUploadRequest.Builder newMultipartUploadRequestBuilder(
public CompleteMultipartUploadRequest.Builder newCompleteMultipartUploadRequestBuilder(
String destKey,
String uploadId,
List<CompletedPart> partETags) {
List<CompletedPart> partETags,
PutObjectOptions putOptions) {

// a copy of the list is required, so that the AWS SDK doesn't
// attempt to sort an unmodifiable list.
CompleteMultipartUploadRequest.Builder requestBuilder =
CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId)
CompleteMultipartUploadRequest.Builder requestBuilder;
Map<String, String> optionHeaders = putOptions.getHeaders();

if (optionHeaders != null && optionHeaders.containsKey("If-None-Match")) {
requestBuilder = CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId)
.overrideConfiguration(override ->override.putHeader("If-None-Match", optionHeaders.get("If-None-Match")))
.multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build());
} else {
requestBuilder = CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId)
.multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build());
}

return prepareRequest(requestBuilder);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package org.apache.hadoop.fs.s3a.impl;

Check failure on line 1 in hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java#L1

asflicense: Missing Apache License

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.io.IOUtils;

import org.junit.Assert;
import org.junit.Test;
import software.amazon.awssdk.services.s3.model.S3Exception;

import java.io.IOException;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER;
import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_IF_NONE_MATCH;
import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
import static org.apache.hadoop.fs.s3a.scale.ITestS3AMultipartUploadSizeLimits.MPU_SIZE;
import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1MB;


public class ITestS3APutIfMatch extends AbstractS3ATestBase {

@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
S3ATestUtils.disableFilesystemCaching(conf);
removeBaseAndBucketOverrides(conf,
MULTIPART_SIZE,
UPLOAD_PART_COUNT_LIMIT);
conf.setLong(MULTIPART_SIZE, MPU_SIZE);
conf.setLong(UPLOAD_PART_COUNT_LIMIT, 2);
conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
conf.set(FAST_UPLOAD_BUFFER, getBlockOutputBufferName());
return conf;
}

protected String getBlockOutputBufferName() {
return FAST_UPLOAD_BUFFER_ARRAY;
}

/**
* Create a file using the PutIfMatch feature from S3
* @param fs filesystem
* @param path path to write
* @param data source dataset. Can be null
* @throws IOException on any problem
*/
private static void createFileWithIfNoneMatchFlag(FileSystem fs,
Path path,
byte[] data,
String ifMatchTag) throws Exception {
FSDataOutputStreamBuilder builder = fs.createFile(path);
builder.must(FS_S3A_CREATE_IF_NONE_MATCH, ifMatchTag);
FSDataOutputStream stream = builder.create().build();
if (data != null && data.length > 0) {
stream.write(data);
}
stream.close();
IOUtils.closeStream(stream);
}

@Test
public void testPutIfAbsentConflict() throws IOException {
FileSystem fs = getFileSystem();
Path testFile = methodPath();

fs.mkdirs(testFile.getParent());
byte[] fileBytes = dataset(TEST_FILE_LEN, 0, 255);

try {
createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*");
createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*");
} catch (Exception e) {
Assert.assertEquals(RemoteFileChangedException.class, e.getClass());

S3Exception s3Exception = (S3Exception) e.getCause();
Assert.assertEquals(s3Exception.statusCode(), 412);
}
}


@Test
public void testPutIfAbsentLargeFileConflict() throws IOException {
FileSystem fs = getFileSystem();
Path testFile = methodPath();

fs.mkdirs(testFile.getParent());
// enough bytes for Multipart Upload
byte[] fileBytes = dataset(6 * _1MB, 'a', 'z' - 'a');

try {
createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*");
createFileWithIfNoneMatchFlag(fs, testFile, fileBytes, "*");
} catch (Exception e) {
Assert.assertEquals(RemoteFileChangedException.class, e.getClass());

// Error gets caught here:
S3Exception s3Exception = (S3Exception) e.getCause();
Assert.assertEquals(s3Exception.statusCode(), 412);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,13 @@ public void testPerformanceSupport() throws Throwable {
public void testHeaderOptions() throws Throwable {
final CreateFileBuilder builder = mkBuilder().create()
.must(FS_S3A_CREATE_HEADER + ".retention", "permanent")
.must(FS_S3A_CREATE_HEADER + ".If-None-Match", "*")
.opt(FS_S3A_CREATE_HEADER + ".owner", "engineering");
final Map<String, String> headers = build(builder).getHeaders();
Assertions.assertThat(headers)
.containsEntry("retention", "permanent")
.containsEntry("owner", "engineering");
.containsEntry("owner", "engineering")
.containsEntry("If-None-Match", "*");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicLong;

import software.amazon.awssdk.awscore.AwsRequest;
Expand Down Expand Up @@ -162,7 +163,7 @@ private void createFactoryObjects(RequestFactory factory) throws IOException {
String id = "1";
a(factory.newAbortMultipartUploadRequestBuilder(path, id));
a(factory.newCompleteMultipartUploadRequestBuilder(path, id,
new ArrayList<>()));
new ArrayList<>(), new PutObjectOptions(false, "some class", Collections.emptyMap())));
a(factory.newCopyObjectRequestBuilder(path, path2,
HeadObjectResponse.builder().build()));
a(factory.newDeleteObjectRequestBuilder(path));
Expand Down

0 comments on commit 19cc916

Please # to comment.