Skip to content

Commit

Permalink
Add feedback from @nmussy
Browse files Browse the repository at this point in the history
  • Loading branch information
msambol committed Jun 25, 2024
1 parent 1c666cd commit d0df247
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 15 deletions.
19 changes: 6 additions & 13 deletions packages/@aws-cdk/aws-pipes-targets-alpha/lib/kinesis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,12 @@ export interface KinesisTargetParameters {
* The input transformation to apply to the message before sending it to the target.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetparameters.html#cfn-pipes-pipe-pipetargetparameters-inputtemplate
* @default none
* @default - none
*/
readonly inputTransformation?: IInputTransformation;

/**
* Determines which shard in the stream the data record is assigned to. Partition keys are
* Unicode strings with a maximum length limit of 256 characters for each key. Amazon Kinesis
* Data Streams uses the partition key as input to a hash function that maps the partition key
* and associated data to a specific shard. Specifically, an MD5 hash function is used to map
* partition keys to 128-bit integer values and to map associated data records to shards. As a
* result of this hashing mechanism, all data records with the same partition key map to the
* same shard within the stream.
*
* Minimum: 0
* Maximum: 256
* Determines which shard in the stream the data record is assigned to.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetkinesisstreamparameters.html#cfn-pipes-pipe-pipetargetkinesisstreamparameters-partitionkey
*/
Expand All @@ -46,9 +37,11 @@ export class KinesisTarget implements ITarget {

validatePartitionKey(parameters.partitionKey);
}

grantPush(grantee: IRole): void {
this.stream.grantWrite(grantee);
}

bind(pipe: IPipe): TargetConfig {
if (!this.streamParameters) {
return {
Expand All @@ -66,7 +59,7 @@ export class KinesisTarget implements ITarget {
}

function validatePartitionKey(pk: string) {
if (pk.length < 0 || pk.length > 256) {
throw new Error(`Partition key must be between 0 and 256 characters, received ${pk.length}`);
if (pk.length > 256) {
throw new Error(`Partition key must be less than or equal to 256 characters, received ${pk.length}`);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,6 @@ test.assertions.awsApiCall('SQS', 'sendMessage', {

// It is nontrivial to read from a Kinesis data stream.
// Manual verification was done to ensure the record made
// it from the SQS to Kinesis via the pipe.
// it from SQS to Kinesis via the pipe.

app.synth();
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,6 @@ describe('Kinesis source parameters validation', () => {
new KinesisTarget(stream, {
partitionKey: 'x'.repeat(257),
});
}).toThrow('Partition key must be between 0 and 256 characters, received 257');
}).toThrow('Partition key must be less than or equal to 256 characters, received 257');
});
});

0 comments on commit d0df247

Please # to comment.