This repository was archived by the owner on Jan 12, 2024. It is now read-only.

Merge pull request #19 from Nike-Inc/athena-log-processor
* log processor lambda is no longer triggered by S3 events
* log processor lambda is now triggered by a scheduled event (every 5 minutes)
* log processor lambda no longer reads log files
* log processor lambda now queries Athena for ALB events in the last hour
* log processor lambda now rate limits IPs based on per-hour activity instead of per-minute activity
* log processor lambda now supports a customizable interval
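The per-interval rate limiting described in the bullets above can be sketched roughly as follows. This is an illustrative sketch only: the class and method names are assumptions for this example, not the actual lambda code.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative sketch only: names and shapes here are assumptions,
// not the actual cerberus-log-processor-lambda implementation.
public class RateLimitSketch {

    /**
     * Given per-IP request counts for one interval (e.g. the rows of an
     * Athena GROUP BY query over the last hour), return the IPs that
     * exceeded the limit and should go on the WAF auto-block IP set.
     */
    public static List<String> ipsOverLimit(Map<String, Long> countsByIp,
                                            long requestsPerIntervalLimit) {
        return countsByIp.entrySet().stream()
                .filter(e -> e.getValue() > requestsPerIntervalLimit)
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }
}
```

Because the count is aggregated by Athena over a configurable window, the same function works for per-hour or any other interval; only the query window and the limit change.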
mayitbeegh authored Apr 4, 2018
2 parents b463d9e + fed3185 commit 83acc52
Showing 13 changed files with 511 additions and 332 deletions.
6 changes: 3 additions & 3 deletions cerberus-log-processor-lambda/README.md
Original file line number Diff line number Diff line change
@@ -3,19 +3,19 @@
This is a JVM based lambda for processing access log events. It is part of the [edge security](http://engineering.nike.com/cerberus/docs/architecture/infrastructure-overview)
solution for [Cerberus](http://engineering.nike.com/cerberus/).

ALBAccessLogEventHandler::handleNewS3Event() gets triggered every time the ALB saves a log to S3.
ALBAccessLogEventHandler::handleScheduledEvent() gets triggered every 5 minutes.
ALBAccessLogEventHandler has a list of processors that can ingest the events and do various things like rate limiting.

To learn more about Cerberus, please see the [Cerberus website](http://engineering.nike.com/cerberus/).

## Processors

### Rate Limiting Processor
This processor will go through access log events and ensures that ips that show up more than the requests per minute limit are added to the auto block list for the Cerberus Env WAF
This processor queries Athena and ensures that IPs that exceed the requests-per-interval limit are added to the auto-block list for the Cerberus Env WAF.
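The Athena query behind this processor might look roughly like the sketch below. The table/column names (`client_ip`, `time`) and the timestamp comparison are assumptions about the ALB access-log Athena schema, not the lambda's actual query.

```java
// Illustrative only: column names and timestamp handling are assumptions
// about the ALB access-log schema, not the lambda's real query.
public class AthenaQuerySketch {

    /** Build a query counting requests per client IP over the last interval. */
    public static String buildRateLimitQuery(String database, String table,
                                             int intervalInMins) {
        return String.format(
                "SELECT client_ip, count(*) AS request_count "
              + "FROM %s.%s "
              + "WHERE time > to_iso8601(now() - interval '%d' minute) "
              + "GROUP BY client_ip",
                database, table, intervalInMins);
    }
}
```

The processor would then compare each `request_count` against the configured requests-per-interval limit and update the WAF auto-block IP set accordingly.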

### Future Processors
We would like to have a processor for auto-blocking IPs that spam bad requests.

## Building

To build the fat jar required for Lambda run `./gradlew cerberus-log-processor-lambda:sJ cerberus-log-processor-lambda:deploySam -Penv=[ENVIRONMENT]`
To build and deploy the fat jar required for Lambda, run `./gradlew cerberus-log-processor-lambda:sJ cerberus-log-processor-lambda:deploySam -Penv=[ENVIRONMENT]`
58 changes: 45 additions & 13 deletions cerberus-log-processor-lambda/application.yaml
@@ -24,9 +24,12 @@ Parameters:
RateLimitAutoBlacklistIpSetId:
Type: String
Description: The ID of the Cerberus manual auto-blacklist IP set.
RequestPerMinuteLimit:
RequestPerIntervalLimit:
Type: Number
Description: The number of requests per minute allowed per IP address
Description: The number of requests per interval allowed per IP address
IntervalInMins:
Type: Number
Description: The length of the rate-limiting interval, in minutes
ViolationBlacklistDurationInMins:
Type: Number
Description: The number of minutes to blacklist an IP address for after it has violated the rate limit
@@ -36,18 +39,38 @@ Parameters:
SlackWebHookUrl:
Type: String
Description: The Slack HTTP endpoint to which the Lambda will post rate limit notifications
AthenaDatabaseName:
Type: String
Description: The name of the Athena database for the ALB logs
AthenaTableName:
Type: String
Description: The name of the Athena table for the ALB logs
AthenaQueryResultBucketName:
Type: String
Description: The name of the S3 bucket for Athena query results
AlbLogBucket:
Type: String
Description: The name of the ALB log S3 bucket
AlbLogBucketArn:
Type: String
Description: The ARN of the ALB log S3 bucket
Resources:
LambdaLogProcessorFunction:
Type: AWS::Serverless::Function
Properties:
Description: Function for auto-blacklisting IPs that are misbehaving
CodeUri: @@CODE_URI@@
Handler: >-
com.nike.cerberus.lambda.waf.handler.ALBAccessLogEventHandler::handleNewS3Event
com.nike.cerberus.lambda.waf.handler.ALBAccessLogEventHandler::handleScheduledEvent
Runtime: java8
MemorySize: 512
Timeout: 60
Role: !GetAtt LogProcessorLambdaRole.Arn
Events:
Timer:
Type: Schedule
Properties:
Schedule: rate(5 minutes)
Environment:
Variables:
IAM_PRINCIPAL_ARN: !GetAtt LogProcessorLambdaRole.Arn
@@ -57,19 +80,17 @@ Resources:
MANUAL_BLACKLIST_IP_SET_ID: !Ref ManualBlacklistIpSetId
MANUAL_WHITELIST_IP_SET_ID: !Ref ManualWhitelistIpSetId
RATE_LIMIT_AUTO_BLACKLIST_IP_SET_ID: !Ref RateLimitAutoBlacklistIpSetId
REQUEST_PER_MIN_LIMIT: !Ref RequestPerMinuteLimit
REQUEST_PER_INTERVAL_LIMIT: !Ref RequestPerIntervalLimit
INTERVAL_IN_MINS: !Ref IntervalInMins
VIOLATION_BLACKLIST_DURATION_IN_MINS: !Ref ViolationBlacklistDurationInMins
SLACK_ICON: !Ref SlackIcon
SLACK_WEB_HOOK_URL: !Ref SlackWebHookUrl
ATHENA_DATABASE_NAME: !Ref AthenaDatabaseName
ATHENA_TABLE_NAME: !Ref AthenaTableName
ATHENA_QUERY_RESULT_BUCKET_NAME: !Ref AthenaQueryResultBucketName
ALB_LOG_BUCKET: !Ref AlbLogBucket

LambdaInvokePermission:
Type: AWS::Lambda::Permission
DependsOn: LambdaLogProcessorFunction
Properties:
Action: lambda:*
FunctionName: !GetAtt 'LambdaLogProcessorFunction.Arn'
Principal: s3.amazonaws.com
SourceAccount: !Ref 'AWS::AccountId'

LogProcessorLambdaRole:
Type: AWS::IAM::Role
@@ -84,6 +105,8 @@ Resources:
- lambda.amazonaws.com
Version: '2012-10-17'
Path: /
ManagedPolicyArns:
- "arn:aws:iam::aws:policy/AmazonAthenaFullAccess"
Policies:
- PolicyDocument:
Statement:
@@ -127,4 +150,13 @@ Resources:
- cloudwatch:PutMetricData
Effect: Allow
Resource: '*'
PolicyName: CloudWatchAccess
PolicyName: CloudWatchAccess
- PolicyDocument:
Statement:
- Action:
- s3:*
Effect: Allow
Resource:
- !Ref AlbLogBucketArn
- 'arn:aws:s3:::*/*'
PolicyName: S3BucketAccess
33 changes: 10 additions & 23 deletions cerberus-log-processor-lambda/build.gradle
@@ -1,4 +1,4 @@
version '2.0.0'
version '3.0.0'

import com.amazonaws.auth.policy.Policy
import com.amazonaws.auth.policy.Principal
@@ -18,10 +18,6 @@ import com.amazonaws.services.cloudformation.AmazonCloudFormationClientBuilder
import com.amazonaws.services.cloudformation.model.DescribeStacksRequest
import com.amazonaws.services.cloudformation.model.Output
import com.amazonaws.services.cloudformation.model.Stack
import com.amazonaws.waiters.FixedDelayStrategy
import com.amazonaws.waiters.MaxAttemptsRetryStrategy
import com.amazonaws.waiters.PollingStrategy
import com.amazonaws.waiters.WaiterParameters

buildscript {
apply from: file(project.getRootProject().getRootDir().getPath() + File.separator + "gradle/buildscript.gradle"), to: buildscript
@@ -42,6 +38,7 @@ dependencies {
compile group: 'com.fieldju', name: 'slack-client', version: '2.0.0'
compile group: 'com.brsanthu', name: 'google-analytics-java', version: '1.1.2'
compile group: 'com.amazonaws', name: 'aws-java-sdk-s3', version: '1.11.272'
compile group: 'com.amazonaws', name: 'aws-java-sdk-athena', version: '1.11.301'

testCompile group: 'junit', name: 'junit', version: '4.12'
testCompile group: 'org.mockito', name: 'mockito-all', version: '1.9.5'
@@ -65,12 +62,18 @@ shadowJar {
CerberusUrl: getProfileProperty('cerberus.url'),
CerberusEnvironment: "${project.environment}",
ViolationBlacklistDurationInMins: getProfileProperty('log_processor.violation_blacklist_duration_in_mins'),
RequestPerMinuteLimit: getProfileProperty('log_processor.request_per_minute_limit'),
RequestPerIntervalLimit: getProfileProperty('log_processor.request_per_interval_limit'),
IntervalInMins: getProfileProperty('log_processor.interval_in_mins'),
SlackWebHookUrl: getProfileProperty('log_processor.slack_web_hook_url'),
SlackIcon: getProfileProperty('log_processor.slack_cerberus_icon'),
ManualBlacklistIpSetId: getProfileProperty('log_processor.manual_blacklist_ip_set_id'),
ManualWhitelistIpSetId: getProfileProperty('log_processor.manual_whitelist_ip_set_id'),
RateLimitAutoBlacklistIpSetId: getProfileProperty('log_processor.auto_blacklist_ip_set_id')
RateLimitAutoBlacklistIpSetId: getProfileProperty('log_processor.auto_blacklist_ip_set_id'),
AthenaDatabaseName: getProfileProperty('log_processor.athena_database_name'),
AthenaTableName: getProfileProperty('log_processor.athena_table_name'),
AthenaQueryResultBucketName: getProfileProperty('log_processor.athena_query_result_bucket_name'),
AlbLogBucket: getProfileProperty('log_processor.alb_log_bucket'),
AlbLogBucketArn: "arn:aws:s3:::" + getProfileProperty('log_processor.alb_log_bucket')
]
logStackOutputs = true
}
@@ -98,7 +101,6 @@ task associateLambdaWithS3Bucket() {
.withRegion(region)
.build()

addLambdaEventToS3Bucket(s3Client, albLogBucketName, lambdaFunctionArn)
addLambdaPermissionToS3Bucket(s3Client, albLogBucketName, lambdaIamRoleArn)
}
}
@@ -110,21 +112,6 @@ Stack getCloudFormationStack(AmazonCloudFormation cloudFormationClient, String s
.get(0)
}

void addLambdaEventToS3Bucket(AmazonS3 s3Client, String s3BucketName, String lambdaFunctionArn) {
FilterRule s3KeyFilterRule = S3KeyFilter.FilterRuleName.Suffix.newRule(".log.gz")
S3KeyFilter s3KeyFilter = new S3KeyFilter().withFilterRules(s3KeyFilterRule)
Filter eventFilter = new Filter().withS3KeyFilter(s3KeyFilter)
LambdaConfiguration lambdaConfiguration = new LambdaConfiguration(lambdaFunctionArn, EnumSet.of(S3Event.ObjectCreated))
lambdaConfiguration.setFilter(eventFilter)

BucketNotificationConfiguration bucketNotificationConfiguration = s3Client.getBucketNotificationConfiguration(s3BucketName)

bucketNotificationConfiguration.addConfiguration("logProcessorNotificationConfig", lambdaConfiguration)

s3Client.setBucketNotificationConfiguration(s3BucketName, bucketNotificationConfiguration)
println "Added lambda notification to S3 bucket: ${s3BucketName}"
}

void addLambdaPermissionToS3Bucket(AmazonS3 s3Client, String s3BucketName, String lambdaIamRoleArn) {
BucketPolicy bucketPolicy = s3Client.getBucketPolicy(s3BucketName)
Policy policy = Policy.fromJson(bucketPolicy.getPolicyText())
@@ -1,8 +1,9 @@
import com.amazonaws.regions.Regions;
import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.event.S3EventNotification;
import com.amazonaws.services.waf.AWSWAFRegionalAsyncClientBuilder;
import com.amazonaws.services.waf.AWSWAFRegionalClient;
import com.fieldju.commons.EnvUtils;
import com.fieldju.commons.PropUtils;
import com.google.common.collect.Lists;
import com.nike.cerberus.lambda.waf.LogProcessorLambdaConfig;
@@ -39,6 +40,11 @@ public void before() throws IOException {
String manualBlacklistIpSetId = PropUtils.getRequiredProperty("BLACKLIST_IP_SET_ID");
String manualWhitelistIpSetId = PropUtils.getRequiredProperty("WHITELIST_IP_SET_ID");
String rateLimitAutoBlacklistIpSetId = PropUtils.getRequiredProperty("RATE_LIMIT_IP_SET_ID");
String athenaDatabaseName = EnvUtils.getRequiredEnv("ATHENA_DATABASE_NAME");
String athenaTableName = EnvUtils.getRequiredEnv("ATHENA_TABLE_NAME");
String albLogBucketName = EnvUtils.getRequiredEnv("ALB_LOG_BUCKET");
String athenaQueryResultBucketName = EnvUtils.getRequiredEnv("ATHENA_QUERY_RESULT_BUCKET_NAME");
String iamPrincipalArn = EnvUtils.getRequiredEnv("IAM_PRINCIPAL_ARN");

handler = new ALBAccessLogEventHandler(
new AmazonS3Client(),
@@ -51,7 +57,13 @@ public void before() throws IOException {
60,
100,
null,
null));
null,
athenaDatabaseName,
athenaTableName,
athenaQueryResultBucketName,
albLogBucketName,
iamPrincipalArn,
Regions.fromName(region)));

List<S3EventNotification.S3EventNotificationRecord> records = Lists.newArrayList();
S3EventNotification.S3EventNotificationRecord record = mock(S3EventNotification.S3EventNotificationRecord.class);
@@ -75,7 +87,7 @@ public void before() throws IOException {
public void endToEndTest() throws IOException {
// this test doesn't actually assert anything, but it allows me to trigger the lambda in a
// controlled way and verify things manually, and attach a debugger to real running code
handler.handleNewS3Event(event);
handler.handleScheduledEvent();
}

}
