Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

fix(aws): get firewall manager managed rule groups #6124

Merged
merged 1 commit into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions prowler/providers/aws/services/wafv2/wafv2_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,22 @@ def _get_web_acl(self, acl: str):
else:
acl.rules.append(new_rule)

firewall_manager_managed_rg = get_web_acl.get("WebACL", {}).get(
"PreProcessFirewallManagerRuleGroups", []
) + get_web_acl.get("WebACL", {}).get(
"PostProcessFirewallManagerRuleGroups", []
)

for rule in firewall_manager_managed_rg:
acl.rule_groups.append(
Rule(
name=rule.get("Name", ""),
cloudwatch_metrics_enabled=rule.get(
"VisibilityConfig", {}
).get("CloudWatchMetricsEnabled", False),
)
)

except Exception as error:
logger.error(
f"{acl.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
Expand Down Expand Up @@ -193,13 +209,6 @@ class Rule(BaseModel):
cloudwatch_metrics_enabled: bool = False


class FirewallManagerRuleGroup(BaseModel):
"""Model representing a rule group for the Web ACL."""

name: str
cloudwatch_metrics_enabled: bool = False


class WebAclv2(BaseModel):
"""Model representing a Web ACL for WAFv2."""

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,61 @@
from unittest import mock
from unittest.mock import patch

import botocore
from boto3 import client
from moto import mock_aws

from tests.providers.aws.utils import AWS_REGION_US_EAST_1, set_mocked_aws_provider

# Original botocore _make_api_call function
orig = botocore.client.BaseClient._make_api_call

FM_RG_NAME = "test-firewall-managed-rule-group"
FM_RG_ARN = "arn:aws:wafv2:us-east-1:123456789012:regional/webacl/test-firewall-managed-rule-group"


# Mocked botocore _make_api_call function
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "ListWebACLs":
return {
"WebACLs": [
{
"Name": FM_RG_NAME,
"Id": FM_RG_NAME,
"ARN": FM_RG_ARN,
}
]
}
elif operation_name == "GetWebACL":
return {
"WebACL": {
"PostProcessFirewallManagerRuleGroups": [
{
"Name": FM_RG_NAME,
"VisibilityConfig": {
"SampledRequestsEnabled": True,
"CloudWatchMetricsEnabled": True,
"MetricName": "web-acl-test-metric",
},
}
]
}
}
elif operation_name == "ListResourcesForWebACL":
return {
"ResourceArns": [
FM_RG_ARN,
]
}
elif operation_name == "ListTagsForResource":
return {
"TagInfoForResource": {
"ResourceARN": FM_RG_ARN,
"TagList": [{"Key": "Name", "Value": FM_RG_NAME}],
}
}
return orig(self, operation_name, kwarg)


class Test_wafv2_webacl_with_rules:
@mock_aws
Expand All @@ -13,12 +64,15 @@ def test_no_web_acls(self):

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.wafv2.wafv2_webacl_with_rules.wafv2_webacl_with_rules.wafv2_client",
new=WAFv2(aws_provider),
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.wafv2.wafv2_webacl_with_rules.wafv2_webacl_with_rules.wafv2_client",
new=WAFv2(aws_provider),
),
):
from prowler.providers.aws.services.wafv2.wafv2_webacl_with_rules.wafv2_webacl_with_rules import (
wafv2_webacl_with_rules,
Expand Down Expand Up @@ -69,12 +123,15 @@ def test_wafv2_web_acl_with_rule(self):

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.wafv2.wafv2_webacl_with_rules.wafv2_webacl_with_rules.wafv2_client",
new=WAFv2(aws_provider),
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.wafv2.wafv2_webacl_with_rules.wafv2_webacl_with_rules.wafv2_client",
new=WAFv2(aws_provider),
),
):
from prowler.providers.aws.services.wafv2.wafv2_webacl_with_rules.wafv2_webacl_with_rules import (
wafv2_webacl_with_rules,
Expand Down Expand Up @@ -137,12 +194,15 @@ def test_wafv2_web_acl_with_rule_group(self):

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.wafv2.wafv2_webacl_with_rules.wafv2_webacl_with_rules.wafv2_client",
new=WAFv2(aws_provider),
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.wafv2.wafv2_webacl_with_rules.wafv2_webacl_with_rules.wafv2_client",
new=WAFv2(aws_provider),
),
):
from prowler.providers.aws.services.wafv2.wafv2_webacl_with_rules.wafv2_webacl_with_rules import (
wafv2_webacl_with_rules,
Expand All @@ -161,6 +221,43 @@ def test_wafv2_web_acl_with_rule_group(self):
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == [{"Key": "Name", "Value": waf_name}]

@patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call,
)
@mock_aws
def test_wafv2_web_acl_with_firewall_manager_managed_rule_group(self):
from prowler.providers.aws.services.wafv2.wafv2_service import WAFv2

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.wafv2.wafv2_webacl_with_rules.wafv2_webacl_with_rules.wafv2_client",
new=WAFv2(aws_provider),
),
):
from prowler.providers.aws.services.wafv2.wafv2_webacl_with_rules.wafv2_webacl_with_rules import (
wafv2_webacl_with_rules,
)

check = wafv2_webacl_with_rules()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"AWS WAFv2 Web ACL {FM_RG_NAME} does have rules or rule groups attached."
)
assert result[0].resource_id == FM_RG_NAME
assert result[0].resource_arn == FM_RG_ARN
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == [{"Key": "Name", "Value": FM_RG_NAME}]

@mock_aws
def test_wafv2_web_acl_without_rule_or_rule_group(self):
wafv2_client = client("wafv2", region_name=AWS_REGION_US_EAST_1)
Expand All @@ -184,12 +281,15 @@ def test_wafv2_web_acl_without_rule_or_rule_group(self):

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.wafv2.wafv2_webacl_with_rules.wafv2_webacl_with_rules.wafv2_client",
new=WAFv2(aws_provider),
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.wafv2.wafv2_webacl_with_rules.wafv2_webacl_with_rules.wafv2_client",
new=WAFv2(aws_provider),
),
):
from prowler.providers.aws.services.wafv2.wafv2_webacl_with_rules.wafv2_webacl_with_rules import (
wafv2_webacl_with_rules,
Expand Down
Loading