Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

feat(pipes-targets): add lambda function #30271

Merged
merged 2 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 51 additions & 2 deletions packages/@aws-cdk/aws-pipes-targets-alpha/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ Pipe targets are the end point of a EventBridge Pipe.
The following targets are supported:

1. `targets.SqsTarget`: [Send event source to a Queue](#amazon-sqs)
2. `targets.SfnStateMachine`: [Invoke a State Machine from an event source](#aws-step-functions)
2. `targets.SfnStateMachine`: [Invoke a State Machine from an event source](#aws-step-functions-state-machine)
3. `targets.LambdaFunction`: [Send event source to a Lambda Function](#aws-lambda-function)

### Amazon SQS

Expand Down Expand Up @@ -71,7 +72,7 @@ const pipe = new pipes.Pipe(this, 'Pipe', {

### AWS Step Functions State Machine

A State Machine can be used as a target for a pipe. The State Machine will be invoked with the (enriched/filtered) source payload.
A State Machine can be used as a target for a pipe. The State Machine will be invoked with the (enriched) source payload.

```ts
declare const sourceQueue: sqs.Queue;
Expand Down Expand Up @@ -122,3 +123,51 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
target: pipeTarget
});
```

### AWS Lambda Function

A Lambda Function can be used as a target for a pipe. The Lambda Function will be invoked with the (enriched) source payload.

```ts
declare const sourceQueue: sqs.Queue;
declare const targetFunction: lambda.IFunction;

const pipeTarget = new targets.LambdaFunction(targetFunction,{});

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SomeSource(sourceQueue),
target: pipeTarget
});
```

The target Lambda Function is invoked synchronously by default. You can also choose to invoke the Lambda Function asynchronously by setting `invocationType` property to `FIRE_AND_FORGET`.

```ts
declare const sourceQueue: sqs.Queue;
declare const targetFunction: lambda.IFunction;

const pipeTarget = new targets.LambdaFunction(targetFunction, {
invocationType: targets.LambdaFunctionInvocationType.FIRE_AND_FORGET,
});

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SomeSource(sourceQueue),
target: pipeTarget
});
```

The input to the target Lambda Function can be transformed:

```ts
declare const sourceQueue: sqs.Queue;
declare const targetFunction: lambda.IFunction;

const pipeTarget = new targets.LambdaFunction(targetFunction, {
inputTransformation: pipes.InputTransformation.fromObject({ body: "👀" }),
});

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SomeSource(sourceQueue),
target: pipeTarget
});
```
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-pipes-targets-alpha/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './lambda';
export * from './sqs';
export * from './stepfunctions';
79 changes: 79 additions & 0 deletions packages/@aws-cdk/aws-pipes-targets-alpha/lib/lambda.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { IInputTransformation, IPipe, ITarget, TargetConfig } from '@aws-cdk/aws-pipes-alpha';
import { IRole } from 'aws-cdk-lib/aws-iam';
import * as lambda from 'aws-cdk-lib/aws-lambda';

/**
* Parameters for the LambdaFunction target
*/
export interface LambdaFunctionParameters {

/**
* The input transformation to apply to the message before sending it to the target.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetparameters.html#cfn-pipes-pipe-pipetargetparameters-inputtemplate
* @default none
*/
readonly inputTransformation?: IInputTransformation;

/**
* Specify whether to invoke the Lambda Function synchronously (`REQUEST_RESPONSE`) or asynchronously (`FIRE_AND_FORGET`).
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetlambdafunctionparameters.html
* @default LambdaFunctionInvocationType.REQUEST_RESPONSE
*/
readonly invocationType?: LambdaFunctionInvocationType;
}

/**
* InvocationType for invoking the Lambda Function.
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetlambdafunctionparameters.html
*/
export enum LambdaFunctionInvocationType {
/**
* Invoke Lambda Function asynchronously (`Invoke`). `InvocationType` is set to `Event` on `Invoke`, see https://docs.aws.amazon.com/lambda/latest/api/API_Invoke.html for more details.
*/
FIRE_AND_FORGET = 'FIRE_AND_FORGET',

/**
* Invoke Lambda Function synchronously (`Invoke`) and wait for the response. `InvocationType` is set to `RequestResponse` on `Invoke`, see https://docs.aws.amazon.com/lambda/latest/api/API_Invoke.html for more details.
*/
REQUEST_RESPONSE = 'REQUEST_RESPONSE',
}

/**
* An EventBridge Pipes target that sends messages to an AWS Lambda Function.
*/
export class LambdaFunction implements ITarget {
public readonly targetArn: string;

private readonly lambdaFunction: lambda.IFunction;
private readonly invocationType: LambdaFunctionInvocationType;
private readonly inputTemplate?: IInputTransformation;

constructor(
lambdaFunction: lambda.IFunction,
parameters: LambdaFunctionParameters,
) {
this.lambdaFunction = lambdaFunction;
this.targetArn = lambdaFunction.functionArn;
this.invocationType =
parameters.invocationType ??
LambdaFunctionInvocationType.REQUEST_RESPONSE;
this.inputTemplate = parameters.inputTransformation;
}

grantPush(grantee: IRole): void {
this.lambdaFunction.grantInvoke(grantee);
}

bind(pipe: IPipe): TargetConfig {
return {
targetParameters: {
inputTemplate: this.inputTemplate?.bind(pipe).inputTemplate,
lambdaFunctionParameters: {
invocationType: this.invocationType,
},
},
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import * as cdk from 'aws-cdk-lib';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import { Construct } from 'constructs';
import * as pipes from '@aws-cdk/aws-pipes-alpha';
import * as targets from '@aws-cdk/aws-pipes-targets-alpha';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`lambda-function should grant lambda function invoke 1`] = `
{
"MyLambdaPipeRoleEF32F0E5": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "pipes.amazonaws.com",
},
},
],
"Version": "2012-10-17",
},
},
"Type": "AWS::IAM::Role",
},
}
`;
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// eslint-disable-next-line import/no-extraneous-dependencies
import { LambdaClient, TagResourceCommand } from '@aws-sdk/client-lambda';

exports.handler = async (event: any, context: any) => {
const client = new LambdaClient();
const input = {
Resource: context.invokedFunctionArn,
Tags: {
Identifier: event[0].body, // event is received in batches, we just take the first message to update the tag. See https://docs.aws.amazon.com/eventbridge/latest/userguide/pipes-targets-specifics.html
},
};
const command = new TagResourceCommand(input);
await client.send(command);
};

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading