diff --git a/pom.xml b/pom.xml
index 472a930..2e7c60c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
software.amazon.payloadoffloading
payloadoffloading-common
- 2.1.2
+ 2.2.0
jar
Payload offloading common library for AWS
Common library between extended Amazon AWS clients to save payloads up to 2GB on Amazon S3.
diff --git a/src/main/java/software/amazon/payloadoffloading/PayloadStorageAsyncConfiguration.java b/src/main/java/software/amazon/payloadoffloading/PayloadStorageAsyncConfiguration.java
new file mode 100644
index 0000000..3bd8d08
--- /dev/null
+++ b/src/main/java/software/amazon/payloadoffloading/PayloadStorageAsyncConfiguration.java
@@ -0,0 +1,153 @@
+package software.amazon.payloadoffloading;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.annotations.NotThreadSafe;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
+
+/**
+ *
Amazon payload storage configuration options such as asynchronous Amazon S3 client,
+ * bucket name, and payload size threshold for payloads.
+ *
+ * Server side encryption is optional and can be enabled using with {@link #withServerSideEncryption(ServerSideEncryptionStrategy)}
+ * or {@link #setServerSideEncryptionStrategy(ServerSideEncryptionStrategy)}
+ *
+ * There are two possible options for server side encrption. This can be using a customer managed key or AWS managed CMK.
+ *
+ * Example usage:
+ *
+ *
+ * withServerSideEncryption(ServerSideEncrptionFactory.awsManagedCmk())
+ *
+ *
+ * or
+ *
+ *
+ * withServerSideEncryption(ServerSideEncrptionFactory.customerKey(YOUR_CUSTOMER_ID))
+ *
+ *
+ * @see software.amazon.payloadoffloading.ServerSideEncryptionFactory
+ */
+@NotThreadSafe
+public class PayloadStorageAsyncConfiguration extends PayloadStorageConfigurationBase {
+ private static final Logger LOG = LoggerFactory.getLogger(PayloadStorageAsyncConfiguration.class);
+
+ private S3AsyncClient s3Async;
+
+ public PayloadStorageAsyncConfiguration() {
+ s3Async = null;
+ }
+
+ public PayloadStorageAsyncConfiguration(PayloadStorageAsyncConfiguration other) {
+ super(other);
+ this.s3Async = other.getS3AsyncClient();
+ }
+
+ /**
+ * Enables support for payloads using asynchronous storage.
+ *
+ * @param s3Async Amazon S3 client which is going to be used for storing payload.
+ * @param s3BucketName Name of the bucket which is going to be used for storing payload.
+ * The bucket must be already created and configured in s3.
+ */
+ public void setPayloadSupportEnabled(S3AsyncClient s3Async, String s3BucketName) {
+ if (s3Async == null || s3BucketName == null) {
+ String errorMessage = "S3 client and/or S3 bucket name cannot be null.";
+ LOG.error(errorMessage);
+ throw SdkClientException.create(errorMessage);
+ }
+ super.setPayloadSupportEnabled(s3BucketName);
+ this.s3Async = s3Async;
+ }
+
+ /**
+ * Enables support for payload.
+ *
+ * @param s3Async Amazon S3 client which is going to be used for storing payload.
+ * @param s3BucketName Name of the bucket which is going to be used for storing payloads.
+ * The bucket must be already created and configured in s3.
+ * @return the updated PayloadStorageAsyncConfiguration object.
+ */
+ public PayloadStorageAsyncConfiguration withPayloadSupportEnabled(S3AsyncClient s3Async, String s3BucketName) {
+ setPayloadSupportEnabled(s3Async, s3BucketName);
+ return this;
+ }
+
+ /**
+ * Disables support for payloads.
+ */
+ public void setPayloadSupportDisabled() {
+ super.setPayloadSupportDisabled();
+ s3Async = null;
+ LOG.info("Payload support disabled.");
+ }
+
+ /**
+ * Disables support for payload.
+ *
+ * @return the updated PayloadStorageAsyncConfiguration object.
+ */
+ public PayloadStorageAsyncConfiguration withPayloadSupportDisabled() {
+ setPayloadSupportDisabled();
+ return this;
+ }
+
+ /**
+ * Gets the Amazon S3 async client which is being used for storing payloads.
+ *
+ * @return Reference to the Amazon S3 async client which is being used.
+ */
+ public S3AsyncClient getS3AsyncClient() {
+ return s3Async;
+ }
+
+ /**
+ * Sets the payload size threshold for storing payloads in Amazon S3.
+ *
+ * @param payloadSizeThreshold Payload size threshold to be used for storing in Amazon S3.
+ * Default: 256KB.
+ * @return the updated PayloadStorageAsyncConfiguration object.
+ */
+ public PayloadStorageAsyncConfiguration withPayloadSizeThreshold(int payloadSizeThreshold) {
+ setPayloadSizeThreshold(payloadSizeThreshold);
+ return this;
+ }
+
+ /**
+ * Sets whether or not all payloads regardless of their size should be stored in Amazon S3.
+ *
+ * @param alwaysThroughS3 Whether or not all payloads regardless of their size
+ * should be stored in Amazon S3. Default: false
+ * @return the updated PayloadStorageAsyncConfiguration object.
+ */
+ public PayloadStorageAsyncConfiguration withAlwaysThroughS3(boolean alwaysThroughS3) {
+ setAlwaysThroughS3(alwaysThroughS3);
+ return this;
+ }
+
+ /**
+ * Sets which method of server side encryption should be used, if required.
+ *
+ * This is optional, it is set only when you want to configure S3 server side encryption with KMS.
+ *
+ * @param serverSideEncryptionStrategy The method of encryption required for S3 server side encryption with KMS.
+ * @return the updated PayloadStorageAsyncConfiguration object.
+ */
+ public PayloadStorageAsyncConfiguration withServerSideEncryption(ServerSideEncryptionStrategy serverSideEncryptionStrategy) {
+ setServerSideEncryptionStrategy(serverSideEncryptionStrategy);
+ return this;
+ }
+
+ /**
+ * Configures the ACL to apply to the Amazon S3 putObject request.
+ * @param objectCannedACL
+ * The ACL to be used when storing objects in Amazon S3
+ */
+ public PayloadStorageAsyncConfiguration withObjectCannedACL(ObjectCannedACL objectCannedACL) {
+ setObjectCannedACL(objectCannedACL);
+ return this;
+ }
+}
diff --git a/src/main/java/software/amazon/payloadoffloading/PayloadStorageConfiguration.java b/src/main/java/software/amazon/payloadoffloading/PayloadStorageConfiguration.java
index d57bcc2..9ab3c10 100644
--- a/src/main/java/software/amazon/payloadoffloading/PayloadStorageConfiguration.java
+++ b/src/main/java/software/amazon/payloadoffloading/PayloadStorageConfiguration.java
@@ -8,7 +8,7 @@
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
/**
- * Amazon payload storage configuration options such as Amazon S3 client,
+ *
Amazon payload storage configuration options such as synchronous Amazon S3 client,
* bucket name, and payload size threshold for payloads.
*
* Server side encryption is optional and can be enabled using with {@link #withServerSideEncryption(ServerSideEncryptionStrategy)}
@@ -31,38 +31,18 @@
* @see software.amazon.payloadoffloading.ServerSideEncryptionFactory
*/
@NotThreadSafe
-public class PayloadStorageConfiguration {
+public class PayloadStorageConfiguration extends PayloadStorageConfigurationBase {
private static final Logger LOG = LoggerFactory.getLogger(PayloadStorageConfiguration.class);
private S3Client s3;
- private String s3BucketName;
- private int payloadSizeThreshold = 0;
- private boolean alwaysThroughS3 = false;
- private boolean payloadSupport = false;
- /**
- * This field is optional, it is set only when we want to configure S3 Server Side Encryption with KMS.
- */
- private ServerSideEncryptionStrategy serverSideEncryptionStrategy;
- /**
- * This field is optional, it is set only when we want to add access control list to Amazon S3 buckets and objects
- */
- private ObjectCannedACL objectCannedACL;
public PayloadStorageConfiguration() {
s3 = null;
- s3BucketName = null;
- serverSideEncryptionStrategy = null;
- objectCannedACL = null;
}
public PayloadStorageConfiguration(PayloadStorageConfiguration other) {
+ super(other);
this.s3 = other.getS3Client();
- this.s3BucketName = other.getS3BucketName();
- this.payloadSupport = other.isPayloadSupportEnabled();
- this.alwaysThroughS3 = other.isAlwaysThroughS3();
- this.payloadSizeThreshold = other.getPayloadSizeThreshold();
- this.serverSideEncryptionStrategy = other.getServerSideEncryptionStrategy();
- this.objectCannedACL = other.getObjectCannedACL();
}
/**
@@ -78,13 +58,8 @@ public void setPayloadSupportEnabled(S3Client s3, String s3BucketName) {
LOG.error(errorMessage);
throw SdkClientException.create(errorMessage);
}
- if (isPayloadSupportEnabled()) {
- LOG.warn("Payload support is already enabled. Overwriting AmazonS3Client and S3BucketName.");
- }
+ super.setPayloadSupportEnabled(s3BucketName);
this.s3 = s3;
- this.s3BucketName = s3BucketName;
- this.payloadSupport = true;
- LOG.info("Payload support enabled.");
}
/**
@@ -104,10 +79,8 @@ public PayloadStorageConfiguration withPayloadSupportEnabled(S3Client s3, String
* Disables support for payloads.
*/
public void setPayloadSupportDisabled() {
+ super.setPayloadSupportDisabled();
s3 = null;
- s3BucketName = null;
- payloadSupport = false;
- LOG.info("Payload support disabled.");
}
/**
@@ -120,15 +93,6 @@ public PayloadStorageConfiguration withPayloadSupportDisabled() {
return this;
}
- /**
- * Check if the support for payloads if enabled.
- *
- * @return true if support for payloads is enabled.
- */
- public boolean isPayloadSupportEnabled() {
- return payloadSupport;
- }
-
/**
* Gets the Amazon S3 client which is being used for storing payloads.
*
@@ -138,15 +102,6 @@ public S3Client getS3Client() {
return s3;
}
- /**
- * Gets the name of the S3 bucket which is being used for storing payload.
- *
- * @return The name of the bucket which is being used.
- */
- public String getS3BucketName() {
- return s3BucketName;
- }
-
/**
* Sets the payload size threshold for storing payloads in Amazon S3.
*
@@ -159,25 +114,6 @@ public PayloadStorageConfiguration withPayloadSizeThreshold(int payloadSizeThres
return this;
}
- /**
- * Gets the payload size threshold for storing payloads in Amazon S3.
- *
- * @return payload size threshold which is being used for storing in Amazon S3. Default: 256KB.
- */
- public int getPayloadSizeThreshold() {
- return payloadSizeThreshold;
- }
-
- /**
- * Sets the payload size threshold for storing payloads in Amazon S3.
- *
- * @param payloadSizeThreshold Payload size threshold to be used for storing in Amazon S3.
- * Default: 256KB.
- */
- public void setPayloadSizeThreshold(int payloadSizeThreshold) {
- this.payloadSizeThreshold = payloadSizeThreshold;
- }
-
/**
* Sets whether or not all payloads regardless of their size should be stored in Amazon S3.
*
@@ -190,25 +126,6 @@ public PayloadStorageConfiguration withAlwaysThroughS3(boolean alwaysThroughS3)
return this;
}
- /**
- * Checks whether or not all payloads regardless of their size are being stored in Amazon S3.
- *
- * @return True if all payloads regardless of their size are being stored in Amazon S3. Default: false
- */
- public boolean isAlwaysThroughS3() {
- return alwaysThroughS3;
- }
-
- /**
- * Sets whether or not all payloads regardless of their size should be stored in Amazon S3.
- *
- * @param alwaysThroughS3 Whether or not all payloads regardless of their size
- * should be stored in Amazon S3. Default: false
- */
- public void setAlwaysThroughS3(boolean alwaysThroughS3) {
- this.alwaysThroughS3 = alwaysThroughS3;
- }
-
/**
* Sets which method of server side encryption should be used, if required.
*
@@ -222,35 +139,6 @@ public PayloadStorageConfiguration withServerSideEncryption(ServerSideEncryption
return this;
}
- /**
- * Sets which method of server side encryption should be use, if required.
- *
- * This is optional, it is set only when you want to configure S3 Server Side Encryption with KMS.
- *
- * @param serverSideEncryptionStrategy The method of encryption required for S3 server side encryption with KMS.
- */
- public void setServerSideEncryptionStrategy(ServerSideEncryptionStrategy serverSideEncryptionStrategy) {
- this.serverSideEncryptionStrategy = serverSideEncryptionStrategy;
- }
-
- /**
- * The method of service side encryption which should be used, if required.
- *
- * @return The server side encryption method required. Default null.
- */
- public ServerSideEncryptionStrategy getServerSideEncryptionStrategy() {
- return this.serverSideEncryptionStrategy;
- }
-
- /**
- * Configures the ACL to apply to the Amazon S3 putObject request.
- * @param objectCannedACL
- * The ACL to be used when storing objects in Amazon S3
- */
- public void setObjectCannedACL(ObjectCannedACL objectCannedACL) {
- this.objectCannedACL = objectCannedACL;
- }
-
/**
* Configures the ACL to apply to the Amazon S3 putObject request.
* @param objectCannedACL
@@ -260,20 +148,4 @@ public PayloadStorageConfiguration withObjectCannedACL(ObjectCannedACL objectCan
setObjectCannedACL(objectCannedACL);
return this;
}
-
- /**
- * Checks whether an ACL have been configured for storing objects in Amazon S3.
- * @return True if ACL is defined
- */
- public boolean isObjectCannedACLDefined() {
- return null != objectCannedACL;
- }
-
- /**
- * Gets the AWS ACL to apply to the Amazon S3 putObject request.
- * @return Amazon S3 object ACL
- */
- public ObjectCannedACL getObjectCannedACL() {
- return objectCannedACL;
- }
}
diff --git a/src/main/java/software/amazon/payloadoffloading/PayloadStorageConfigurationBase.java b/src/main/java/software/amazon/payloadoffloading/PayloadStorageConfigurationBase.java
new file mode 100644
index 0000000..7d08746
--- /dev/null
+++ b/src/main/java/software/amazon/payloadoffloading/PayloadStorageConfigurationBase.java
@@ -0,0 +1,178 @@
+package software.amazon.payloadoffloading;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.annotations.NotThreadSafe;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
+
+/**
+ *
Base class for Amazon payload storage configuration options such as Amazon S3 client,
+ * bucket name, and payload size threshold for payloads.
+ *
+ * @see PayloadStorageConfiguration
+ * @see PayloadStorageAsyncConfiguration
+ */
+@NotThreadSafe
+public abstract class PayloadStorageConfigurationBase {
+ private static final Logger LOG = LoggerFactory.getLogger(PayloadStorageConfigurationBase.class);
+
+ private String s3BucketName;
+ private int payloadSizeThreshold = 0;
+ private boolean alwaysThroughS3 = false;
+ private boolean payloadSupport = false;
+ /**
+ * This field is optional, it is set only when we want to configure S3 Server Side Encryption with KMS.
+ */
+ private ServerSideEncryptionStrategy serverSideEncryptionStrategy;
+ /**
+ * This field is optional, it is set only when we want to add access control list to Amazon S3 buckets and objects
+ */
+ private ObjectCannedACL objectCannedACL;
+
+ public PayloadStorageConfigurationBase() {
+ s3BucketName = null;
+ serverSideEncryptionStrategy = null;
+ objectCannedACL = null;
+ }
+
+ public PayloadStorageConfigurationBase(PayloadStorageConfigurationBase other) {
+ this.s3BucketName = other.getS3BucketName();
+ this.payloadSupport = other.isPayloadSupportEnabled();
+ this.alwaysThroughS3 = other.isAlwaysThroughS3();
+ this.payloadSizeThreshold = other.getPayloadSizeThreshold();
+ this.serverSideEncryptionStrategy = other.getServerSideEncryptionStrategy();
+ this.objectCannedACL = other.getObjectCannedACL();
+ }
+
+ /**
+ * Enables support for payloads .
+ *
+ * @param s3BucketName Name of the bucket which is going to be used for storing payload.
+ * The bucket must be already created and configured in s3.
+ */
+ protected void setPayloadSupportEnabled(String s3BucketName) {
+ if (s3BucketName == null) {
+ String errorMessage = "S3 bucket name cannot be null.";
+ LOG.error(errorMessage);
+ throw SdkClientException.create(errorMessage);
+ }
+ if (isPayloadSupportEnabled()) {
+ LOG.warn("Payload support is already enabled. Overwriting AmazonS3Client and S3BucketName.");
+ }
+ this.s3BucketName = s3BucketName;
+ this.payloadSupport = true;
+ LOG.info("Payload support enabled.");
+ }
+
+ /**
+ * Disables support for payloads.
+ */
+ public void setPayloadSupportDisabled() {
+ s3BucketName = null;
+ payloadSupport = false;
+ LOG.info("Payload support disabled.");
+ }
+
+ /**
+ * Check if the support for payloads if enabled.
+ *
+ * @return true if support for payloads is enabled.
+ */
+ public boolean isPayloadSupportEnabled() {
+ return payloadSupport;
+ }
+
+ /**
+ * Gets the name of the S3 bucket which is being used for storing payload.
+ *
+ * @return The name of the bucket which is being used.
+ */
+ public String getS3BucketName() {
+ return s3BucketName;
+ }
+
+ /**
+ * Gets the payload size threshold for storing payloads in Amazon S3.
+ *
+ * @return payload size threshold which is being used for storing in Amazon S3. Default: 256KB.
+ */
+ public int getPayloadSizeThreshold() {
+ return payloadSizeThreshold;
+ }
+
+ /**
+ * Sets the payload size threshold for storing payloads in Amazon S3.
+ *
+ * @param payloadSizeThreshold Payload size threshold to be used for storing in Amazon S3.
+ * Default: 256KB.
+ */
+ public void setPayloadSizeThreshold(int payloadSizeThreshold) {
+ this.payloadSizeThreshold = payloadSizeThreshold;
+ }
+
+ /**
+ * Checks whether or not all payloads regardless of their size are being stored in Amazon S3.
+ *
+ * @return True if all payloads regardless of their size are being stored in Amazon S3. Default: false
+ */
+ public boolean isAlwaysThroughS3() {
+ return alwaysThroughS3;
+ }
+
+ /**
+ * Sets whether or not all payloads regardless of their size should be stored in Amazon S3.
+ *
+ * @param alwaysThroughS3 Whether or not all payloads regardless of their size
+ * should be stored in Amazon S3. Default: false
+ */
+ public void setAlwaysThroughS3(boolean alwaysThroughS3) {
+ this.alwaysThroughS3 = alwaysThroughS3;
+ }
+
+ /**
+ * Sets which method of server side encryption should be use, if required.
+ *
+ * This is optional, it is set only when you want to configure S3 Server Side Encryption with KMS.
+ *
+ * @param serverSideEncryptionStrategy The method of encryption required for S3 server side encryption with KMS.
+ */
+ public void setServerSideEncryptionStrategy(ServerSideEncryptionStrategy serverSideEncryptionStrategy) {
+ this.serverSideEncryptionStrategy = serverSideEncryptionStrategy;
+ }
+
+ /**
+ * The method of service side encryption which should be used, if required.
+ *
+ * @return The server side encryption method required. Default null.
+ */
+ public ServerSideEncryptionStrategy getServerSideEncryptionStrategy() {
+ return this.serverSideEncryptionStrategy;
+ }
+
+ /**
+ * Configures the ACL to apply to the Amazon S3 putObject request.
+ * @param objectCannedACL
+ * The ACL to be used when storing objects in Amazon S3
+ */
+ public void setObjectCannedACL(ObjectCannedACL objectCannedACL) {
+ this.objectCannedACL = objectCannedACL;
+ }
+
+ /**
+ * Checks whether an ACL have been configured for storing objects in Amazon S3.
+ * @return True if ACL is defined
+ */
+ public boolean isObjectCannedACLDefined() {
+ return null != objectCannedACL;
+ }
+
+ /**
+ * Gets the AWS ACL to apply to the Amazon S3 putObject request.
+ * @return Amazon S3 object ACL
+ */
+ public ObjectCannedACL getObjectCannedACL() {
+ return objectCannedACL;
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/software/amazon/payloadoffloading/PayloadStoreAsync.java b/src/main/java/software/amazon/payloadoffloading/PayloadStoreAsync.java
new file mode 100644
index 0000000..43590b4
--- /dev/null
+++ b/src/main/java/software/amazon/payloadoffloading/PayloadStoreAsync.java
@@ -0,0 +1,78 @@
+package software.amazon.payloadoffloading;
+
+import java.util.concurrent.CompletableFuture;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+/**
+ * An AWS storage service that supports saving high payload sizes.
+ */
+public interface PayloadStoreAsync {
+
+ /**
+ * Stores payload in a store that has higher payload size limit than that is supported by original payload store.
+ *
+ * This call is asynchronous, and so documented return values and exceptions are propagated through
+ * the returned {@link CompletableFuture}.
+ *
+ * @param payload
+ * @return future value of a pointer that must be used to retrieve the original payload later.
+ * @throws SdkClientException If any internal errors are encountered on the client side while
+ * attempting to make the request or handle the response. For example
+ * if a network connection is not available.
+ * @throws S3Exception If an error response is returned by actual PayloadStore indicating
+ * either a problem with the data in the request, or a server side issue.
+ */
+ CompletableFuture storeOriginalPayload(String payload);
+
+ /**
+ * Stores payload in a store that has higher payload size limit than that is supported by original payload store.
+ *
+ * This call is asynchronous, and so documented return values and exceptions are propagated through
+ * the returned {@link CompletableFuture}.
+ *
+ * @param payload
+ * @param s3Key
+ * @return future value of a pointer that must be used to retrieve the original payload later.
+ * @throws SdkClientException If any internal errors are encountered on the client side while
+ * attempting to make the request or handle the response. For example
+ * if a network connection is not available.
+ * @throws S3Exception If an error response is returned by actual PayloadStore indicating
+ * either a problem with the data in the request, or a server side issue.
+ */
+ CompletableFuture storeOriginalPayload(String payload, String s3Key);
+
+ /**
+ * Retrieves the original payload using the given payloadPointer. The pointer must
+ * have been obtained using {@link #storeOriginalPayload(String)}
+ *
+ * This call is asynchronous, and so documented return values and exceptions are propagated through
+ * the returned {@link CompletableFuture}.
+ *
+ * @param payloadPointer
+ * @return future value of the original payload
+ * @throws SdkClientException If any internal errors are encountered on the client side while
+ * attempting to make the request or handle the response. For example
+ * if payloadPointer is invalid or a network connection is not available.
+ * @throws S3Exception If an error response is returned by actual PayloadStore indicating
+ * a server side issue.
+ */
+ CompletableFuture getOriginalPayload(String payloadPointer);
+
+ /**
+ * Deletes the original payload using the given payloadPointer. The pointer must
+ * have been obtained using {@link #storeOriginalPayload(String)}
+ *
+ * This call is asynchronous, and so documented return values and exceptions are propagated through
+ * the returned {@link CompletableFuture}.
+ *
+ * @param payloadPointer
+ * @return future value that completes when the delete operation finishes
+ * @throws SdkClientException If any internal errors are encountered on the client side while
+ * attempting to make the request or handle the response to/from PayloadStore.
+ * For example, if payloadPointer is invalid or a network connection is not available.
+ * @throws S3Exception If an error response is returned by actual PayloadStore indicating
+ * a server side issue.
+ */
+ CompletableFuture deleteOriginalPayload(String payloadPointer);
+}
diff --git a/src/main/java/software/amazon/payloadoffloading/S3AsyncDao.java b/src/main/java/software/amazon/payloadoffloading/S3AsyncDao.java
new file mode 100644
index 0000000..a0dc868
--- /dev/null
+++ b/src/main/java/software/amazon/payloadoffloading/S3AsyncDao.java
@@ -0,0 +1,118 @@
+package software.amazon.payloadoffloading;
+
+import java.io.UncheckedIOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.ResponseBytes;
+import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.core.async.AsyncResponseTransformer;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+
+/**
+ * Dao layer to access S3.
+ */
+public class S3AsyncDao {
+ private static final Logger LOG = LoggerFactory.getLogger(S3AsyncDao.class);
+ private final S3AsyncClient s3Client;
+ private final ServerSideEncryptionStrategy serverSideEncryptionStrategy;
+ private final ObjectCannedACL objectCannedACL;
+
+ public S3AsyncDao(S3AsyncClient s3Client) {
+ this(s3Client, null, null);
+ }
+
+ public S3AsyncDao(
+ S3AsyncClient s3Client,
+ ServerSideEncryptionStrategy serverSideEncryptionStrategy,
+ ObjectCannedACL objectCannedACL) {
+ this.s3Client = s3Client;
+ this.serverSideEncryptionStrategy = serverSideEncryptionStrategy;
+ this.objectCannedACL = objectCannedACL;
+ }
+
+ public CompletableFuture getTextFromS3(String s3BucketName, String s3Key) {
+ GetObjectRequest getObjectRequest = GetObjectRequest.builder()
+ .bucket(s3BucketName)
+ .key(s3Key)
+ .build();
+
+ return s3Client.getObject(getObjectRequest, AsyncResponseTransformer.toBytes())
+ .thenApply(ResponseBytes::asUtf8String)
+ .handle((v, tIn) -> {
+ if (tIn != null) {
+ Throwable t = Util.unwrapFutureException(tIn);
+ if (t instanceof SdkException) {
+ String errorMessage = "Failed to get the S3 object which contains the payload.";
+ LOG.error(errorMessage, t);
+ throw SdkException.create(errorMessage, t);
+ }
+ if (t instanceof UncheckedIOException) {
+ String errorMessage = "Failure when handling the message which was read from S3 object.";
+ LOG.error(errorMessage, t);
+ throw SdkClientException.create(errorMessage, t);
+ }
+ throw new CompletionException(t);
+ }
+ return v;
+ });
+ }
+
+ public CompletableFuture storeTextInS3(String s3BucketName, String s3Key, String payloadContentStr) {
+ PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.builder()
+ .bucket(s3BucketName)
+ .key(s3Key);
+
+ if (objectCannedACL != null) {
+ putObjectRequestBuilder.acl(objectCannedACL);
+ }
+
+ // https://docs.aws.amazon.com/AmazonS3/latest/dev/kms-using-sdks.html
+ if (serverSideEncryptionStrategy != null) {
+ serverSideEncryptionStrategy.decorate(putObjectRequestBuilder);
+ }
+
+ return s3Client.putObject(putObjectRequestBuilder.build(), AsyncRequestBody.fromString(payloadContentStr))
+ .handle((v, tIn) -> {
+ if (tIn != null) {
+ Throwable t = Util.unwrapFutureException(tIn);
+ if (t instanceof SdkException) {
+ String errorMessage = "Failed to store the message content in an S3 object.";
+ LOG.error(errorMessage, t);
+ throw SdkException.create(errorMessage, t);
+ }
+ throw new CompletionException(t);
+ }
+ return null;
+ });
+ }
+
+ public CompletableFuture deletePayloadFromS3(String s3BucketName, String s3Key) {
+ DeleteObjectRequest deleteObjectRequest = DeleteObjectRequest.builder()
+ .bucket(s3BucketName)
+ .key(s3Key)
+ .build();
+ return s3Client.deleteObject(deleteObjectRequest)
+ .handle((v, tIn) -> {
+ if (tIn != null) {
+ Throwable t = Util.unwrapFutureException(tIn);
+ if (t instanceof SdkException) {
+ String errorMessage = "Failed to delete the S3 object which contains the payload";
+ LOG.error(errorMessage, t);
+ throw SdkException.create(errorMessage, t);
+ }
+ throw new CompletionException(t);
+ }
+
+ LOG.info("S3 object deleted, Bucket name: " + s3BucketName + ", Object key: " + s3Key + ".");
+ return null;
+ });
+ }
+}
diff --git a/src/main/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsync.java b/src/main/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsync.java
new file mode 100644
index 0000000..5b84ede
--- /dev/null
+++ b/src/main/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsync.java
@@ -0,0 +1,77 @@
+package software.amazon.payloadoffloading;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.payloadoffloading.PayloadS3Pointer;
+
+/**
+ * S3 based implementation for PayloadStoreAsync.
+ */
+public class S3BackedPayloadStoreAsync implements PayloadStoreAsync {
+ private static final Logger LOG = LoggerFactory.getLogger(S3BackedPayloadStoreAsync.class);
+
+ private final String s3BucketName;
+ private final S3AsyncDao s3Dao;
+
+ public S3BackedPayloadStoreAsync(S3AsyncDao s3Dao, String s3BucketName) {
+ this.s3BucketName = s3BucketName;
+ this.s3Dao = s3Dao;
+ }
+
+ @Override
+ public CompletableFuture storeOriginalPayload(String payload) {
+ String s3Key = UUID.randomUUID().toString();
+ return storeOriginalPayload(payload, s3Key);
+ }
+
+ @Override
+ public CompletableFuture storeOriginalPayload(String payload, String s3Key) {
+ return s3Dao.storeTextInS3(s3BucketName, s3Key, payload)
+ .thenApply(v -> {
+ LOG.info("S3 object created, Bucket name: " + s3BucketName + ", Object key: " + s3Key + ".");
+
+ // Convert S3 pointer (bucket name, key, etc) to JSON string
+ PayloadS3Pointer s3Pointer = new PayloadS3Pointer(s3BucketName, s3Key);
+
+ return s3Pointer.toJson();
+ });
+ }
+
+ @Override
+ public CompletableFuture getOriginalPayload(String payloadPointer) {
+ try {
+ PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(payloadPointer);
+
+ String s3BucketName = s3Pointer.getS3BucketName();
+ String s3Key = s3Pointer.getS3Key();
+
+ return s3Dao.getTextFromS3(s3BucketName, s3Key)
+ .thenApply(originalPayload -> {
+ LOG.info("S3 object read, Bucket name: " + s3BucketName + ", Object key: " + s3Key + ".");
+ return originalPayload;
+ });
+ } catch (Exception e) {
+ CompletableFuture futureEx = new CompletableFuture<>();
+ futureEx.completeExceptionally((e instanceof RuntimeException) ? e : new CompletionException(e));
+ return futureEx;
+ }
+ }
+
+ @Override
+ public CompletableFuture deleteOriginalPayload(String payloadPointer) {
+ try {
+ PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(payloadPointer);
+
+ String s3BucketName = s3Pointer.getS3BucketName();
+ String s3Key = s3Pointer.getS3Key();
+ return s3Dao.deletePayloadFromS3(s3BucketName, s3Key);
+ } catch (Exception e) {
+ CompletableFuture futureEx = new CompletableFuture<>();
+ futureEx.completeExceptionally((e instanceof RuntimeException) ? e : new CompletionException(e));
+ return futureEx;
+ }
+ }
+}
diff --git a/src/main/java/software/amazon/payloadoffloading/Util.java b/src/main/java/software/amazon/payloadoffloading/Util.java
index bd3932f..28e3966 100644
--- a/src/main/java/software/amazon/payloadoffloading/Util.java
+++ b/src/main/java/software/amazon/payloadoffloading/Util.java
@@ -1,5 +1,6 @@
package software.amazon.payloadoffloading;
+import java.util.concurrent.CompletionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkClientException;
@@ -34,4 +35,12 @@ public static long getStringSizeInBytes(String str) {
public static String getUserAgentHeader(String clientName) {
return clientName + "/" + VersionInfo.SDK_VERSION;
}
+
+ public static Throwable unwrapFutureException(Throwable t) {
+ if ((t instanceof CompletionException) && t.getCause() != null) {
+ t = t.getCause();
+ }
+ return t;
+ }
+
}
diff --git a/src/test/java/software/amazon/payloadoffloading/PayloadStorageAsyncConfigurationTest.java b/src/test/java/software/amazon/payloadoffloading/PayloadStorageAsyncConfigurationTest.java
new file mode 100644
index 0000000..4cc6984
--- /dev/null
+++ b/src/test/java/software/amazon/payloadoffloading/PayloadStorageAsyncConfigurationTest.java
@@ -0,0 +1,102 @@
+package software.amazon.payloadoffloading;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import org.junit.Test;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
+
+/**
+ * Tests the PayloadStorageAsyncConfiguration class.
+ */
+public class PayloadStorageAsyncConfigurationTest {
+
+ private static final String s3BucketName = "test-bucket-name";
+ private static final ServerSideEncryptionStrategy SERVER_SIDE_ENCRYPTION_STRATEGY = ServerSideEncryptionFactory.awsManagedCmk();
+ private final ObjectCannedACL objectCannelACL = ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL;
+
+ @Test
+ public void testCopyConstructor() {
+ S3AsyncClient s3Async = mock(S3AsyncClient.class);
+
+ boolean alwaysThroughS3 = true;
+ int payloadSizeThreshold = 500;
+
+ PayloadStorageAsyncConfiguration payloadStorageConfiguration = new PayloadStorageAsyncConfiguration();
+
+ payloadStorageConfiguration.withPayloadSupportEnabled(s3Async, s3BucketName)
+ .withAlwaysThroughS3(alwaysThroughS3)
+ .withPayloadSizeThreshold(payloadSizeThreshold)
+ .withServerSideEncryption(SERVER_SIDE_ENCRYPTION_STRATEGY)
+ .withObjectCannedACL(objectCannelACL);
+
+ PayloadStorageAsyncConfiguration newPayloadStorageConfiguration = new PayloadStorageAsyncConfiguration(payloadStorageConfiguration);
+
+ assertEquals(s3Async, newPayloadStorageConfiguration.getS3AsyncClient());
+ assertEquals(s3BucketName, newPayloadStorageConfiguration.getS3BucketName());
+ assertEquals(SERVER_SIDE_ENCRYPTION_STRATEGY, newPayloadStorageConfiguration.getServerSideEncryptionStrategy());
+ assertTrue(newPayloadStorageConfiguration.isPayloadSupportEnabled());
+ assertEquals(objectCannelACL, newPayloadStorageConfiguration.getObjectCannedACL());
+ assertEquals(alwaysThroughS3, newPayloadStorageConfiguration.isAlwaysThroughS3());
+ assertEquals(payloadSizeThreshold, newPayloadStorageConfiguration.getPayloadSizeThreshold());
+ assertNotSame(newPayloadStorageConfiguration, payloadStorageConfiguration);
+ }
+
+ @Test
+ public void testPayloadSupportEnabled() {
+ S3AsyncClient s3Async = mock(S3AsyncClient.class);
+ PayloadStorageAsyncConfiguration payloadStorageConfiguration = new PayloadStorageAsyncConfiguration();
+ payloadStorageConfiguration.setPayloadSupportEnabled(s3Async, s3BucketName);
+
+ assertTrue(payloadStorageConfiguration.isPayloadSupportEnabled());
+ assertNotNull(payloadStorageConfiguration.getS3AsyncClient());
+ assertEquals(s3BucketName, payloadStorageConfiguration.getS3BucketName());
+ }
+
+ @Test
+ public void testDisablePayloadSupport() {
+ PayloadStorageAsyncConfiguration payloadStorageConfiguration = new PayloadStorageAsyncConfiguration();
+ payloadStorageConfiguration.setPayloadSupportDisabled();
+
+ assertNull(payloadStorageConfiguration.getS3AsyncClient());
+ assertNull(payloadStorageConfiguration.getS3BucketName());
+ }
+
+ @Test
+ public void testAlwaysThroughS3() {
+ PayloadStorageAsyncConfiguration payloadStorageConfiguration = new PayloadStorageAsyncConfiguration();
+
+ payloadStorageConfiguration.setAlwaysThroughS3(true);
+ assertTrue(payloadStorageConfiguration.isAlwaysThroughS3());
+
+ payloadStorageConfiguration.setAlwaysThroughS3(false);
+ assertFalse(payloadStorageConfiguration.isAlwaysThroughS3());
+ }
+
+ @Test
+ public void testSseAwsKeyManagementParams() {
+ PayloadStorageAsyncConfiguration payloadStorageConfiguration = new PayloadStorageAsyncConfiguration();
+
+ assertNull(payloadStorageConfiguration.getServerSideEncryptionStrategy());
+
+ payloadStorageConfiguration.setServerSideEncryptionStrategy(SERVER_SIDE_ENCRYPTION_STRATEGY);
+ assertEquals(SERVER_SIDE_ENCRYPTION_STRATEGY, payloadStorageConfiguration.getServerSideEncryptionStrategy());
+ }
+
+ @Test
+ public void testCannedAccessControlList() {
+ PayloadStorageAsyncConfiguration payloadStorageConfiguration = new PayloadStorageAsyncConfiguration();
+
+ assertFalse(payloadStorageConfiguration.isObjectCannedACLDefined());
+
+ payloadStorageConfiguration.withObjectCannedACL(objectCannelACL);
+ assertTrue(payloadStorageConfiguration.isObjectCannedACLDefined());
+ assertEquals(objectCannelACL, payloadStorageConfiguration.getObjectCannedACL());
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/software/amazon/payloadoffloading/PayloadStorageConfigurationTest.java b/src/test/java/software/amazon/payloadoffloading/PayloadStorageConfigurationTest.java
index e9b473a..2c05d47 100644
--- a/src/test/java/software/amazon/payloadoffloading/PayloadStorageConfigurationTest.java
+++ b/src/test/java/software/amazon/payloadoffloading/PayloadStorageConfigurationTest.java
@@ -1,12 +1,17 @@
package software.amazon.payloadoffloading;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
import org.junit.Test;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
-import static org.mockito.Mockito.mock;
-import static org.junit.Assert.*;
-
/**
* Tests the PayloadStorageConfiguration class.
*/
diff --git a/src/test/java/software/amazon/payloadoffloading/S3AsyncDaoTest.java b/src/test/java/software/amazon/payloadoffloading/S3AsyncDaoTest.java
new file mode 100644
index 0000000..4f926e4
--- /dev/null
+++ b/src/test/java/software/amazon/payloadoffloading/S3AsyncDaoTest.java
@@ -0,0 +1,117 @@
+package software.amazon.payloadoffloading;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
+import junitparams.JUnitParamsRunner;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import software.amazon.awssdk.core.ResponseBytes;
+import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.core.async.AsyncResponseTransformer;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
+
+@RunWith(JUnitParamsRunner.class)
+public class S3AsyncDaoTest {
+
+ private static String s3ServerSideEncryptionKMSKeyId = "test-customer-managed-kms-key-id";
+ private static final String S3_BUCKET_NAME = "test-bucket-name";
+ private static final String ANY_PAYLOAD = "AnyPayload";
+ private static final String ANY_S3_KEY = "AnyS3key";
+ private ServerSideEncryptionStrategy serverSideEncryptionStrategy = ServerSideEncryptionFactory.awsManagedCmk();
+ private ObjectCannedACL objectCannedACL = ObjectCannedACL.PUBLIC_READ;
+ private S3AsyncClient s3AsyncClient;
+ private S3AsyncDao dao;
+
+ @Before
+ public void setup() {
+ s3AsyncClient = mock(S3AsyncClient.class);
+ }
+
+ @Test
+ public void storeTextInS3WithoutSSEOrCannedTest() {
+ dao = new S3AsyncDao(s3AsyncClient);
+ when(s3AsyncClient.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class))).thenReturn(
+ CompletableFuture.completedFuture(null));
+ ArgumentCaptor argument = ArgumentCaptor.forClass(PutObjectRequest.class);
+
+ dao.storeTextInS3(S3_BUCKET_NAME, ANY_S3_KEY, ANY_PAYLOAD).join();
+
+ verify(s3AsyncClient, times(1)).putObject(argument.capture(), any(AsyncRequestBody.class));
+
+ assertNull(argument.getValue().serverSideEncryption());
+ assertNull(argument.getValue().acl());
+ assertEquals(S3_BUCKET_NAME, argument.getValue().bucket());
+ }
+
+ @Test
+ public void storeTextInS3WithSSETest() {
+ dao = new S3AsyncDao(s3AsyncClient, serverSideEncryptionStrategy, null);
+ when(s3AsyncClient.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class))).thenReturn(
+ CompletableFuture.completedFuture(null));
+ ArgumentCaptor argument = ArgumentCaptor.forClass(PutObjectRequest.class);
+
+ dao.storeTextInS3(S3_BUCKET_NAME, ANY_S3_KEY, ANY_PAYLOAD).join();
+
+ verify(s3AsyncClient, times(1)).putObject(argument.capture(), any(AsyncRequestBody.class));
+
+ assertEquals(ServerSideEncryption.AWS_KMS, argument.getValue().serverSideEncryption());
+ assertNull(argument.getValue().acl());
+ assertEquals(S3_BUCKET_NAME, argument.getValue().bucket());
+ }
+
+ @Test
+ public void storeTextInS3WithBothTest() {
+ dao = new S3AsyncDao(s3AsyncClient, serverSideEncryptionStrategy, objectCannedACL);
+ when(s3AsyncClient.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class))).thenReturn(
+ CompletableFuture.completedFuture(null));
+ ArgumentCaptor argument = ArgumentCaptor.forClass(PutObjectRequest.class);
+
+ dao.storeTextInS3(S3_BUCKET_NAME, ANY_S3_KEY, ANY_PAYLOAD).join();
+
+ verify(s3AsyncClient, times(1)).putObject(argument.capture(), any(AsyncRequestBody.class));
+
+ assertEquals(ServerSideEncryption.AWS_KMS, argument.getValue().serverSideEncryption());
+ assertEquals(objectCannedACL, argument.getValue().acl());
+ assertEquals(S3_BUCKET_NAME, argument.getValue().bucket());
+ }
+
+ @Test
+ public void getTextTest() {
+ dao = new S3AsyncDao(s3AsyncClient);
+ when(s3AsyncClient.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))).thenReturn(
+ CompletableFuture.completedFuture(ResponseBytes.fromByteArray(
+ GetObjectRequest.builder().build(), ANY_PAYLOAD.getBytes(StandardCharsets.UTF_8))));
+
+ String payload = dao.getTextFromS3(S3_BUCKET_NAME, ANY_S3_KEY).join();
+
+ verify(s3AsyncClient, times(1)).getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class));
+
+ assertEquals(payload, ANY_PAYLOAD);
+ }
+
+ @Test
+ public void deleteTextTest() {
+ dao = new S3AsyncDao(s3AsyncClient);
+ when(s3AsyncClient.deleteObject(any(DeleteObjectRequest.class))).thenReturn(
+ CompletableFuture.completedFuture(null));
+
+ dao.deletePayloadFromS3(S3_BUCKET_NAME, ANY_S3_KEY).join();
+
+ verify(s3AsyncClient, times(1)).deleteObject(any(DeleteObjectRequest.class));
+ }
+}
diff --git a/src/test/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsyncTest.java b/src/test/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsyncTest.java
new file mode 100644
index 0000000..a0a31f2
--- /dev/null
+++ b/src/test/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsyncTest.java
@@ -0,0 +1,176 @@
+package software.amazon.payloadoffloading;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import junitparams.JUnitParamsRunner;
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
+
+@RunWith(JUnitParamsRunner.class)
+public class S3BackedPayloadStoreAsyncTest {
+ private static final String S3_BUCKET_NAME = "test-bucket-name";
+ private static final String ANY_PAYLOAD = "AnyPayload";
+ private static final String ANY_S3_KEY = "AnyS3key";
+ private static final String INCORRECT_POINTER_EXCEPTION_MSG = "Failed to read the S3 object pointer from given string";
+ private PayloadStoreAsync payloadStore;
+ private S3AsyncDao s3AsyncDao;
+
+ @Rule
+ public final ExpectedException exception = ExpectedException.none();
+
+ @Before
+ public void setup() {
+ s3AsyncDao = mock(S3AsyncDao.class);
+ payloadStore = new S3BackedPayloadStoreAsync(s3AsyncDao, S3_BUCKET_NAME);
+ }
+
+ @Test
+ public void testStoreOriginalPayloadOnSuccess() {
+ when(s3AsyncDao.storeTextInS3(any(String.class), any(String.class), any(String.class))).thenReturn(
+ CompletableFuture.completedFuture(null));
+ String actualPayloadPointer = payloadStore.storeOriginalPayload(ANY_PAYLOAD).join();
+
+ ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor sseArgsCaptor = ArgumentCaptor.forClass(ServerSideEncryptionStrategy.class);
+ ArgumentCaptor cannedArgsCaptor = ArgumentCaptor.forClass(ObjectCannedACL.class);
+
+ verify(s3AsyncDao, times(1)).storeTextInS3(eq(S3_BUCKET_NAME), keyCaptor.capture(),
+ eq(ANY_PAYLOAD));
+
+ PayloadS3Pointer expectedPayloadPointer = new PayloadS3Pointer(S3_BUCKET_NAME, keyCaptor.getValue());
+ assertEquals(expectedPayloadPointer.toJson(), actualPayloadPointer);
+ }
+
+ @Test
+ public void testStoreOriginalPayloadWithS3KeyOnSuccess() {
+ when(s3AsyncDao.storeTextInS3(any(String.class), any(String.class), any(String.class))).thenReturn(
+ CompletableFuture.completedFuture(null));
+ String actualPayloadPointer = payloadStore.storeOriginalPayload(ANY_PAYLOAD, ANY_S3_KEY).join();
+
+ verify(s3AsyncDao, times(1)).storeTextInS3(eq(S3_BUCKET_NAME), eq(ANY_S3_KEY),
+ eq(ANY_PAYLOAD));
+
+ PayloadS3Pointer expectedPayloadPointer = new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY);
+ assertEquals(expectedPayloadPointer.toJson(), actualPayloadPointer);
+ }
+
+ @Test
+ public void testStoreOriginalPayloadDoesAlwaysCreateNewObjects() {
+ //Store any payload
+ when(s3AsyncDao.storeTextInS3(any(String.class), any(String.class), any(String.class))).thenReturn(
+ CompletableFuture.completedFuture(null));
+ String anyActualPayloadPointer = payloadStore.storeOriginalPayload(ANY_PAYLOAD).join();
+
+ //Store any other payload and validate that the pointers are different
+ String anyOtherActualPayloadPointer = payloadStore.storeOriginalPayload(ANY_PAYLOAD).join();
+
+ ArgumentCaptor anyOtherKeyCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor sseArgsCaptor = ArgumentCaptor.forClass(ServerSideEncryptionStrategy.class);
+ ArgumentCaptor cannedArgsCaptor = ArgumentCaptor.forClass(ObjectCannedACL.class);
+
+ verify(s3AsyncDao, times(2)).storeTextInS3(eq(S3_BUCKET_NAME), anyOtherKeyCaptor.capture(),
+ eq(ANY_PAYLOAD));
+
+ String anyS3Key = anyOtherKeyCaptor.getAllValues().get(0);
+ String anyOtherS3Key = anyOtherKeyCaptor.getAllValues().get(1);
+
+ PayloadS3Pointer anyExpectedPayloadPointer = new PayloadS3Pointer(S3_BUCKET_NAME, anyS3Key);
+ assertEquals(anyExpectedPayloadPointer.toJson(), anyActualPayloadPointer);
+
+ PayloadS3Pointer anyOtherExpectedPayloadPointer = new PayloadS3Pointer(S3_BUCKET_NAME, anyOtherS3Key);
+ assertEquals(anyOtherExpectedPayloadPointer.toJson(), anyOtherActualPayloadPointer);
+
+ assertThat(anyS3Key, Matchers.not(anyOtherS3Key));
+ assertThat(anyActualPayloadPointer, Matchers.not(anyOtherActualPayloadPointer));
+ }
+
+ @Test
+ public void testStoreOriginalPayloadOnS3Failure() {
+ CompletableFuture sdkEx = new CompletableFuture<>();
+ sdkEx.completeExceptionally(SdkException.create("S3 Exception", new Throwable()));
+ when(s3AsyncDao.storeTextInS3(any(String.class), any(String.class), any(String.class))).thenReturn(sdkEx);
+
+ exception.expect(CompletionException.class);
+ exception.expectMessage("S3 Exception");
+ //Any S3 Dao exception is thrown back as-is to clients
+ payloadStore.storeOriginalPayload(ANY_PAYLOAD).join();
+ }
+
+ @Test
+ public void testGetOriginalPayloadOnSuccess() {
+ PayloadS3Pointer anyPointer = new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY);
+ when(s3AsyncDao.getTextFromS3(any(String.class), any(String.class))).thenReturn(
+ CompletableFuture.completedFuture(ANY_PAYLOAD));
+ String actualPayload = payloadStore.getOriginalPayload(anyPointer.toJson()).join();
+
+ ArgumentCaptor bucketNameCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(String.class);
+ verify(s3AsyncDao, times(1)).getTextFromS3(bucketNameCaptor.capture(), keyCaptor.capture());
+
+ assertEquals(ANY_S3_KEY, keyCaptor.getValue());
+ assertEquals(S3_BUCKET_NAME, bucketNameCaptor.getValue());
+ assertEquals(ANY_PAYLOAD, actualPayload);
+ }
+
+ @Test
+ public void testGetOriginalPayloadIncorrectPointer() {
+ exception.expect(CompletionException.class);
+ exception.expectMessage(INCORRECT_POINTER_EXCEPTION_MSG);
+ //Any S3 Dao exception is thrown back as-is to clients
+ payloadStore.getOriginalPayload("IncorrectPointer").join();
+ verifyNoInteractions(s3AsyncDao);
+ }
+
+ @Test
+ public void testGetOriginalPayloadOnS3Failure() {
+ CompletableFuture sdkEx = new CompletableFuture<>();
+ sdkEx.completeExceptionally(SdkException.create("S3 Exception", new Throwable()));
+ when(s3AsyncDao.getTextFromS3(any(String.class), any(String.class))).thenReturn(sdkEx);
+ exception.expect(CompletionException.class);
+ exception.expectMessage("S3 Exception");
+ //Any S3 Dao exception is thrown back as-is to clients
+ PayloadS3Pointer anyPointer = new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY);
+ payloadStore.getOriginalPayload(anyPointer.toJson()).join();
+ }
+
+ @Test
+ public void testDeleteOriginalPayloadOnSuccess() {
+ when(s3AsyncDao.deletePayloadFromS3(any(), any())).thenReturn(CompletableFuture.completedFuture(null));
+ PayloadS3Pointer anyPointer = new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY);
+ payloadStore.deleteOriginalPayload(anyPointer.toJson()).join();
+
+ ArgumentCaptor bucketNameCaptor = ArgumentCaptor.forClass(String.class);
+ ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(String.class);
+ verify(s3AsyncDao, times(1)).deletePayloadFromS3(bucketNameCaptor.capture(), keyCaptor.capture());
+
+ assertEquals(ANY_S3_KEY, keyCaptor.getValue());
+ assertEquals(S3_BUCKET_NAME, bucketNameCaptor.getValue());
+ }
+
+ @Test
+ public void testDeleteOriginalPayloadIncorrectPointer() {
+ exception.expect(CompletionException.class);
+ exception.expectMessage(INCORRECT_POINTER_EXCEPTION_MSG);
+ payloadStore.deleteOriginalPayload("IncorrectPointer").join();
+ verifyNoInteractions(s3AsyncDao);
+ }
+}