-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(neptune): add new fixer
neptune_cluster_public_snapshot_fixer
(#…
- Loading branch information
1 parent
26a9748
commit fe2dd69
Showing
2 changed files
with
132 additions
and
0 deletions.
There are no files selected for viewing
43 changes: 43 additions & 0 deletions
43
...services/neptune/neptune_cluster_public_snapshot/neptune_cluster_public_snapshot_fixer.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
from prowler.lib.logger import logger | ||
from prowler.providers.aws.services.neptune.neptune_client import neptune_client | ||
|
||
|
||
def fixer(resource_id: str, region: str) -> bool: | ||
""" | ||
Modify the attributes of a Neptune DB cluster snapshot to remove public access. | ||
Specifically, this fixer removes the 'all' value from the 'restore' attribute to | ||
prevent the snapshot from being publicly accessible. | ||
Requires the rds:ModifyDBClusterSnapshotAttribute permissions. | ||
{ | ||
"Version": "2012-10-17", | ||
"Statement": [ | ||
{ | ||
"Effect": "Allow", | ||
"Action": "rds:ModifyDBClusterSnapshotAttribute", | ||
"Resource": "*" | ||
} | ||
] | ||
} | ||
Args: | ||
resource_id (str): The DB cluster snapshot identifier. | ||
region (str): AWS region where the snapshot exists. | ||
Returns: | ||
bool: True if the operation is successful (public access is removed), False otherwise. | ||
""" | ||
try: | ||
regional_client = neptune_client.regional_clients[region] | ||
regional_client.modify_db_cluster_snapshot_attribute( | ||
DBClusterSnapshotIdentifier=resource_id, | ||
AttributeName="restore", | ||
ValuesToRemove=["all"], | ||
) | ||
except Exception as error: | ||
logger.error( | ||
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" | ||
) | ||
return False | ||
else: | ||
return True |
89 changes: 89 additions & 0 deletions
89
...ces/neptune/neptune_cluster_public_snapshot/neptune_cluster_public_snapshot_fixer_test.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
from unittest import mock | ||
|
||
import botocore | ||
import botocore.client | ||
from moto import mock_aws | ||
|
||
from tests.providers.aws.utils import AWS_REGION_EU_WEST_1, set_mocked_aws_provider | ||
|
||
mock_make_api_call = botocore.client.BaseClient._make_api_call | ||
|
||
|
||
def mock_make_api_call_public_snapshot(self, operation_name, kwarg): | ||
if operation_name == "ModifyDBClusterSnapshotAttribute": | ||
return { | ||
"DBClusterSnapshotAttributesResult": { | ||
"DBClusterSnapshotAttributes": [ | ||
{ | ||
"AttributeName": "restore", | ||
"DBClusterSnapshotIdentifier": "test-snapshot", | ||
"AttributeValues": [], | ||
} | ||
] | ||
} | ||
} | ||
return mock_make_api_call(self, operation_name, kwarg) | ||
|
||
|
||
def mock_make_api_call_public_snapshot_error(self, operation_name, kwarg): | ||
if operation_name == "ModifyDBClusterSnapshotAttribute": | ||
raise botocore.exceptions.ClientError( | ||
{ | ||
"Error": { | ||
"Code": "DBClusterSnapshotNotFoundFault", | ||
"Message": "DBClusterSnapshotNotFoundFault", | ||
} | ||
}, | ||
operation_name, | ||
) | ||
return mock_make_api_call(self, operation_name, kwarg) | ||
|
||
|
||
class Test_neptune_cluster_public_snapshot_fixer: | ||
@mock_aws | ||
def test_neptune_cluster_public_snapshot_fixer(self): | ||
with mock.patch( | ||
"botocore.client.BaseClient._make_api_call", | ||
new=mock_make_api_call_public_snapshot, | ||
): | ||
from prowler.providers.aws.services.neptune.neptune_service import Neptune | ||
|
||
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1]) | ||
|
||
with mock.patch( | ||
"prowler.providers.common.provider.Provider.get_global_provider", | ||
return_value=aws_provider, | ||
), mock.patch( | ||
"prowler.providers.aws.services.neptune.neptune_cluster_public_snapshot.neptune_cluster_public_snapshot_fixer.neptune_client", | ||
new=Neptune(aws_provider), | ||
): | ||
from prowler.providers.aws.services.neptune.neptune_cluster_public_snapshot.neptune_cluster_public_snapshot_fixer import ( | ||
fixer, | ||
) | ||
|
||
assert fixer(resource_id="test-snapshot", region=AWS_REGION_EU_WEST_1) | ||
|
||
@mock_aws | ||
def test_neptune_cluster_public_snapshot_fixer_error(self): | ||
with mock.patch( | ||
"botocore.client.BaseClient._make_api_call", | ||
new=mock_make_api_call_public_snapshot_error, | ||
): | ||
from prowler.providers.aws.services.neptune.neptune_service import Neptune | ||
|
||
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1]) | ||
|
||
with mock.patch( | ||
"prowler.providers.common.provider.Provider.get_global_provider", | ||
return_value=aws_provider, | ||
), mock.patch( | ||
"prowler.providers.aws.services.neptune.neptune_cluster_public_snapshot.neptune_cluster_public_snapshot_fixer.neptune_client", | ||
new=Neptune(aws_provider), | ||
): | ||
from prowler.providers.aws.services.neptune.neptune_cluster_public_snapshot.neptune_cluster_public_snapshot_fixer import ( | ||
fixer, | ||
) | ||
|
||
assert not fixer( | ||
resource_id="test-snapshot", region=AWS_REGION_EU_WEST_1 | ||
) |