diff --git a/prowler/providers/aws/services/dms/dms_replication_task_source_logging_enabled/__init__.py b/prowler/providers/aws/services/dms/dms_replication_task_source_logging_enabled/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/prowler/providers/aws/services/dms/dms_replication_task_source_logging_enabled/dms_replication_task_source_logging_enabled.metadata.json b/prowler/providers/aws/services/dms/dms_replication_task_source_logging_enabled/dms_replication_task_source_logging_enabled.metadata.json new file mode 100644 index 00000000000..981a51c73b1 --- /dev/null +++ b/prowler/providers/aws/services/dms/dms_replication_task_source_logging_enabled/dms_replication_task_source_logging_enabled.metadata.json @@ -0,0 +1,32 @@ +{ + "Provider": "aws", + "CheckID": "dms_replication_task_source_logging_enabled", + "CheckTitle": "Check if DMS replication tasks for the source database have logging enabled.", + "CheckType": [ + "Software and Configuration Checks/AWS Security Best Practices" + ], + "ServiceName": "dms", + "SubServiceName": "", + "ResourceIdTemplate": "arn:aws:dms:region:account-id:task/task-id", + "Severity": "medium", + "ResourceType": "AwsDmsReplicationTask", + "Description": "This control checks whether logging is enabled with the minimum severity level of LOGGER_SEVERITY_DEFAULT for DMS replication tasks SOURCE_CAPTURE and SOURCE_UNLOAD. The control fails if logging isn't enabled for these tasks or if the minimum severity level is less than LOGGER_SEVERITY_DEFAULT.", + "Risk": "Without logging enabled, issues in data migration may go undetected, affecting the integrity and compliance of replicated data.", + "RelatedUrl": "https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Monitoring.html#CHAP_Monitoring.ManagingLogs", + "Remediation": { + "Code": { + "CLI": "aws dms modify-replication-task --replication-task-arn --task-settings '{\"Logging\":{\"EnableLogging\":true,\"LogComponents\":[{\"Id\":\"SOURCE_CAPTURE\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"SOURCE_UNLOAD\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"}]}}'", + "NativeIaC": "", + "Other": "https://docs.aws.amazon.com/securityhub/latest/userguide/dms-controls.html#dms-8", + "Terraform": "" + }, + "Recommendation": { + "Text": "Enable logging for source database DMS replication tasks with a minimum severity level of LOGGER_SEVERITY_DEFAULT.", + "Url": "https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TaskSettings.Logging.html" + } + }, + "Categories": [], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/aws/services/dms/dms_replication_task_source_logging_enabled/dms_replication_task_source_logging_enabled.py b/prowler/providers/aws/services/dms/dms_replication_task_source_logging_enabled/dms_replication_task_source_logging_enabled.py new file mode 100644 index 00000000000..fce11658a66 --- /dev/null +++ b/prowler/providers/aws/services/dms/dms_replication_task_source_logging_enabled/dms_replication_task_source_logging_enabled.py @@ -0,0 +1,79 @@ +from typing import List + +from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.providers.aws.services.dms.dms_client import dms_client + + +class dms_replication_task_source_logging_enabled(Check): + """ + Check if AWS DMS replication tasks have logging enabled with the required + logging components and severity levels. + + This class verifies that each DMS replication task has logging enabled + and that the components SOURCE_CAPTURE and SOURCE_UNLOAD are configured with + at least LOGGER_SEVERITY_DEFAULT severity level. If either component is missing + or does not meet the minimum severity requirement, the check will fail. + """ + + def execute(self) -> List[Check_Report_AWS]: + """ + Execute the DMS replication task logging requirements check. + + Iterates over all DMS replication tasks and generates a report indicating + whether each task has logging enabled and meets the logging requirements + for SOURCE_CAPTURE and SOURCE_UNLOAD components. + + Returns: + List[Check_Report_AWS]: A list of report objects with the results of the check. + """ + MINIMUM_SEVERITY_LEVELS = [ + "LOGGER_SEVERITY_DEFAULT", + "LOGGER_SEVERITY_DEBUG", + "LOGGER_SEVERITY_DETAILED_DEBUG", + ] + findings = [] + for ( + replication_task_arn, + replication_task, + ) in dms_client.replication_tasks.items(): + report = Check_Report_AWS(self.metadata()) + report.resource_id = replication_task.id + report.resource_arn = replication_task_arn + report.region = replication_task.region + report.resource_tags = replication_task.tags + + if not replication_task.logging_enabled: + report.status = "FAIL" + report.status_extended = f"DMS Replication Task {replication_task.id} does not have logging enabled for source events." + else: + missing_components = [] + source_capture_compliant = False + source_unload_compliant = False + + for component in replication_task.log_components: + if ( + component["Id"] == "SOURCE_CAPTURE" + and component["Severity"] in MINIMUM_SEVERITY_LEVELS + ): + source_capture_compliant = True + elif ( + component["Id"] == "SOURCE_UNLOAD" + and component["Severity"] in MINIMUM_SEVERITY_LEVELS + ): + source_unload_compliant = True + + if not source_capture_compliant: + missing_components.append("Source Capture") + if not source_unload_compliant: + missing_components.append("Source Unload") + + if source_capture_compliant and source_unload_compliant: + report.status = "PASS" + report.status_extended = f"DMS Replication Task {replication_task.id} has logging enabled with the minimum severity level in source events." + else: + report.status = "FAIL" + report.status_extended = f"DMS Replication Task {replication_task.id} does not meet the minimum severity level of logging in {', '.join(missing_components)} events." + + findings.append(report) + + return findings diff --git a/prowler/providers/aws/services/dms/dms_service.py b/prowler/providers/aws/services/dms/dms_service.py index a539a6417d9..a8aa2e05701 100644 --- a/prowler/providers/aws/services/dms/dms_service.py +++ b/prowler/providers/aws/services/dms/dms_service.py @@ -1,3 +1,4 @@ +import json from typing import Optional from pydantic import BaseModel @@ -13,10 +14,13 @@ def __init__(self, provider): super().__init__(__class__.__name__, provider) self.instances = [] self.endpoints = {} + self.replication_tasks = {} self.__threading_call__(self._describe_replication_instances) self.__threading_call__(self._list_tags, self.instances) self.__threading_call__(self._describe_endpoints) self.__threading_call__(self._list_tags, self.endpoints.values()) + self.__threading_call__(self._describe_replication_tasks) + self.__threading_call__(self._list_tags, self.replication_tasks.values()) def _describe_replication_instances(self, regional_client): logger.info("DMS - Describing DMS Replication Instances...") @@ -87,6 +91,37 @@ def _describe_endpoints(self, regional_client): f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) + def _describe_replication_tasks(self, regional_client): + logger.info("DMS - Describing DMS Replication Tasks for Logging Settings...") + try: + paginator = regional_client.get_paginator("describe_replication_tasks") + for page in paginator.paginate(): + for task in page["ReplicationTasks"]: + arn = task["ReplicationTaskArn"] + if not self.audit_resources or ( + is_resource_filtered(arn, self.audit_resources) + ): + task_settings = json.loads( + task.get("ReplicationTaskSettings", "") + ) + self.replication_tasks[arn] = ReplicationTasks( + arn=arn, + id=task["ReplicationTaskIdentifier"], + region=regional_client.region, + source_endpoint_arn=task["SourceEndpointArn"], + target_endpoint_arn=task["TargetEndpointArn"], + logging_enabled=task_settings.get("Logging", {}).get( + "EnableLogging", False + ), + log_components=task_settings.get("Logging", {}).get( + "LogComponents", [] + ), + ) + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + def _list_tags(self, resource: any): try: resource.tags = self.regional_clients[ @@ -103,11 +138,11 @@ class Endpoint(BaseModel): id: str region: str ssl_mode: str - tags: Optional[list] redis_ssl_protocol: str mongodb_auth_type: str neptune_iam_auth_enabled: bool = False engine_name: str + tags: Optional[list] class RepInstance(BaseModel): @@ -121,3 +156,14 @@ class RepInstance(BaseModel): multi_az: bool region: str tags: Optional[list] = [] + + +class ReplicationTasks(BaseModel): + arn: str + id: str + region: str + source_endpoint_arn: str + target_endpoint_arn: str + logging_enabled: bool = False + log_components: list[dict] = [] + tags: Optional[list] = [] diff --git a/tests/providers/aws/services/dms/dms_replication_task_source_logging_enabled/dms_replication_task_source_logging_enabled_test.py b/tests/providers/aws/services/dms/dms_replication_task_source_logging_enabled/dms_replication_task_source_logging_enabled_test.py new file mode 100644 index 00000000000..3ba90bcccaf --- /dev/null +++ b/tests/providers/aws/services/dms/dms_replication_task_source_logging_enabled/dms_replication_task_source_logging_enabled_test.py @@ -0,0 +1,475 @@ +from unittest import mock + +from boto3 import client +from moto import mock_aws + +from tests.providers.aws.utils import ( + AWS_ACCOUNT_NUMBER, + AWS_REGION_US_EAST_1, + set_mocked_aws_provider, +) + +DMS_ENDPOINT_NAME = "dms-endpoint" +DMS_ENDPOINT_ARN = f"arn:aws:dms:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:endpoint:{DMS_ENDPOINT_NAME}" +DMS_INSTANCE_NAME = "rep-instance" +DMS_INSTANCE_ARN = ( + f"arn:aws:dms:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:rep:{DMS_INSTANCE_NAME}" +) + + +class Test_dms_replication_task_source_logging_enabled: + @mock_aws + def test_no_dms_replication_tasks(self): + dms_client = client("dms", region_name=AWS_REGION_US_EAST_1) + dms_client.replication_tasks = {} + + from prowler.providers.aws.services.dms.dms_service import DMS + + aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.dms.dms_replication_task_source_logging_enabled.dms_replication_task_source_logging_enabled.dms_client", + new=DMS(aws_provider), + ): + # Test Check + from prowler.providers.aws.services.dms.dms_replication_task_source_logging_enabled.dms_replication_task_source_logging_enabled import ( + dms_replication_task_source_logging_enabled, + ) + + check = dms_replication_task_source_logging_enabled() + result = check.execute() + + assert len(result) == 0 + + @mock_aws + def test_dms_replication_task_logging_not_enabled(self): + dms_client = client("dms", region_name=AWS_REGION_US_EAST_1) + dms_client.create_replication_task( + ReplicationTaskIdentifier="rep-task", + SourceEndpointArn=DMS_ENDPOINT_ARN, + TargetEndpointArn=DMS_ENDPOINT_ARN, + MigrationType="full-load", + ReplicationTaskSettings=""" + { + "Logging": { + "EnableLogging": false, + "LogComponents": [ + { + "Id": "SOURCE_CAPTURE", + "Severity": "LOGGER_SEVERITY_DEFAULT" + } + ] + } + } + """, + TableMappings="", + ReplicationInstanceArn=DMS_INSTANCE_ARN, + ) + + dms_replication_task_arn = dms_client.describe_replication_tasks()[ + "ReplicationTasks" + ][0]["ReplicationTaskArn"] + + from prowler.providers.aws.services.dms.dms_service import DMS + + aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.dms.dms_replication_task_source_logging_enabled.dms_replication_task_source_logging_enabled.dms_client", + new=DMS(aws_provider), + ): + # Test Check + from prowler.providers.aws.services.dms.dms_replication_task_source_logging_enabled.dms_replication_task_source_logging_enabled import ( + dms_replication_task_source_logging_enabled, + ) + + check = dms_replication_task_source_logging_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].status_extended == ( + "DMS Replication Task rep-task does not have logging enabled for source events." + ) + assert result[0].resource_id == "rep-task" + assert result[0].resource_arn == dms_replication_task_arn + assert result[0].resource_tags == [] + assert result[0].region == "us-east-1" + + @mock_aws + def test_dms_replication_task_logging_enabled_source_capture_only(self): + dms_client = client("dms", region_name=AWS_REGION_US_EAST_1) + dms_client.create_replication_task( + ReplicationTaskIdentifier="rep-task", + SourceEndpointArn=DMS_ENDPOINT_ARN, + TargetEndpointArn=DMS_ENDPOINT_ARN, + MigrationType="full-load", + ReplicationTaskSettings=""" + { + "Logging": { + "EnableLogging": true, + "LogComponents": [ + { + "Id": "SOURCE_CAPTURE", + "Severity": "LOGGER_SEVERITY_DEFAULT" + } + ] + } + } + """, + TableMappings="", + ReplicationInstanceArn=DMS_INSTANCE_ARN, + ) + + dms_replication_task_arn = dms_client.describe_replication_tasks()[ + "ReplicationTasks" + ][0]["ReplicationTaskArn"] + + from prowler.providers.aws.services.dms.dms_service import DMS + + aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.dms.dms_replication_task_source_logging_enabled.dms_replication_task_source_logging_enabled.dms_client", + new=DMS(aws_provider), + ): + # Test Check + from prowler.providers.aws.services.dms.dms_replication_task_source_logging_enabled.dms_replication_task_source_logging_enabled import ( + dms_replication_task_source_logging_enabled, + ) + + check = dms_replication_task_source_logging_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].status_extended == ( + "DMS Replication Task rep-task does not meet the minimum severity level of logging in Source Unload events." + ) + assert result[0].resource_id == "rep-task" + assert result[0].resource_arn == dms_replication_task_arn + assert result[0].resource_tags == [] + assert result[0].region == "us-east-1" + + @mock_aws + def test_dms_replication_task_logging_enabled_source_unload_only(self): + dms_client = client("dms", region_name=AWS_REGION_US_EAST_1) + dms_client.create_replication_task( + ReplicationTaskIdentifier="rep-task", + SourceEndpointArn=DMS_ENDPOINT_ARN, + TargetEndpointArn=DMS_ENDPOINT_ARN, + MigrationType="full-load", + ReplicationTaskSettings=""" + { + "Logging": { + "EnableLogging": true, + "LogComponents": [ + { + "Id": "SOURCE_UNLOAD", + "Severity": "LOGGER_SEVERITY_DEFAULT" + } + ] + } + } + """, + TableMappings="", + ReplicationInstanceArn=DMS_INSTANCE_ARN, + ) + + dms_replication_task_arn = dms_client.describe_replication_tasks()[ + "ReplicationTasks" + ][0]["ReplicationTaskArn"] + + from prowler.providers.aws.services.dms.dms_service import DMS + + aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.dms.dms_replication_task_source_logging_enabled.dms_replication_task_source_logging_enabled.dms_client", + new=DMS(aws_provider), + ): + # Test Check + from prowler.providers.aws.services.dms.dms_replication_task_source_logging_enabled.dms_replication_task_source_logging_enabled import ( + dms_replication_task_source_logging_enabled, + ) + + check = dms_replication_task_source_logging_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].status_extended == ( + "DMS Replication Task rep-task does not meet the minimum severity level of logging in Source Capture events." + ) + assert result[0].resource_id == "rep-task" + assert result[0].resource_arn == dms_replication_task_arn + assert result[0].resource_tags == [] + assert result[0].region == "us-east-1" + + @mock_aws + def test_dms_replication_task_logging_enabled_source_unload_capture_with_not_enough_severity_on_capture( + self, + ): + dms_client = client("dms", region_name=AWS_REGION_US_EAST_1) + dms_client.create_replication_task( + ReplicationTaskIdentifier="rep-task", + SourceEndpointArn=DMS_ENDPOINT_ARN, + TargetEndpointArn=DMS_ENDPOINT_ARN, + MigrationType="full-load", + ReplicationTaskSettings=""" + { + "Logging": { + "EnableLogging": true, + "LogComponents": [ + { + "Id": "SOURCE_CAPTURE", + "Severity": "LOGGER_SEVERITY_INFO" + }, + { + "Id": "SOURCE_UNLOAD", + "Severity": "LOGGER_SEVERITY_DEFAULT" + } + ] + } + } + """, + TableMappings="", + ReplicationInstanceArn=DMS_INSTANCE_ARN, + ) + + dms_replication_task_arn = dms_client.describe_replication_tasks()[ + "ReplicationTasks" + ][0]["ReplicationTaskArn"] + + from prowler.providers.aws.services.dms.dms_service import DMS + + aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.dms.dms_replication_task_source_logging_enabled.dms_replication_task_source_logging_enabled.dms_client", + new=DMS(aws_provider), + ): + # Test Check + from prowler.providers.aws.services.dms.dms_replication_task_source_logging_enabled.dms_replication_task_source_logging_enabled import ( + dms_replication_task_source_logging_enabled, + ) + + check = dms_replication_task_source_logging_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].status_extended == ( + "DMS Replication Task rep-task does not meet the minimum severity level of logging in Source Capture events." + ) + assert result[0].resource_id == "rep-task" + assert result[0].resource_arn == dms_replication_task_arn + assert result[0].resource_tags == [] + assert result[0].region == "us-east-1" + + @mock_aws + def test_dms_replication_task_logging_enabled_source_unload_capture_with_not_enough_severity_on_unload( + self, + ): + dms_client = client("dms", region_name=AWS_REGION_US_EAST_1) + dms_client.create_replication_task( + ReplicationTaskIdentifier="rep-task", + SourceEndpointArn=DMS_ENDPOINT_ARN, + TargetEndpointArn=DMS_ENDPOINT_ARN, + MigrationType="full-load", + ReplicationTaskSettings=""" + { + "Logging": { + "EnableLogging": true, + "LogComponents": [ + { + "Id": "SOURCE_CAPTURE", + "Severity": "LOGGER_SEVERITY_DEFAULT" + }, + { + "Id": "SOURCE_UNLOAD", + "Severity": "LOGGER_SEVERITY_INFO" + } + ] + } + } + """, + TableMappings="", + ReplicationInstanceArn=DMS_INSTANCE_ARN, + ) + + dms_replication_task_arn = dms_client.describe_replication_tasks()[ + "ReplicationTasks" + ][0]["ReplicationTaskArn"] + + from prowler.providers.aws.services.dms.dms_service import DMS + + aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.dms.dms_replication_task_source_logging_enabled.dms_replication_task_source_logging_enabled.dms_client", + new=DMS(aws_provider), + ): + # Test Check + from prowler.providers.aws.services.dms.dms_replication_task_source_logging_enabled.dms_replication_task_source_logging_enabled import ( + dms_replication_task_source_logging_enabled, + ) + + check = dms_replication_task_source_logging_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].status_extended == ( + "DMS Replication Task rep-task does not meet the minimum severity level of logging in Source Unload events." + ) + assert result[0].resource_id == "rep-task" + assert result[0].resource_arn == dms_replication_task_arn + assert result[0].resource_tags == [] + assert result[0].region == "us-east-1" + + @mock_aws + def test_dms_replication_task_logging_enabled_source_unload_capture_with_not_enough_severity_on_both( + self, + ): + dms_client = client("dms", region_name=AWS_REGION_US_EAST_1) + dms_client.create_replication_task( + ReplicationTaskIdentifier="rep-task", + SourceEndpointArn=DMS_ENDPOINT_ARN, + TargetEndpointArn=DMS_ENDPOINT_ARN, + MigrationType="full-load", + ReplicationTaskSettings=""" + { + "Logging": { + "EnableLogging": true, + "LogComponents": [ + { + "Id": "SOURCE_CAPTURE", + "Severity": "LOGGER_SEVERITY_INFO" + }, + { + "Id": "SOURCE_UNLOAD", + "Severity": "LOGGER_SEVERITY_INFO" + } + ] + } + } + """, + TableMappings="", + ReplicationInstanceArn=DMS_INSTANCE_ARN, + ) + + dms_replication_task_arn = dms_client.describe_replication_tasks()[ + "ReplicationTasks" + ][0]["ReplicationTaskArn"] + + from prowler.providers.aws.services.dms.dms_service import DMS + + aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.dms.dms_replication_task_source_logging_enabled.dms_replication_task_source_logging_enabled.dms_client", + new=DMS(aws_provider), + ): + # Test Check + from prowler.providers.aws.services.dms.dms_replication_task_source_logging_enabled.dms_replication_task_source_logging_enabled import ( + dms_replication_task_source_logging_enabled, + ) + + check = dms_replication_task_source_logging_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].status_extended == ( + "DMS Replication Task rep-task does not meet the minimum severity level of logging in Source Capture, Source Unload events." + ) + assert result[0].resource_id == "rep-task" + assert result[0].resource_arn == dms_replication_task_arn + assert result[0].resource_tags == [] + assert result[0].region == "us-east-1" + + @mock_aws + def test_dms_replication_task_logging_enabled_source_unload_capture_with_enough_severity_on_both( + self, + ): + dms_client = client("dms", region_name=AWS_REGION_US_EAST_1) + dms_client.create_replication_task( + ReplicationTaskIdentifier="rep-task", + SourceEndpointArn=DMS_ENDPOINT_ARN, + TargetEndpointArn=DMS_ENDPOINT_ARN, + MigrationType="full-load", + ReplicationTaskSettings=""" + { + "Logging": { + "EnableLogging": true, + "LogComponents": [ + { + "Id": "SOURCE_CAPTURE", + "Severity": "LOGGER_SEVERITY_DEFAULT" + }, + { + "Id": "SOURCE_UNLOAD", + "Severity": "LOGGER_SEVERITY_DEFAULT" + } + ] + } + } + """, + TableMappings="", + ReplicationInstanceArn=DMS_INSTANCE_ARN, + ) + + dms_replication_task_arn = dms_client.describe_replication_tasks()[ + "ReplicationTasks" + ][0]["ReplicationTaskArn"] + + from prowler.providers.aws.services.dms.dms_service import DMS + + aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.dms.dms_replication_task_source_logging_enabled.dms_replication_task_source_logging_enabled.dms_client", + new=DMS(aws_provider), + ): + # Test Check + from prowler.providers.aws.services.dms.dms_replication_task_source_logging_enabled.dms_replication_task_source_logging_enabled import ( + dms_replication_task_source_logging_enabled, + ) + + check = dms_replication_task_source_logging_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert result[0].status_extended == ( + "DMS Replication Task rep-task has logging enabled with the minimum severity level in source events." + ) + assert result[0].resource_id == "rep-task" + assert result[0].resource_arn == dms_replication_task_arn + assert result[0].resource_tags == [] + assert result[0].region == "us-east-1" diff --git a/tests/providers/aws/services/dms/dms_service_test.py b/tests/providers/aws/services/dms/dms_service_test.py index 94474a8fbe3..4918c890ce4 100644 --- a/tests/providers/aws/services/dms/dms_service_test.py +++ b/tests/providers/aws/services/dms/dms_service_test.py @@ -1,9 +1,12 @@ import botocore +from boto3 import client from mock import patch +from moto import mock_aws from prowler.providers.aws.services.dms.dms_service import DMS from tests.providers.aws.utils import ( AWS_ACCOUNT_NUMBER, + AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1, set_mocked_aws_provider, ) @@ -148,3 +151,61 @@ def test_list_tags(self): {"Key": "Name", "Value": "dms-endpoint"}, {"Key": "Owner", "Value": "admin"}, ] + + @mock_aws + def test_describe_replication_tags(self): + dms_client = client("dms", region_name=AWS_REGION_US_EAST_1) + + dms_client.create_replication_task( + ReplicationTaskIdentifier="rep-task", + SourceEndpointArn=DMS_ENDPOINT_ARN, + TargetEndpointArn=DMS_ENDPOINT_ARN, + MigrationType="full-load", + ReplicationTaskSettings=""" + { + "Logging": { + "EnableLogging": true, + "LogComponents": [ + { + "Id": "SOURCE_CAPTURE", + "Severity": "LOGGER_SEVERITY_DEFAULT" + }, + { + "Id": "SOURCE_UNLOAD", + "Severity": "LOGGER_SEVERITY_DEFAULT" + } + ] + } + } + """, + TableMappings="", + ReplicationInstanceArn=DMS_INSTANCE_ARN, + ) + + dms_replication_task_arn = dms_client.describe_replication_tasks()[ + "ReplicationTasks" + ][0]["ReplicationTaskArn"] + + aws_provider = set_mocked_aws_provider( + [AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1] + ) + dms = DMS(aws_provider) + + assert dms.replication_tasks[dms_replication_task_arn].id == "rep-task" + assert ( + dms.replication_tasks[dms_replication_task_arn].region + == AWS_REGION_US_EAST_1 + ) + assert dms.replication_tasks[dms_replication_task_arn].logging_enabled + assert dms.replication_tasks[dms_replication_task_arn].log_components == [ + {"Id": "SOURCE_CAPTURE", "Severity": "LOGGER_SEVERITY_DEFAULT"}, + {"Id": "SOURCE_UNLOAD", "Severity": "LOGGER_SEVERITY_DEFAULT"}, + ] + assert ( + dms.replication_tasks[dms_replication_task_arn].source_endpoint_arn + == DMS_ENDPOINT_ARN + ) + assert ( + dms.replication_tasks[dms_replication_task_arn].target_endpoint_arn + == DMS_ENDPOINT_ARN + )