From f7c12afdccb3e2f3cae567e941aeece298482917 Mon Sep 17 00:00:00 2001 From: Michael Sambol Date: Thu, 28 Mar 2024 11:03:45 -0600 Subject: [PATCH] integration test with timestamp --- .../aws-pipes-sources-alpha/lib/kinesis.ts | 17 ++++++++++---- .../aws-cdk-pipes-sources-kinesis.assets.json | 4 ++-- ...ws-cdk-pipes-sources-kinesis.template.json | 3 ++- ...efaultTestDeployAssert6F0A38D7.assets.json | 4 ++-- ...aultTestDeployAssert6F0A38D7.template.json | 10 ++++---- .../integ.kinesis.js.snapshot/manifest.json | 8 +++---- .../test/integ.kinesis.js.snapshot/tree.json | 17 +++++++------- .../test/integ.kinesis.ts | 4 ++-- .../test/kinesis.test.ts | 23 +++++++++++++++---- 9 files changed, 57 insertions(+), 33 deletions(-) diff --git a/packages/@aws-cdk/aws-pipes-sources-alpha/lib/kinesis.ts b/packages/@aws-cdk/aws-pipes-sources-alpha/lib/kinesis.ts index d5d56141aafd8..3739811bf457c 100644 --- a/packages/@aws-cdk/aws-pipes-sources-alpha/lib/kinesis.ts +++ b/packages/@aws-cdk/aws-pipes-sources-alpha/lib/kinesis.ts @@ -87,14 +87,14 @@ export interface KinesisSourceParameters { /** * With StartingPosition set to AT_TIMESTAMP, the time from which to start reading, in Unix time seconds. - * + * * @example - * 1711576897 + * '2025-01-01T00:00:00Z' * * @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcekinesisstreamparameters.html#cfn-pipes-pipe-pipesourcekinesisstreamparameters-startingpositiontimestamp * @default - no starting position timestamp */ - readonly startingPositionTimestamp?: number; + readonly startingPositionTimestamp?: string; } /** @@ -112,6 +112,8 @@ export class KinesisSource implements ISource { private parallelizationFactor; private deadLetterTarget; private deadLetterTargetArn; + private startingPosition; + private startingPositionTimestamp; constructor(stream: IStream, parameters: KinesisSourceParameters) { this.stream = stream; @@ -124,6 +126,8 @@ export class KinesisSource implements ISource { this.maximumRetryAttempts = this.sourceParameters.maximumRetryAttempts; this.parallelizationFactor = this.sourceParameters.parallelizationFactor; this.deadLetterTarget = this.sourceParameters.deadLetterTarget; + this.startingPosition = this.sourceParameters.startingPosition; + this.sourceParameters = this.sourceParameters.startingPositionTimestamp; if (this.batchSize !== undefined) { if (this.batchSize < 1 || this.batchSize > 10000) { @@ -151,6 +155,9 @@ export class KinesisSource implements ISource { throw new Error(`Parallelization factor must be between 1 and 10, received ${this.parallelizationFactor}`); } } + if (this.startingPositionTimestamp && this.startingPosition !== KinesisStartingPosition.AT_TIMESTAMP) { + throw new Error(`Timestamp only valid with StartingPosition AT_TIMESTAMP for Kinesis streams, received ${this.startingPosition}`); + } if (this.deadLetterTarget instanceof Queue) { this.deadLetterTargetArn = this.deadLetterTarget.queueArn; @@ -170,8 +177,8 @@ export class KinesisSource implements ISource { maximumRetryAttempts: this.maximumRetryAttempts, onPartialBatchItemFailure: this.sourceParameters.onPartialBatchItemFailure, parallelizationFactor: this.sourceParameters.parallelizationFactor, - startingPosition: this.sourceParameters.startingPosition, - startingPositionTimestamp: this.sourceParameters.startingPositionTimestamp?.toString(), + startingPosition: this.startingPosition, + startingPositionTimestamp: this.startingPositionTimestamp, }, }, }; diff --git a/packages/@aws-cdk/aws-pipes-sources-alpha/test/integ.kinesis.js.snapshot/aws-cdk-pipes-sources-kinesis.assets.json b/packages/@aws-cdk/aws-pipes-sources-alpha/test/integ.kinesis.js.snapshot/aws-cdk-pipes-sources-kinesis.assets.json index f1201187ac0a1..2730f50f31e91 100644 --- a/packages/@aws-cdk/aws-pipes-sources-alpha/test/integ.kinesis.js.snapshot/aws-cdk-pipes-sources-kinesis.assets.json +++ b/packages/@aws-cdk/aws-pipes-sources-alpha/test/integ.kinesis.js.snapshot/aws-cdk-pipes-sources-kinesis.assets.json @@ -1,7 +1,7 @@ { "version": "36.0.0", "files": { - "078cf6c772b6ffef5992c48a9736208e36d83d5571f404f8288d8faacf8cf931": { + "311f4115b21cfb634db3c457167b6ede4cba3f0e27de88f7a6b6fa8351eacbf1": { "source": { "path": "aws-cdk-pipes-sources-kinesis.template.json", "packaging": "file" @@ -9,7 +9,7 @@ "destinations": { "current_account-current_region": { "bucketName": "cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}", - "objectKey": "078cf6c772b6ffef5992c48a9736208e36d83d5571f404f8288d8faacf8cf931.json", + "objectKey": "311f4115b21cfb634db3c457167b6ede4cba3f0e27de88f7a6b6fa8351eacbf1.json", "assumeRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-file-publishing-role-${AWS::AccountId}-${AWS::Region}" } } diff --git a/packages/@aws-cdk/aws-pipes-sources-alpha/test/integ.kinesis.js.snapshot/aws-cdk-pipes-sources-kinesis.template.json b/packages/@aws-cdk/aws-pipes-sources-alpha/test/integ.kinesis.js.snapshot/aws-cdk-pipes-sources-kinesis.template.json index 183f242821e26..b650d9a9ce948 100644 --- a/packages/@aws-cdk/aws-pipes-sources-alpha/test/integ.kinesis.js.snapshot/aws-cdk-pipes-sources-kinesis.template.json +++ b/packages/@aws-cdk/aws-pipes-sources-alpha/test/integ.kinesis.js.snapshot/aws-cdk-pipes-sources-kinesis.template.json @@ -113,7 +113,8 @@ "MaximumRetryAttempts": 3, "OnPartialBatchItemFailure": "AUTOMATIC_BISECT", "ParallelizationFactor": 1, - "StartingPosition": "LATEST" + "StartingPosition": "AT_TIMESTAMP", + "StartingPositionTimestamp": "2024-01-01T00:00:00Z" } }, "Target": { diff --git a/packages/@aws-cdk/aws-pipes-sources-alpha/test/integ.kinesis.js.snapshot/integtestpipesourcekinesisDefaultTestDeployAssert6F0A38D7.assets.json b/packages/@aws-cdk/aws-pipes-sources-alpha/test/integ.kinesis.js.snapshot/integtestpipesourcekinesisDefaultTestDeployAssert6F0A38D7.assets.json index c7439fb24a6ed..8b669a520e028 100644 --- a/packages/@aws-cdk/aws-pipes-sources-alpha/test/integ.kinesis.js.snapshot/integtestpipesourcekinesisDefaultTestDeployAssert6F0A38D7.assets.json +++ b/packages/@aws-cdk/aws-pipes-sources-alpha/test/integ.kinesis.js.snapshot/integtestpipesourcekinesisDefaultTestDeployAssert6F0A38D7.assets.json @@ -14,7 +14,7 @@ } } }, - "c26a17eb508f2d7d5729a4754ea8421bb41649efa8047971308ee8b55ee154f0": { + "0692af55ff3c68f43ce634f592f755cb7d234b1eecd0fbe4fecd39a2efb4787b": { "source": { "path": "integtestpipesourcekinesisDefaultTestDeployAssert6F0A38D7.template.json", "packaging": "file" @@ -22,7 +22,7 @@ "destinations": { "current_account-current_region": { "bucketName": "cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}", - "objectKey": "c26a17eb508f2d7d5729a4754ea8421bb41649efa8047971308ee8b55ee154f0.json", + "objectKey": "0692af55ff3c68f43ce634f592f755cb7d234b1eecd0fbe4fecd39a2efb4787b.json", "assumeRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-file-publishing-role-${AWS::AccountId}-${AWS::Region}" } } diff --git a/packages/@aws-cdk/aws-pipes-sources-alpha/test/integ.kinesis.js.snapshot/integtestpipesourcekinesisDefaultTestDeployAssert6F0A38D7.template.json b/packages/@aws-cdk/aws-pipes-sources-alpha/test/integ.kinesis.js.snapshot/integtestpipesourcekinesisDefaultTestDeployAssert6F0A38D7.template.json index 5cd62e1d23a95..dad35cda4b74c 100644 --- a/packages/@aws-cdk/aws-pipes-sources-alpha/test/integ.kinesis.js.snapshot/integtestpipesourcekinesisDefaultTestDeployAssert6F0A38D7.template.json +++ b/packages/@aws-cdk/aws-pipes-sources-alpha/test/integ.kinesis.js.snapshot/integtestpipesourcekinesisDefaultTestDeployAssert6F0A38D7.template.json @@ -1,6 +1,6 @@ { "Resources": { - "AwsApiCallKinesisputRecordac62b7caf42ac13ed9fe2cdebc241de9": { + "AwsApiCallKinesisputRecord79fbaf9038737f10cd80d35a619fb146": { "Type": "Custom::DeployAssert@SdkCallKinesisputRecord", "Properties": { "ServiceToken": { @@ -24,11 +24,11 @@ ] ] }, - "Data": "\"e43854c5-bf46-47e1-b72d-915687576f61\"", + "Data": "\"a8060ba0-8895-4a74-ae05-129d1ba812b3\"", "PartitionKey": "\"1\"" }, "flattenResponse": "false", - "salt": "1711512298885" + "salt": "1711644399817" }, "UpdateReplacePolicy": "Delete", "DeletionPolicy": "Delete" @@ -123,7 +123,7 @@ }, "service": "SQS", "api": "receiveMessage", - "expected": "{\"$StringLike\":\"ZTQzODU0YzUtYmY0Ni00N2UxLWI3MmQtOTE1Njg3NTc2ZjYx\"}", + "expected": "{\"$StringLike\":\"YTgwNjBiYTAtODg5NS00YTc0LWFlMDUtMTI5ZDFiYTgxMmIz\"}", "actualPath": "Messages.0.Body.data", "stateMachineArn": { "Ref": "AwsApiCallSQSreceiveMessage23b37826db4ba4656cf9088d0244b67bWaitFor296E8BCE" @@ -146,7 +146,7 @@ "outputPaths": [ "Messages.0.Body.data" ], - "salt": "1711512298885" + "salt": "1711644399817" }, "UpdateReplacePolicy": "Delete", "DeletionPolicy": "Delete" diff --git a/packages/@aws-cdk/aws-pipes-sources-alpha/test/integ.kinesis.js.snapshot/manifest.json b/packages/@aws-cdk/aws-pipes-sources-alpha/test/integ.kinesis.js.snapshot/manifest.json index 66429476146c5..3d4714242849c 100644 --- a/packages/@aws-cdk/aws-pipes-sources-alpha/test/integ.kinesis.js.snapshot/manifest.json +++ b/packages/@aws-cdk/aws-pipes-sources-alpha/test/integ.kinesis.js.snapshot/manifest.json @@ -18,7 +18,7 @@ "validateOnSynth": false, "assumeRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-deploy-role-${AWS::AccountId}-${AWS::Region}", "cloudFormationExecutionRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-cfn-exec-role-${AWS::AccountId}-${AWS::Region}", - "stackTemplateAssetObjectUrl": "s3://cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}/078cf6c772b6ffef5992c48a9736208e36d83d5571f404f8288d8faacf8cf931.json", + "stackTemplateAssetObjectUrl": "s3://cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}/311f4115b21cfb634db3c457167b6ede4cba3f0e27de88f7a6b6fa8351eacbf1.json", "requiresBootstrapStackVersion": 6, "bootstrapStackVersionSsmParameter": "/cdk-bootstrap/hnb659fds/version", "additionalDependencies": [ @@ -114,7 +114,7 @@ "validateOnSynth": false, "assumeRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-deploy-role-${AWS::AccountId}-${AWS::Region}", "cloudFormationExecutionRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-cfn-exec-role-${AWS::AccountId}-${AWS::Region}", - "stackTemplateAssetObjectUrl": "s3://cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}/c26a17eb508f2d7d5729a4754ea8421bb41649efa8047971308ee8b55ee154f0.json", + "stackTemplateAssetObjectUrl": "s3://cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}/0692af55ff3c68f43ce634f592f755cb7d234b1eecd0fbe4fecd39a2efb4787b.json", "requiresBootstrapStackVersion": 6, "bootstrapStackVersionSsmParameter": "/cdk-bootstrap/hnb659fds/version", "additionalDependencies": [ @@ -131,10 +131,10 @@ "integtestpipesourcekinesisDefaultTestDeployAssert6F0A38D7.assets" ], "metadata": { - "/integtest-pipe-source-kinesis/DefaultTest/DeployAssert/AwsApiCallKinesisputRecordac62b7caf42ac13ed9fe2cdebc241de9/Default/Default": [ + "/integtest-pipe-source-kinesis/DefaultTest/DeployAssert/AwsApiCallKinesisputRecord79fbaf9038737f10cd80d35a619fb146/Default/Default": [ { "type": "aws:cdk:logicalId", - "data": "AwsApiCallKinesisputRecordac62b7caf42ac13ed9fe2cdebc241de9" + "data": "AwsApiCallKinesisputRecord79fbaf9038737f10cd80d35a619fb146" } ], "/integtest-pipe-source-kinesis/DefaultTest/DeployAssert/SingletonFunction1488541a7b23466481b69b4408076b81/Role": [ diff --git a/packages/@aws-cdk/aws-pipes-sources-alpha/test/integ.kinesis.js.snapshot/tree.json b/packages/@aws-cdk/aws-pipes-sources-alpha/test/integ.kinesis.js.snapshot/tree.json index c9f43b8ea64cb..1de1d703aa05c 100644 --- a/packages/@aws-cdk/aws-pipes-sources-alpha/test/integ.kinesis.js.snapshot/tree.json +++ b/packages/@aws-cdk/aws-pipes-sources-alpha/test/integ.kinesis.js.snapshot/tree.json @@ -215,7 +215,8 @@ "maximumRetryAttempts": 3, "onPartialBatchItemFailure": "AUTOMATIC_BISECT", "parallelizationFactor": 1, - "startingPosition": "LATEST" + "startingPosition": "AT_TIMESTAMP", + "startingPositionTimestamp": "2024-01-01T00:00:00Z" } }, "target": { @@ -306,17 +307,17 @@ "id": "DeployAssert", "path": "integtest-pipe-source-kinesis/DefaultTest/DeployAssert", "children": { - "AwsApiCallKinesisputRecordac62b7caf42ac13ed9fe2cdebc241de9": { - "id": "AwsApiCallKinesisputRecordac62b7caf42ac13ed9fe2cdebc241de9", - "path": "integtest-pipe-source-kinesis/DefaultTest/DeployAssert/AwsApiCallKinesisputRecordac62b7caf42ac13ed9fe2cdebc241de9", + "AwsApiCallKinesisputRecord79fbaf9038737f10cd80d35a619fb146": { + "id": "AwsApiCallKinesisputRecord79fbaf9038737f10cd80d35a619fb146", + "path": "integtest-pipe-source-kinesis/DefaultTest/DeployAssert/AwsApiCallKinesisputRecord79fbaf9038737f10cd80d35a619fb146", "children": { "SdkProvider": { "id": "SdkProvider", - "path": "integtest-pipe-source-kinesis/DefaultTest/DeployAssert/AwsApiCallKinesisputRecordac62b7caf42ac13ed9fe2cdebc241de9/SdkProvider", + "path": "integtest-pipe-source-kinesis/DefaultTest/DeployAssert/AwsApiCallKinesisputRecord79fbaf9038737f10cd80d35a619fb146/SdkProvider", "children": { "AssertionsProvider": { "id": "AssertionsProvider", - "path": "integtest-pipe-source-kinesis/DefaultTest/DeployAssert/AwsApiCallKinesisputRecordac62b7caf42ac13ed9fe2cdebc241de9/SdkProvider/AssertionsProvider", + "path": "integtest-pipe-source-kinesis/DefaultTest/DeployAssert/AwsApiCallKinesisputRecord79fbaf9038737f10cd80d35a619fb146/SdkProvider/AssertionsProvider", "constructInfo": { "fqn": "constructs.Construct", "version": "10.3.0" @@ -330,11 +331,11 @@ }, "Default": { "id": "Default", - "path": "integtest-pipe-source-kinesis/DefaultTest/DeployAssert/AwsApiCallKinesisputRecordac62b7caf42ac13ed9fe2cdebc241de9/Default", + "path": "integtest-pipe-source-kinesis/DefaultTest/DeployAssert/AwsApiCallKinesisputRecord79fbaf9038737f10cd80d35a619fb146/Default", "children": { "Default": { "id": "Default", - "path": "integtest-pipe-source-kinesis/DefaultTest/DeployAssert/AwsApiCallKinesisputRecordac62b7caf42ac13ed9fe2cdebc241de9/Default/Default", + "path": "integtest-pipe-source-kinesis/DefaultTest/DeployAssert/AwsApiCallKinesisputRecord79fbaf9038737f10cd80d35a619fb146/Default/Default", "constructInfo": { "fqn": "constructs.Construct", "version": "10.3.0" diff --git a/packages/@aws-cdk/aws-pipes-sources-alpha/test/integ.kinesis.ts b/packages/@aws-cdk/aws-pipes-sources-alpha/test/integ.kinesis.ts index 49ae43c50cfcb..6a1f90674c6b1 100644 --- a/packages/@aws-cdk/aws-pipes-sources-alpha/test/integ.kinesis.ts +++ b/packages/@aws-cdk/aws-pipes-sources-alpha/test/integ.kinesis.ts @@ -35,8 +35,8 @@ const sourceUnderTest = new KinesisSource(sourceKinesisStream, { maximumRetryAttempts: 3, onPartialBatchItemFailure: OnPartialBatchItemFailure.AUTOMATIC_BISECT, parallelizationFactor: 1, - startingPosition: KinesisStartingPosition.LATEST, - startingPositionTimestamp: 1711576897, + startingPosition: KinesisStartingPosition.AT_TIMESTAMP, + startingPositionTimestamp: '2024-01-01T00:00:00Z', }); new Pipe(stack, 'Pipe', { diff --git a/packages/@aws-cdk/aws-pipes-sources-alpha/test/kinesis.test.ts b/packages/@aws-cdk/aws-pipes-sources-alpha/test/kinesis.test.ts index 1120a2eb69002..22e08db472a4c 100644 --- a/packages/@aws-cdk/aws-pipes-sources-alpha/test/kinesis.test.ts +++ b/packages/@aws-cdk/aws-pipes-sources-alpha/test/kinesis.test.ts @@ -56,7 +56,7 @@ describe('kinesis source', () => { onPartialBatchItemFailure: OnPartialBatchItemFailure.AUTOMATIC_BISECT, parallelizationFactor: 10, startingPosition: KinesisStartingPosition.LATEST, - startingPositionTimestamp: 1711576897, + startingPositionTimestamp: '2024-01-01T00:00:00Z', }); new Pipe(stack, 'MyPipe', { @@ -87,7 +87,7 @@ describe('kinesis source', () => { OnPartialBatchItemFailure: 'AUTOMATIC_BISECT', ParallelizationFactor: 10, StartingPosition: 'LATEST', - StartingPositionTimestamp: '1711576897', + StartingPositionTimestamp: '2024-01-01T00:00:00Z', }, }, }); @@ -108,7 +108,7 @@ describe('kinesis source', () => { onPartialBatchItemFailure: OnPartialBatchItemFailure.AUTOMATIC_BISECT, parallelizationFactor: 10, startingPosition: KinesisStartingPosition.LATEST, - startingPositionTimestamp: 1711576897, + startingPositionTimestamp: '2024-01-01T00:00:00Z', }); new Pipe(stack, 'MyPipe', { @@ -144,7 +144,7 @@ describe('kinesis source', () => { OnPartialBatchItemFailure: 'AUTOMATIC_BISECT', ParallelizationFactor: 10, StartingPosition: 'LATEST', - StartingPositionTimestamp: '1711576897', + StartingPositionTimestamp: '2024-01-01T00:00:00Z', }, }, }); @@ -356,4 +356,19 @@ describe('kinesis source parameters validation', () => { }); }).toThrow('Parallelization factor must be between 1 and 10, received 11'); }); + + test('timestamp provided and starting position not set to AT_TIMESTAMP should throw', () => { + // GIVEN + const app = new App(); + const stack = new Stack(app, 'demo-stack'); + const stream = new Stream(stack, 'MyStream', {}); + + // WHEN + expect(() => { + new KinesisSource(stream, { + startingPosition: KinesisStartingPosition.LATEST, + startingPositionTimestamp: '2024-01-01T00:00:00Z', + }); + }).toThrow('Timestamp only valid with StartingPosition AT_TIMESTAMP for Kinesis streams, received KinesisStartingPosition.LATEST'); + }); });