From 9e4c4e57022c1865d783fab9a732e34f504cbe0c Mon Sep 17 00:00:00 2001 From: Vandita2020 Date: Fri, 8 Nov 2024 14:54:44 -0800 Subject: [PATCH 1/4] Support for Provisioned Pollers --- ...efaultTestDeployAssertAF78BD0F.assets.json | 2 +- .../cdk.out | 2 +- .../integ.json | 2 +- ...vent-source-kafka-self-managed.assets.json | 6 +- ...nt-source-kafka-self-managed.template.json | 132 +++++ .../manifest.json | 30 +- .../tree.json | 212 +++++++- .../test/integ.kafka-selfmanaged.ts | 23 + .../aws-lambda-event-sources/README.md | 23 + .../aws-lambda-event-sources/lib/kafka.ts | 3 + .../aws-lambda-event-sources/lib/stream.ts | 41 ++ .../test/kafka.test.ts | 188 +++++++ .../aws-lambda/lib/event-source-mapping.ts | 43 ++ .../test/event-source-mapping.test.ts | 73 ++- packages/aws-cdk-lib/awslint.json | 12 + ...aws-lambda-eventsourcemapping-pollers.json | 506 ++++++++++++++++++ 16 files changed, 1287 insertions(+), 11 deletions(-) create mode 100644 tools/@aws-cdk/spec2cdk/temporary-schemas/us-east-1/aws-lambda-eventsourcemapping-pollers.json diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/LambdaEventSourceKafkaSelfManagedTestDefaultTestDeployAssertAF78BD0F.assets.json b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/LambdaEventSourceKafkaSelfManagedTestDefaultTestDeployAssertAF78BD0F.assets.json index a01ffb4d5f4c8..18daf0934754d 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/LambdaEventSourceKafkaSelfManagedTestDefaultTestDeployAssertAF78BD0F.assets.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/LambdaEventSourceKafkaSelfManagedTestDefaultTestDeployAssertAF78BD0F.assets.json @@ -1,5 +1,5 @@ { - "version": "36.0.0", + "version": "38.0.1", "files": { "21fbb51d7b23f6a6c262b46a9caee79d744a3ac019fd45422d988b96d44b2a22": { "source": { diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/cdk.out b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/cdk.out index 1f0068d32659a..c6e612584e352 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/cdk.out +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/cdk.out @@ -1 +1 @@ -{"version":"36.0.0"} \ No newline at end of file +{"version":"38.0.1"} \ No newline at end of file diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/integ.json b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/integ.json index eb53722c5afaf..b2142e0e738f8 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/integ.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/integ.json @@ -1,5 +1,5 @@ { - "version": "36.0.0", + "version": "38.0.1", "testCases": { "LambdaEventSourceKafkaSelfManagedTest/DefaultTest": { "stacks": [ diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.assets.json b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.assets.json index cb4ec6e990114..983f0b88ef58f 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.assets.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.assets.json @@ -1,7 +1,7 @@ { - "version": "36.0.0", + "version": "38.0.1", "files": { - "4bf07b5cad381e52a796b0a42748934cce430e155ffe31f0366eef200d40356f": { + "cb2d7d48eaf634edd0acc6e6475e904f57e491fce9fa3ac839e93cba823616c1": { "source": { "path": "lambda-event-source-kafka-self-managed.template.json", "packaging": "file" @@ -9,7 +9,7 @@ "destinations": { "current_account-current_region": { "bucketName": "cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}", - "objectKey": "4bf07b5cad381e52a796b0a42748934cce430e155ffe31f0366eef200d40356f.json", + "objectKey": "cb2d7d48eaf634edd0acc6e6475e904f57e491fce9fa3ac839e93cba823616c1.json", "assumeRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-file-publishing-role-${AWS::AccountId}-${AWS::Region}" } } diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.template.json b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.template.json index dd921a80f1344..5a5f9daa01865 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.template.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/lambda-event-source-kafka-self-managed.template.json @@ -322,6 +322,138 @@ "my-test-topic2" ] } + }, + "F3ServiceRole2F65FFC0": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": "lambda.amazonaws.com" + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + ] + ] + } + ] + } + }, + "F3ServiceRoleDefaultPolicy1C0463D1": { + "Type": "AWS::IAM::Policy", + "Properties": { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "secretsmanager:DescribeSecret", + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": [ + { + "Ref": "S509448A1" + }, + { + "Ref": "SC0855C491" + } + ] + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "F3ServiceRoleDefaultPolicy1C0463D1", + "Roles": [ + { + "Ref": "F3ServiceRole2F65FFC0" + } + ] + } + }, + "F38FF9B13A": { + "Type": "AWS::Lambda::Function", + "Properties": { + "Code": { + "ZipFile": "exports.handler = async function handler(event) {\n console.log('event:', JSON.stringify(event, undefined, 2));\n return { event };\n}" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "F3ServiceRole2F65FFC0", + "Arn" + ] + }, + "Runtime": "nodejs18.x" + }, + "DependsOn": [ + "F3ServiceRoleDefaultPolicy1C0463D1", + "F3ServiceRole2F65FFC0" + ] + }, + "F3KafkaEventSource838c4d5ff3c99c1a617120adfca83e5bmytesttopic3ED015F25": { + "Type": "AWS::Lambda::EventSourceMapping", + "Properties": { + "BatchSize": 100, + "FilterCriteria": { + "Filters": [ + { + "Pattern": "{\"numericEquals\":[{\"numeric\":[\"=\",1]}]}" + } + ] + }, + "FunctionName": { + "Ref": "F38FF9B13A" + }, + "ProvisionedPollerConfig": { + "MaximumPollers": 3, + "MinimumPollers": 1 + }, + "SelfManagedEventSource": { + "Endpoints": { + "KafkaBootstrapServers": [ + "my-self-hosted-kafka-broker-1:9092", + "my-self-hosted-kafka-broker-2:9092", + "my-self-hosted-kafka-broker-3:9092" + ] + } + }, + "SelfManagedKafkaEventSourceConfig": { + "ConsumerGroupId": "myTestConsumerGroup3" + }, + "SourceAccessConfigurations": [ + { + "Type": "CLIENT_CERTIFICATE_TLS_AUTH", + "URI": { + "Ref": "SC0855C491" + } + }, + { + "Type": "SERVER_ROOT_CA_CERTIFICATE", + "URI": { + "Ref": "S509448A1" + } + } + ], + "StartingPosition": "TRIM_HORIZON", + "Topics": [ + "my-test-topic3" + ] + } } }, "Parameters": { diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/manifest.json b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/manifest.json index 06655a65cd8cb..d41cc1cc8586e 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/manifest.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/manifest.json @@ -1,5 +1,5 @@ { - "version": "36.0.0", + "version": "38.0.1", "artifacts": { "lambda-event-source-kafka-self-managed.assets": { "type": "cdk:asset-manifest", @@ -16,9 +16,10 @@ "templateFile": "lambda-event-source-kafka-self-managed.template.json", "terminationProtection": false, "validateOnSynth": false, + "notificationArns": [], "assumeRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-deploy-role-${AWS::AccountId}-${AWS::Region}", "cloudFormationExecutionRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-cfn-exec-role-${AWS::AccountId}-${AWS::Region}", - "stackTemplateAssetObjectUrl": "s3://cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}/4bf07b5cad381e52a796b0a42748934cce430e155ffe31f0366eef200d40356f.json", + "stackTemplateAssetObjectUrl": "s3://cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}/cb2d7d48eaf634edd0acc6e6475e904f57e491fce9fa3ac839e93cba823616c1.json", "requiresBootstrapStackVersion": 6, "bootstrapStackVersionSsmParameter": "/cdk-bootstrap/hnb659fds/version", "additionalDependencies": [ @@ -100,6 +101,30 @@ "data": "F2KafkaEventSource838c4d5ff3c99c1a617120adfca83e5bmytesttopic20A678189" } ], + "/lambda-event-source-kafka-self-managed/F3/ServiceRole/Resource": [ + { + "type": "aws:cdk:logicalId", + "data": "F3ServiceRole2F65FFC0" + } + ], + "/lambda-event-source-kafka-self-managed/F3/ServiceRole/DefaultPolicy/Resource": [ + { + "type": "aws:cdk:logicalId", + "data": "F3ServiceRoleDefaultPolicy1C0463D1" + } + ], + "/lambda-event-source-kafka-self-managed/F3/Resource": [ + { + "type": "aws:cdk:logicalId", + "data": "F38FF9B13A" + } + ], + "/lambda-event-source-kafka-self-managed/F3/KafkaEventSource:838c4d5ff3c99c1a617120adfca83e5b:my-test-topic3/Resource": [ + { + "type": "aws:cdk:logicalId", + "data": "F3KafkaEventSource838c4d5ff3c99c1a617120adfca83e5bmytesttopic3ED015F25" + } + ], "/lambda-event-source-kafka-self-managed/BootstrapVersion": [ { "type": "aws:cdk:logicalId", @@ -130,6 +155,7 @@ "templateFile": "LambdaEventSourceKafkaSelfManagedTestDefaultTestDeployAssertAF78BD0F.template.json", "terminationProtection": false, "validateOnSynth": false, + "notificationArns": [], "assumeRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-deploy-role-${AWS::AccountId}-${AWS::Region}", "cloudFormationExecutionRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-cfn-exec-role-${AWS::AccountId}-${AWS::Region}", "stackTemplateAssetObjectUrl": "s3://cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}/21fbb51d7b23f6a6c262b46a9caee79d744a3ac019fd45422d988b96d44b2a22.json", diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/tree.json b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/tree.json index 54543d8610b3a..168d1caad8aee 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/tree.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.js.snapshot/tree.json @@ -531,6 +531,214 @@ "version": "0.0.0" } }, + "F3": { + "id": "F3", + "path": "lambda-event-source-kafka-self-managed/F3", + "children": { + "ServiceRole": { + "id": "ServiceRole", + "path": "lambda-event-source-kafka-self-managed/F3/ServiceRole", + "children": { + "ImportServiceRole": { + "id": "ImportServiceRole", + "path": "lambda-event-source-kafka-self-managed/F3/ServiceRole/ImportServiceRole", + "constructInfo": { + "fqn": "aws-cdk-lib.Resource", + "version": "0.0.0" + } + }, + "Resource": { + "id": "Resource", + "path": "lambda-event-source-kafka-self-managed/F3/ServiceRole/Resource", + "attributes": { + "aws:cdk:cloudformation:type": "AWS::IAM::Role", + "aws:cdk:cloudformation:props": { + "assumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": "lambda.amazonaws.com" + } + } + ], + "Version": "2012-10-17" + }, + "managedPolicyArns": [ + { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + ] + ] + } + ] + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_iam.CfnRole", + "version": "0.0.0" + } + }, + "DefaultPolicy": { + "id": "DefaultPolicy", + "path": "lambda-event-source-kafka-self-managed/F3/ServiceRole/DefaultPolicy", + "children": { + "Resource": { + "id": "Resource", + "path": "lambda-event-source-kafka-self-managed/F3/ServiceRole/DefaultPolicy/Resource", + "attributes": { + "aws:cdk:cloudformation:type": "AWS::IAM::Policy", + "aws:cdk:cloudformation:props": { + "policyDocument": { + "Statement": [ + { + "Action": [ + "secretsmanager:DescribeSecret", + "secretsmanager:GetSecretValue" + ], + "Effect": "Allow", + "Resource": [ + { + "Ref": "S509448A1" + }, + { + "Ref": "SC0855C491" + } + ] + } + ], + "Version": "2012-10-17" + }, + "policyName": "F3ServiceRoleDefaultPolicy1C0463D1", + "roles": [ + { + "Ref": "F3ServiceRole2F65FFC0" + } + ] + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_iam.CfnPolicy", + "version": "0.0.0" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_iam.Policy", + "version": "0.0.0" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_iam.Role", + "version": "0.0.0" + } + }, + "Resource": { + "id": "Resource", + "path": "lambda-event-source-kafka-self-managed/F3/Resource", + "attributes": { + "aws:cdk:cloudformation:type": "AWS::Lambda::Function", + "aws:cdk:cloudformation:props": { + "code": { + "zipFile": "exports.handler = async function handler(event) {\n console.log('event:', JSON.stringify(event, undefined, 2));\n return { event };\n}" + }, + "handler": "index.handler", + "role": { + "Fn::GetAtt": [ + "F3ServiceRole2F65FFC0", + "Arn" + ] + }, + "runtime": "nodejs18.x" + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_lambda.CfnFunction", + "version": "0.0.0" + } + }, + "KafkaEventSource:838c4d5ff3c99c1a617120adfca83e5b:my-test-topic3": { + "id": "KafkaEventSource:838c4d5ff3c99c1a617120adfca83e5b:my-test-topic3", + "path": "lambda-event-source-kafka-self-managed/F3/KafkaEventSource:838c4d5ff3c99c1a617120adfca83e5b:my-test-topic3", + "children": { + "Resource": { + "id": "Resource", + "path": "lambda-event-source-kafka-self-managed/F3/KafkaEventSource:838c4d5ff3c99c1a617120adfca83e5b:my-test-topic3/Resource", + "attributes": { + "aws:cdk:cloudformation:type": "AWS::Lambda::EventSourceMapping", + "aws:cdk:cloudformation:props": { + "batchSize": 100, + "filterCriteria": { + "filters": [ + { + "pattern": "{\"numericEquals\":[{\"numeric\":[\"=\",1]}]}" + } + ] + }, + "functionName": { + "Ref": "F38FF9B13A" + }, + "provisionedPollerConfig": { + "minimumPollers": 1, + "maximumPollers": 3 + }, + "selfManagedEventSource": { + "endpoints": { + "kafkaBootstrapServers": [ + "my-self-hosted-kafka-broker-1:9092", + "my-self-hosted-kafka-broker-2:9092", + "my-self-hosted-kafka-broker-3:9092" + ] + } + }, + "selfManagedKafkaEventSourceConfig": { + "consumerGroupId": "myTestConsumerGroup3" + }, + "sourceAccessConfigurations": [ + { + "type": "CLIENT_CERTIFICATE_TLS_AUTH", + "uri": { + "Ref": "SC0855C491" + } + }, + { + "type": "SERVER_ROOT_CA_CERTIFICATE", + "uri": { + "Ref": "S509448A1" + } + } + ], + "startingPosition": "TRIM_HORIZON", + "topics": [ + "my-test-topic3" + ] + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_lambda.CfnEventSourceMapping", + "version": "0.0.0" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_lambda.EventSourceMapping", + "version": "0.0.0" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_lambda.Function", + "version": "0.0.0" + } + }, "BootstrapVersion": { "id": "BootstrapVersion", "path": "lambda-event-source-kafka-self-managed/BootstrapVersion", @@ -566,7 +774,7 @@ "path": "LambdaEventSourceKafkaSelfManagedTest/DefaultTest/Default", "constructInfo": { "fqn": "constructs.Construct", - "version": "10.3.0" + "version": "10.4.2" } }, "DeployAssert": { @@ -612,7 +820,7 @@ "path": "Tree", "constructInfo": { "fqn": "constructs.Construct", - "version": "10.3.0" + "version": "10.4.2" } } }, diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.ts b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.ts index 3af619c6f8bc2..a9a8b266a2029 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.ts +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-lambda-event-sources/test/integ.kafka-selfmanaged.ts @@ -87,6 +87,29 @@ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== ], filterEncryption: myKey, })); + + const fn3 = new TestFunction(this, 'F3'); + rootCASecret.grantRead(fn3); + clientCertificatesSecret.grantRead(fn3); + + fn3.addEventSource(new SelfManagedKafkaEventSource({ + bootstrapServers, + topic: 'my-test-topic3', + consumerGroupId: 'myTestConsumerGroup3', + secret: clientCertificatesSecret, + authenticationMethod: AuthenticationMethod.CLIENT_CERTIFICATE_TLS_AUTH, + rootCACertificate: rootCASecret, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + filters: [ + lambda.FilterCriteria.filter({ + numericEquals: lambda.FilterRule.isEqual(1), + }), + ], + provisionedPollerConfig: { + minimumPollers: 1, + maximumPollers: 3, + }, + })); } } diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/README.md b/packages/aws-cdk-lib/aws-lambda-event-sources/README.md index 57aa2b3fe3e5c..1d4f12937a812 100644 --- a/packages/aws-cdk-lib/aws-lambda-event-sources/README.md +++ b/packages/aws-cdk-lib/aws-lambda-event-sources/README.md @@ -396,6 +396,29 @@ myFunction.addEventSource(new ManagedKafkaEventSource({ })); ``` +Set configuration for provisioned pollers that read from the event source. + +```ts +import { ManagedKafkaEventSource } from 'aws-cdk-lib/aws-lambda-event-sources'; + +// Your MSK cluster arn +declare const clusterArn: string + +// The Kafka topic you want to subscribe to +const topic = 'some-cool-topic'; + +declare const myFunction: lambda.Function; +myFunction.addEventSource(new ManagedKafkaEventSource({ + clusterArn, + topic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + provisionedPollerConfig: { + minimumPollers: 1, + maximumPollers: 3, + }, +})); +``` + ## Roadmap Eventually, this module will support all the event sources described under diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts b/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts index 3c75a45a51447..c7a837b23ff16 100644 --- a/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts +++ b/packages/aws-cdk-lib/aws-lambda-event-sources/lib/kafka.ts @@ -164,6 +164,7 @@ export class ManagedKafkaEventSource extends StreamEventSource { kafkaConsumerGroupId: this.innerProps.consumerGroupId, onFailure: this.innerProps.onFailure, supportS3OnFailureDestination: true, + provisionedPollerConfig: this.innerProps.provisionedPollerConfig, }), ); @@ -240,6 +241,7 @@ export class SelfManagedKafkaEventSource extends StreamEventSource { throw new Error('secret must be set if Kafka brokers accessed over Internet'); } this.innerProps = props; + } public bind(target: lambda.IFunction) { @@ -256,6 +258,7 @@ export class SelfManagedKafkaEventSource extends StreamEventSource { sourceAccessConfigurations: this.sourceAccessConfigurations(), onFailure: this.innerProps.onFailure, supportS3OnFailureDestination: true, + provisionedPollerConfig: this.innerProps.provisionedPollerConfig, }), ); diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/lib/stream.ts b/packages/aws-cdk-lib/aws-lambda-event-sources/lib/stream.ts index 414de7472f0e5..5ef96d9d6a06b 100644 --- a/packages/aws-cdk-lib/aws-lambda-event-sources/lib/stream.ts +++ b/packages/aws-cdk-lib/aws-lambda-event-sources/lib/stream.ts @@ -44,6 +44,29 @@ export interface BaseStreamEventSourceProps{ * @default true */ readonly enabled?: boolean; + + /** + * Configuration for provisioned pollers that read from the event source. + * When specified, allows control over the minimum and maximum number of pollers + * that can be provisioned to process events from the source. + * + */ + readonly provisionedPollerConfig?: ProvisionedPollerConfig; +} + +export interface ProvisionedPollerConfig { + /** + * The minimum number of pollers that should be provisioned. + * + * @default - 1 + */ + readonly minimumPollers: number; + /** + * The maximum number of pollers that can be provisioned. + * + * @default - 200 + */ + readonly maximumPollers: number; } /** @@ -144,6 +167,24 @@ export interface StreamEventSourceProps extends BaseStreamEventSourceProps { */ export abstract class StreamEventSource implements lambda.IEventSource { protected constructor(protected readonly props: StreamEventSourceProps) { + if (props.provisionedPollerConfig) { + const { minimumPollers, maximumPollers } = props.provisionedPollerConfig; + if (minimumPollers != undefined) { + if (minimumPollers < 1 || minimumPollers > 200) { + throw new Error('Minimum provisioned pollers must be between 1 and 200 inclusive'); + } + } + if (maximumPollers != undefined) { + if (maximumPollers < 1 || maximumPollers > 2000) { + throw new Error('Maximum provisioned pollers must be between 1 and 2000 inclusive'); + } + } + if (minimumPollers != undefined && maximumPollers != undefined) { + if (minimumPollers > maximumPollers) { + throw new Error('Minimum provisioned pollers must be less than or equal to maximum provisioned pollers'); + } + } + } } public abstract bind(_target: lambda.IFunction): void; diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts b/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts index d5b1df2c9a657..5775a8ad1507f 100644 --- a/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts +++ b/packages/aws-cdk-lib/aws-lambda-event-sources/test/kafka.test.ts @@ -308,6 +308,105 @@ describe('KafkaEventSource', () => { }); }); + test('with provisioned pollers', () => { + // GIVEN + const stack = new cdk.Stack(); + const testLambdaFunction = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + const bucket = Bucket.fromBucketName(stack, 'BucketByName', 'my-bucket'); + const s3OnFailureDestination = new sources.S3OnFailureDestination(bucket); + // WHEN + testLambdaFunction.addEventSource(new sources.ManagedKafkaEventSource( + { + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + onFailure: s3OnFailureDestination, + provisionedPollerConfig: { + minimumPollers: 1, + maximumPollers: 3, + }, + })); + + // THEN + Template.fromStack(stack).hasResourceProperties('AWS::Lambda::EventSourceMapping', { + DestinationConfig: { + OnFailure: { + Destination: { + 'Fn::Join': ['', ['arn:', { Ref: 'AWS::Partition' }, ':s3:::my-bucket']], + }, + }, + }, + ProvisionedPollerConfig: { + MinimumPollers: 1, + MaximumPollers: 3, + }, + }); + }); + + test('maximum provisioned poller is out of limit', () => { + const stack = new cdk.Stack(); + const testLambdaFunction = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + const bucket = Bucket.fromBucketName(stack, 'BucketByName', 'my-bucket'); + const s3OnFailureDestination = new sources.S3OnFailureDestination(bucket); + + expect(() => testLambdaFunction.addEventSource(new sources.ManagedKafkaEventSource( + { + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + onFailure: s3OnFailureDestination, + provisionedPollerConfig: { + minimumPollers: 1, + maximumPollers: 2001, + }, + }))).toThrow(/Maximum provisioned pollers must be between 1 and 2000 inclusive/); + }); + + test('minimum provisioned poller is out of limit', () => { + const stack = new cdk.Stack(); + const testLambdaFunction = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + const bucket = Bucket.fromBucketName(stack, 'BucketByName', 'my-bucket'); + const s3OnFailureDestination = new sources.S3OnFailureDestination(bucket); + + expect(() => testLambdaFunction.addEventSource(new sources.ManagedKafkaEventSource( + { + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + onFailure: s3OnFailureDestination, + provisionedPollerConfig: { + minimumPollers: 0, + maximumPollers: 3, + }, + }))).toThrow(/Minimum provisioned pollers must be between 1 and 200 inclusive/); + }); + + test('Minimum provisioned poller greater than maximum provisioned poller', () => { + const stack = new cdk.Stack(); + const testLambdaFunction = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + const bucket = Bucket.fromBucketName(stack, 'BucketByName', 'my-bucket'); + const s3OnFailureDestination = new sources.S3OnFailureDestination(bucket); + + expect(() => testLambdaFunction.addEventSource(new sources.ManagedKafkaEventSource( + { + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + onFailure: s3OnFailureDestination, + provisionedPollerConfig: { + minimumPollers: 3, + maximumPollers: 1, + }, + }))).toThrow(/Minimum provisioned pollers must be less than or equal to maximum provisioned pollers/); + }); }); describe('self-managed kafka', () => { @@ -998,5 +1097,94 @@ describe('KafkaEventSource', () => { expect(mskEventMapping.eventSourceMappingId).toBeDefined(); expect(mskEventMapping.eventSourceMappingArn).toBeDefined(); }); + + test('with provisioned pollers', () => { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + + const mskEventMapping = new sources.ManagedKafkaEventSource( + { + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + provisionedPollerConfig: { + minimumPollers: 1, + maximumPollers: 3, + }, + }); + + // WHEN + fn.addEventSource(mskEventMapping); + expect(mskEventMapping.eventSourceMappingId).toBeDefined(); + expect(mskEventMapping.eventSourceMappingArn).toBeDefined(); + + const template = Template.fromStack(stack); + template.hasResourceProperties('AWS::Lambda::EventSourceMapping', { + ProvisionedPollerConfig: { + MinimumPollers: 1, + MaximumPollers: 3, + }, + }); + }); + + test('maximum provisioned poller is out of limit', () => { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + + expect(() => fn.addEventSource(new sources.ManagedKafkaEventSource( + { + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + provisionedPollerConfig: { + minimumPollers: 1, + maximumPollers: 2001, + }, + }))).toThrow(/Maximum provisioned pollers must be between 1 and 2000 inclusive/); + }); + + test('minimum provisioned poller is out of limit', () => { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + + expect(() => fn.addEventSource(new sources.ManagedKafkaEventSource( + { + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + provisionedPollerConfig: { + minimumPollers: 0, + maximumPollers: 3, + }, + }))).toThrow(/Minimum provisioned pollers must be between 1 and 200 inclusive/); + }); + + test('Minimum provisioned poller greater than maximum provisioned poller', () => { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const clusterArn = 'some-arn'; + const kafkaTopic = 'some-topic'; + + expect(() => fn.addEventSource(new sources.ManagedKafkaEventSource( + { + clusterArn, + topic: kafkaTopic, + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + provisionedPollerConfig: { + minimumPollers: 3, + maximumPollers: 1, + }, + }))).toThrow(/Minimum provisioned pollers must be less than or equal to maximum provisioned pollers/); + }); }); }); diff --git a/packages/aws-cdk-lib/aws-lambda/lib/event-source-mapping.ts b/packages/aws-cdk-lib/aws-lambda/lib/event-source-mapping.ts index a441ce148788a..9ce7a428964aa 100644 --- a/packages/aws-cdk-lib/aws-lambda/lib/event-source-mapping.ts +++ b/packages/aws-cdk-lib/aws-lambda/lib/event-source-mapping.ts @@ -81,6 +81,21 @@ export interface SourceAccessConfiguration { readonly uri: string; } +export interface ProvisionedPollerConfig { + /** + * The minimum number of pollers that should be provisioned. + * + * @default - 1 + */ + readonly minimumPollers?: number; + /** + * The maximum number of pollers that can be provisioned. + * + * @default - 200 + */ + readonly maximumPollers?: number; +} + export interface EventSourceMappingOptions { /** * The Amazon Resource Name (ARN) of the event source. Any record added to @@ -269,6 +284,14 @@ export interface EventSourceMappingOptions { * @default false */ readonly supportS3OnFailureDestination?: boolean; + + /** + * Configuration for provisioned pollers that read from the event source. + * When specified, allows control over the minimum and maximum number of pollers + * that can be provisioned to process events from the source. + * + */ + readonly provisionedPollerConfig?: ProvisionedPollerConfig; } /** @@ -356,6 +379,25 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp throw new Error('eventSourceArn and kafkaBootstrapServers are mutually exclusive'); } + if (props.provisionedPollerConfig) { + const { minimumPollers, maximumPollers } = props.provisionedPollerConfig; + if (minimumPollers != undefined) { + if (minimumPollers < 1 || minimumPollers > 200) { + throw new Error('Minimum provisioned pollers must be between 1 and 200 inclusive'); + } + } + if (maximumPollers != undefined) { + if (maximumPollers < 1 || maximumPollers > 2000) { + throw new Error('Maximum provisioned pollers must be between 1 and 2000 inclusive'); + } + } + if (minimumPollers != undefined && maximumPollers != undefined) { + if (minimumPollers > maximumPollers) { + throw new Error('Minimum provisioned pollers must be less than or equal to maximum provisioned pollers'); + } + } + } + if (props.kafkaBootstrapServers && (props.kafkaBootstrapServers?.length < 1)) { throw new Error('kafkaBootStrapServers must not be empty if set'); } @@ -455,6 +497,7 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp kmsKeyArn: props.filterEncryption?.keyArn, selfManagedKafkaEventSourceConfig: props.kafkaBootstrapServers ? consumerGroupConfig : undefined, amazonManagedKafkaEventSourceConfig: props.eventSourceArn ? consumerGroupConfig : undefined, + provisionedPollerConfig: props.provisionedPollerConfig, }); this.eventSourceMappingId = cfnEventSourceMapping.ref; this.eventSourceMappingArn = EventSourceMapping.formatArn(this, this.eventSourceMappingId); diff --git a/packages/aws-cdk-lib/aws-lambda/test/event-source-mapping.test.ts b/packages/aws-cdk-lib/aws-lambda/test/event-source-mapping.test.ts index 6eafc9267471d..d9f607af5e8c2 100644 --- a/packages/aws-cdk-lib/aws-lambda/test/event-source-mapping.test.ts +++ b/packages/aws-cdk-lib/aws-lambda/test/event-source-mapping.test.ts @@ -2,7 +2,7 @@ import { Match, Template } from '../../assertions'; import { Key } from '../../aws-kms'; import * as cdk from '../../core'; import * as lambda from '../lib'; -import { Code, EventSourceMapping, Function, Runtime, Alias, StartingPosition, FilterRule, FilterCriteria } from '../lib'; +import { Code, EventSourceMapping, Function, Alias, StartingPosition, FilterRule, FilterCriteria } from '../lib'; let stack: cdk.Stack; let fn: Function; @@ -492,4 +492,75 @@ describe('event source mapping', () => { startingPositionTimestamp: 1640995200, })).toThrow(/startingPositionTimestamp can only be used when startingPosition is AT_TIMESTAMP/); }); + + test('provisioned pollers is set', () => { + new EventSourceMapping(stack, 'test', { + target: fn, + eventSourceArn: '', + startingPosition: StartingPosition.AT_TIMESTAMP, + startingPositionTimestamp: 1640995200, + provisionedPollerConfig: { + minimumPollers: 1, + maximumPollers: 3, + }, + }); + Template.fromStack(stack).hasResourceProperties('AWS::Lambda::EventSourceMapping', { + StartingPosition: 'AT_TIMESTAMP', + StartingPositionTimestamp: 1640995200, + ProvisionedPollerConfig: { + MinimumPollers: 1, + MaximumPollers: 3, + }, + }); + }); + + test('minimum provisioned poller is out of limit', () => { + expect(() => new EventSourceMapping(stack, 'test', { + target: fn, + eventSourceArn: '', + startingPosition: StartingPosition.AT_TIMESTAMP, + startingPositionTimestamp: 1640995200, + provisionedPollerConfig: { + minimumPollers: 0, + }, + })).toThrow(/Minimum provisioned pollers must be between 1 and 200 inclusive/); + }); + + test('maximum provisioned poller is out of limit', () => { + expect(() => new EventSourceMapping(stack, 'test', { + target: fn, + eventSourceArn: '', + startingPosition: StartingPosition.AT_TIMESTAMP, + startingPositionTimestamp: 1640995200, + provisionedPollerConfig: { + maximumPollers: 2001, + }, + })).toThrow(/Maximum provisioned pollers must be between 1 and 2000 inclusive/); + }); + + test('only maximum provisioned poller is out of limit', () => { + expect(() => new EventSourceMapping(stack, 'test', { + target: fn, + eventSourceArn: '', + startingPosition: StartingPosition.AT_TIMESTAMP, + startingPositionTimestamp: 1640995200, + provisionedPollerConfig: { + minimumPollers: 1, + maximumPollers: 2001, + }, + })).toThrow(/Maximum provisioned pollers must be between 1 and 2000 inclusive/); + }); + + test('Minimum provisioned poller greater than maximum provisioned poller', () => { + expect(() => new EventSourceMapping(stack, 'test', { + target: fn, + eventSourceArn: '', + startingPosition: StartingPosition.AT_TIMESTAMP, + startingPositionTimestamp: 1640995200, + provisionedPollerConfig: { + minimumPollers: 3, + maximumPollers: 2, + }, + })).toThrow(/Minimum provisioned pollers must be less than or equal to maximum provisioned pollers/); + }); }); diff --git a/packages/aws-cdk-lib/awslint.json b/packages/aws-cdk-lib/awslint.json index ffc97a8a2609d..63de88a75eea6 100644 --- a/packages/aws-cdk-lib/awslint.json +++ b/packages/aws-cdk-lib/awslint.json @@ -384,6 +384,18 @@ "docs-public-apis:aws-cdk-lib.aws_lambda_event_sources.S3EventSource.bucket", "docs-public-apis:aws-cdk-lib.aws_lambda_event_sources.SnsEventSource.topic", "docs-public-apis:aws-cdk-lib.aws_lambda_event_sources.SqsEventSource.queue", + "docs-public-apis:aws-cdk-lib.aws_lambda.EventSourceMappingOptions.provisionedPollerConfig", + "props-default-doc:aws-cdk-lib.aws_lambda.EventSourceMappingOptions.provisionedPollerConfig", + "docs-public-apis:aws-cdk-lib.aws_lambda.ProvisionedPollerConfig", + "docs-public-apis:aws-cdk-lib.aws_lambda.ProvisionedPollerConfig.maximumPollers", + "docs-public-apis:aws-cdk-lib.aws_lambda.ProvisionedPollerConfig.minimumPollers", + "props-default-doc:aws-cdk-lib.aws_lambda.ProvisionedPollerConfig.maximumPollers", + "props-default-doc:aws-cdk-lib.aws_lambda.ProvisionedPollerConfig.minimumPollers", + "docs-public-apis:aws-cdk-lib.aws_lambda_event_sources.KafkaEventSourceProps.provisionedPollerConfig", + "props-default-doc:aws-cdk-lib.aws_lambda_event_sources.KafkaEventSourceProps.provisionedPollerConfig", + "docs-public-apis:aws-cdk-lib.aws_lambda_event_sources.ProvisionedPollerConfig", + "docs-public-apis:aws-cdk-lib.aws_lambda_event_sources.ProvisionedPollerConfig.maximumPollers", + "docs-public-apis:aws-cdk-lib.aws_lambda_event_sources.ProvisionedPollerConfig.minimumPollers", "docs-public-apis:aws-cdk-lib.aws_logs.CrossAccountDestination.addToPolicy", "docs-public-apis:aws-cdk-lib.aws_logs.DataIdentifier.*", "docs-public-apis:aws-cdk-lib.aws_logs.JsonPattern.jsonPatternString", diff --git a/tools/@aws-cdk/spec2cdk/temporary-schemas/us-east-1/aws-lambda-eventsourcemapping-pollers.json b/tools/@aws-cdk/spec2cdk/temporary-schemas/us-east-1/aws-lambda-eventsourcemapping-pollers.json new file mode 100644 index 0000000000000..819eab3c47241 --- /dev/null +++ b/tools/@aws-cdk/spec2cdk/temporary-schemas/us-east-1/aws-lambda-eventsourcemapping-pollers.json @@ -0,0 +1,506 @@ +{ + "typeName" : "AWS::Lambda::EventSourceMapping", + "description" : "Resource Type definition for AWS::Lambda::EventSourceMapping", + "additionalProperties" : false, + "properties" : { + "Id": { + "description": "Event Source Mapping Identifier UUID.", + "type": "string", + "pattern": "[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}", + "minLength": 36, + "maxLength": 36 + }, + "BatchSize": { + "description": "The maximum number of items to retrieve in a single batch.", + "type": "integer", + "minimum": 1, + "maximum": 10000 + }, + "BisectBatchOnFunctionError": { + "description": "(Streams) If the function returns an error, split the batch in two and retry.", + "type": "boolean" + }, + "DestinationConfig": { + "description": "(Kinesis, DynamoDB, Amazon MSK, and self-managed Kafka event sources only) A configuration object that specifies the destination of an event after Lambda processes it.", + "$ref": "#/definitions/DestinationConfig" + }, + "Enabled": { + "description": "Disables the event source mapping to pause polling and invocation.", + "type": "boolean" + }, + "EventSourceArn": { + "description": "The Amazon Resource Name (ARN) of the event source.", + "type": "string", + "pattern": "arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]{2}(-gov)?(-iso([a-z])?)?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)", + "minLength": 12, + "maxLength": 1024 + }, + "EventSourceMappingArn": { + "description": "The Amazon Resource Name (ARN) of the event source mapping resource.", + "type": "string", + "pattern": "arn:(aws[a-zA-Z-]*)?:lambda:[a-z]{2}((-gov)|(-iso([a-z]?)))?-[a-z]+-\\d{1}:\\d{12}:event-source-mapping:[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}", + "minLength": 85, + "maxLength": 120 + }, + "FilterCriteria": { + "description": "The filter criteria to control event filtering.", + "$ref": "#/definitions/FilterCriteria" + }, + "KmsKeyArn": { + "description": "The Amazon Resource Name (ARN) of the KMS key.", + "type": "string", + "pattern": "(arn:(aws[a-zA-Z-]*)?:[a-z0-9-.]+:.*)|()", + "minLength": 12, + "maxLength": 2048 + }, + "FunctionName": { + "description": "The name of the Lambda function.", + "type": "string", + "pattern": "(arn:(aws[a-zA-Z-]*)?:lambda:)?([a-z]{2}(-gov)?(-iso([a-z])?)?-[a-z]+-\\d{1}:)?(\\d{12}:)?(function:)?([a-zA-Z0-9-_]+)(:(\\$LATEST|[a-zA-Z0-9-_]+))?", + "minLength": 1, + "maxLength": 140 + }, + "MaximumBatchingWindowInSeconds": { + "description": "(Streams) The maximum amount of time to gather records before invoking the function, in seconds.", + "type": "integer", + "minimum": 0, + "maximum": 300 + }, + "MaximumRecordAgeInSeconds": { + "description": "(Streams) The maximum age of a record that Lambda sends to a function for processing.", + "type": "integer", + "minimum": -1, + "maximum": 604800 + }, + "MaximumRetryAttempts": { + "description": "(Streams) The maximum number of times to retry when the function returns an error.", + "type": "integer", + "minimum": -1, + "maximum": 10000 + }, + "ParallelizationFactor": { + "description": "(Streams) The number of batches to process from each shard concurrently.", + "type": "integer", + "minimum": 1, + "maximum": 10 + }, + "StartingPosition": { + "description": "The position in a stream from which to start reading. Required for Amazon Kinesis and Amazon DynamoDB Streams sources.", + "type": "string", + "pattern": "(LATEST|TRIM_HORIZON|AT_TIMESTAMP)+", + "minLength": 6, + "maxLength": 12 + }, + "StartingPositionTimestamp": { + "description": "With StartingPosition set to AT_TIMESTAMP, the time from which to start reading, in Unix time seconds.", + "type": "number" + }, + "Tags" : { + "description": "A list of tags to apply to event source mapping resource", + "type": "array", + "uniqueItems": true, + "insertionOrder": false, + "items": { + "$ref": "#/definitions/Tag" + } + }, + "Topics": { + "description": "(Kafka) A list of Kafka topics.", + "type": "array", + "uniqueItems": true, + "items": { + "type": "string", + "pattern": "^[^.]([a-zA-Z0-9\\-_.]+)", + "minLength": 1, + "maxLength": 249 + }, + "minItems": 1, + "maxItems": 1 + }, + "Queues": { + "description": "(ActiveMQ) A list of ActiveMQ queues.", + "type": "array", + "uniqueItems": true, + "items": { + "type": "string", + "pattern": "[\\s\\S]*", + "minLength": 1, + "maxLength": 1000 + }, + "minItems": 1, + "maxItems": 1 + }, + "SourceAccessConfigurations": { + "description": "A list of SourceAccessConfiguration.", + "type": "array", + "uniqueItems": true, + "items": { + "$ref": "#/definitions/SourceAccessConfiguration" + }, + "minItems": 1, + "maxItems": 22 + }, + "TumblingWindowInSeconds": { + "description": "(Streams) Tumbling window (non-overlapping time window) duration to perform aggregations.", + "type": "integer", + "minimum": 0, + "maximum": 900 + }, + "FunctionResponseTypes": { + "description": "(Streams) A list of response types supported by the function.", + "type": "array", + "uniqueItems": true, + "items": { + "type": "string", + "enum": [ + "ReportBatchItemFailures" + ] + }, + "minLength": 0, + "maxLength": 1 + }, + "SelfManagedEventSource": { + "description": "Self-managed event source endpoints.", + "$ref": "#/definitions/SelfManagedEventSource" + }, + "AmazonManagedKafkaEventSourceConfig": { + "description": "Specific configuration settings for an MSK event source.", + "$ref": "#/definitions/AmazonManagedKafkaEventSourceConfig" + }, + "SelfManagedKafkaEventSourceConfig": { + "description": "Specific configuration settings for a Self-Managed Apache Kafka event source.", + "$ref": "#/definitions/SelfManagedKafkaEventSourceConfig" + }, + "ScalingConfig": { + "description": "The scaling configuration for the event source.", + "$ref": "#/definitions/ScalingConfig" + }, + "DocumentDBEventSourceConfig": { + "description": "Document db event source config.", + "$ref": "#/definitions/DocumentDBEventSourceConfig" + }, + "ProvisionedPollerConfig": { + "description": "Event processor config.", + "$ref": "#/definitions/ProvisionedPollerConfig" + }, + "MetricsConfig": { + "description": "Metrics config for enhanced observability.", + "$ref": "#/definitions/MetricsConfig" + } + }, + "definitions" : { + "DestinationConfig" : { + "type" : "object", + "additionalProperties" : false, + "description": "A configuration object that specifies the destination of an event after Lambda processes it.", + "properties" : { + "OnFailure": { + "description": "A destination for records of invocations that failed processing.", + "$ref": "#/definitions/OnFailure" + } + } + }, + "FilterCriteria": { + "type": "object", + "description": "The filter criteria to control event filtering.", + "additionalProperties" : false, + "properties": { + "Filters": { + "description": "List of filters of this FilterCriteria", + "type": "array", + "uniqueItems": true, + "items": { + "$ref": "#/definitions/Filter" + }, + "minItems": 1, + "maxItems": 20 + } + } + }, + "Filter": { + "type": "object", + "description": "The filter object that defines parameters for ESM filtering.", + "additionalProperties" : false, + "properties": { + "Pattern": { + "type": "string", + "description": "The filter pattern that defines which events should be passed for invocations.", + "pattern": ".*", + "minLength": 0, + "maxLength": 4096 + } + } + }, + "OnFailure": { + "type" : "object", + "description" : "A destination for records of invocations that failed processing.", + "additionalProperties" : false, + "properties" : { + "Destination": { + "description": "The Amazon Resource Name (ARN) of the destination resource.", + "type": "string", + "pattern": "arn:(aws[a-zA-Z0-9-]*):([a-zA-Z0-9\\-])+:([a-z]{2}(-gov)?(-iso([a-z])?)?-[a-z]+-\\d{1})?:(\\d{12})?:(.*)", + "minLength": 12, + "maxLength": 1024 + } + } + }, + "SourceAccessConfiguration" : { + "type" : "object", + "additionalProperties" : false, + "description": "The configuration used by AWS Lambda to access event source", + "properties" : { + "Type" : { + "description": "The type of source access configuration.", + "enum": [ + "BASIC_AUTH", + "VPC_SUBNET", + "VPC_SECURITY_GROUP", + "SASL_SCRAM_512_AUTH", + "SASL_SCRAM_256_AUTH", + "VIRTUAL_HOST", + "CLIENT_CERTIFICATE_TLS_AUTH", + "SERVER_ROOT_CA_CERTIFICATE" + ], + "type": "string" + }, + "URI" : { + "description": "The URI for the source access configuration resource.", + "type": "string", + "pattern": "[a-zA-Z0-9-\\/*:_+=.@-]*", + "minLength": 1, + "maxLength": 200 + } + } + }, + "SelfManagedEventSource" : { + "type": "object", + "additionalProperties": false, + "description": "The configuration used by AWS Lambda to access a self-managed event source.", + "properties": { + "Endpoints": { + "description": "The endpoints for a self-managed event source.", + "$ref": "#/definitions/Endpoints" + } + } + }, + "Endpoints" : { + "type": "object", + "additionalProperties": false, + "description": "The endpoints used by AWS Lambda to access a self-managed event source.", + "properties": { + "KafkaBootstrapServers": { + "type": "array", + "description": "A list of Kafka server endpoints.", + "uniqueItems": true, + "items": { + "type": "string", + "description": "The URL of a Kafka server.", + "pattern": "^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]*[a-zA-Z0-9])\\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\\-]*[A-Za-z0-9]):[0-9]{1,5}", + "minLength": 1, + "maxLength": 300 + }, + "minItems": 1, + "maxItems": 10 + } + } + }, + "ConsumerGroupId": { + "description": "The identifier for the Kafka Consumer Group to join.", + "type": "string", + "pattern": "[a-zA-Z0-9-\\/*:_+=.@-]*", + "minLength": 1, + "maxLength": 200 + }, + "AmazonManagedKafkaEventSourceConfig": { + "description": "Specific configuration settings for an MSK event source.", + "type": "object", + "additionalProperties": false, + "properties": { + "ConsumerGroupId": { + "description": "The identifier for the Kafka Consumer Group to join.", + "$ref": "#/definitions/ConsumerGroupId" + } + } + }, + "SelfManagedKafkaEventSourceConfig": { + "description": "Specific configuration settings for a Self-Managed Apache Kafka event source.", + "type": "object", + "additionalProperties": false, + "properties": { + "ConsumerGroupId": { + "description": "The identifier for the Kafka Consumer Group to join.", + "$ref": "#/definitions/ConsumerGroupId" + } + } + }, + "MaximumConcurrency": { + "description": "The maximum number of concurrent functions that an event source can invoke.", + "type": "integer", + "minimum": 2, + "maximum": 1000 + }, + "ScalingConfig": { + "description": "The scaling configuration for the event source.", + "type": "object", + "additionalProperties": false, + "properties": { + "MaximumConcurrency": { + "description": "The maximum number of concurrent functions that the event source can invoke.", + "$ref": "#/definitions/MaximumConcurrency" + } + } + }, + "Tag" : { + "type" : "object", + "additionalProperties": false, + "properties" : { + "Key" : { + "type" : "string", + "description" : "The key name of the tag. You can specify a value that is 1 to 128 Unicode characters in length and cannot be prefixed with aws:. You can use any of the following characters: the set of Unicode letters, digits, whitespace, _, ., /, =, +, and -.", + "minLength" : 1, + "maxLength" : 128 + }, + "Value" : { + "type" : "string", + "description" : "The value for the tag. You can specify a value that is 0 to 256 Unicode characters in length and cannot be prefixed with aws:. You can use any of the following characters: the set of Unicode letters, digits, whitespace, _, ., /, =, +, and -.", + "minLength" : 0, + "maxLength" : 256 + } + }, + "required" : [ + "Key" + ] + }, + "DocumentDBEventSourceConfig": { + "description": "Document db event source config.", + "type": "object", + "additionalProperties": false, + "properties": { + "DatabaseName": { + "description": "The database name to connect to.", + "type": "string", + "minLength": 1, + "maxLength": 63 + }, + "CollectionName": { + "description": "The collection name to connect to.", + "type": "string", + "minLength": 1, + "maxLength": 57 + }, + "FullDocument": { + "description": "Include full document in change stream response. The default option will only send the changes made to documents to Lambda. If you want the complete document sent to Lambda, set this to UpdateLookup.", + "type": "string", + "enum": [ + "UpdateLookup", + "Default" + ] + } + } + }, + "ProvisionedPollerConfig": { + "description": "ProvisionedPollerConfig.", + "type": "object", + "additionalProperties": false, + "properties": { + "MinimumPollers": { + "description": "Minimum poller count.", + "type": "integer", + "minimum": 1, + "maximum": 3000 + }, + "MaximumPollers": { + "description": "Maximum poller count.", + "type": "integer", + "minimum": 1, + "maximum": 3000 + } + } + }, + "MetricsConfig": { + "description": "Metrics config for enhanced observability.", + "type": "object", + "additionalProperties": false, + "properties": { + "Metrics": { + "description": "Metric groups to enable.", + "type": "array", + "uniqueItems": true, + "items": { + "type": "string", + "enum": [ + "EventCount" + ] + }, + "minItems": 0, + "maxItems": 1 + } + } + } + }, + "required" : [ "FunctionName" ], + "createOnlyProperties" : [ + "/properties/EventSourceArn", + "/properties/StartingPosition", + "/properties/StartingPositionTimestamp", + "/properties/SelfManagedEventSource", + "/properties/AmazonManagedKafkaEventSourceConfig", + "/properties/SelfManagedKafkaEventSourceConfig" + ], + "readOnlyProperties" : [ "/properties/Id", "/properties/EventSourceMappingArn"], + "primaryIdentifier" : [ "/properties/Id" ], + "propertyTransform" : { + "/properties/StartingPositionTimestamp": "StartingPositionTimestamp * 1000" + }, + "handlers": { + "create": { + "permissions": [ + "lambda:CreateEventSourceMapping", + "lambda:GetEventSourceMapping", + "lambda:TagResource", + "kms:DescribeKey", + "kms:GenerateDataKey", + "kms:Decrypt" + ] + }, + "delete": { + "permissions": [ + "lambda:DeleteEventSourceMapping", + "lambda:GetEventSourceMapping", + "kms:Decrypt" + ] + }, + "list": { + "permissions": [ + "lambda:ListEventSourceMappings" + ] + }, + "read": { + "permissions": [ + "lambda:GetEventSourceMapping", + "lambda:ListTags", + "kms:Decrypt" + ] + }, + "update": { + "permissions": [ + "lambda:UpdateEventSourceMapping", + "lambda:GetEventSourceMapping", + "lambda:ListTags", + "lambda:TagResource", + "lambda:UntagResource", + "kms:DescribeKey", + "kms:GenerateDataKey", + "kms:Decrypt" + ] + } + }, + "tagging": { + "taggable": true, + "tagOnCreate": true, + "tagUpdatable": true, + "cloudFormationSystemTags": true, + "tagProperty": "/properties/Tags" + } +} \ No newline at end of file From e12de23a8f8deb01bfa9f0c2baf5bf239c23a749 Mon Sep 17 00:00:00 2001 From: Roger Zhang Date: Wed, 20 Nov 2024 04:21:43 +0000 Subject: [PATCH 2/4] lint --- packages/aws-cdk-lib/aws-lambda-event-sources/lib/stream.ts | 2 +- packages/aws-cdk-lib/aws-lambda/lib/event-source-mapping.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/lib/stream.ts b/packages/aws-cdk-lib/aws-lambda-event-sources/lib/stream.ts index 5ef96d9d6a06b..54ae7edfba20e 100644 --- a/packages/aws-cdk-lib/aws-lambda-event-sources/lib/stream.ts +++ b/packages/aws-cdk-lib/aws-lambda-event-sources/lib/stream.ts @@ -49,7 +49,7 @@ export interface BaseStreamEventSourceProps{ * Configuration for provisioned pollers that read from the event source. * When specified, allows control over the minimum and maximum number of pollers * that can be provisioned to process events from the source. - * + * @default - no provisioned pollers */ readonly provisionedPollerConfig?: ProvisionedPollerConfig; } diff --git a/packages/aws-cdk-lib/aws-lambda/lib/event-source-mapping.ts b/packages/aws-cdk-lib/aws-lambda/lib/event-source-mapping.ts index 9ce7a428964aa..b5ea21aeb60af 100644 --- a/packages/aws-cdk-lib/aws-lambda/lib/event-source-mapping.ts +++ b/packages/aws-cdk-lib/aws-lambda/lib/event-source-mapping.ts @@ -289,7 +289,7 @@ export interface EventSourceMappingOptions { * Configuration for provisioned pollers that read from the event source. * When specified, allows control over the minimum and maximum number of pollers * that can be provisioned to process events from the source. - * + * @default - no provisioned pollers */ readonly provisionedPollerConfig?: ProvisionedPollerConfig; } From 212e22956746e1e0d1da1ae27653fbc55a36ea92 Mon Sep 17 00:00:00 2001 From: GZ Date: Wed, 20 Nov 2024 14:54:18 -0800 Subject: [PATCH 3/4] Update packages/aws-cdk-lib/aws-lambda-event-sources/lib/stream.ts --- packages/aws-cdk-lib/aws-lambda-event-sources/lib/stream.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/lib/stream.ts b/packages/aws-cdk-lib/aws-lambda-event-sources/lib/stream.ts index 7ddc24693dd7a..9d74fd1dc6b76 100644 --- a/packages/aws-cdk-lib/aws-lambda-event-sources/lib/stream.ts +++ b/packages/aws-cdk-lib/aws-lambda-event-sources/lib/stream.ts @@ -58,7 +58,7 @@ export interface ProvisionedPollerConfig { /** * The minimum number of pollers that should be provisioned. * - * @default - 1 + * @default 1 */ readonly minimumPollers: number; /** From 7c29f6348949d426c9b785285f3943d6d11c6fb5 Mon Sep 17 00:00:00 2001 From: GZ Date: Wed, 20 Nov 2024 14:54:24 -0800 Subject: [PATCH 4/4] Update packages/aws-cdk-lib/aws-lambda-event-sources/lib/stream.ts --- packages/aws-cdk-lib/aws-lambda-event-sources/lib/stream.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/aws-cdk-lib/aws-lambda-event-sources/lib/stream.ts b/packages/aws-cdk-lib/aws-lambda-event-sources/lib/stream.ts index 9d74fd1dc6b76..f65cb6a9852a1 100644 --- a/packages/aws-cdk-lib/aws-lambda-event-sources/lib/stream.ts +++ b/packages/aws-cdk-lib/aws-lambda-event-sources/lib/stream.ts @@ -64,7 +64,7 @@ export interface ProvisionedPollerConfig { /** * The maximum number of pollers that can be provisioned. * - * @default - 200 + * @default 200 */ readonly maximumPollers: number; }