-
Notifications
You must be signed in to change notification settings - Fork 601
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'main' into create-pull-request/maintenance-v1
- Loading branch information
Showing
5 changed files
with
864 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,197 @@ | ||
""" | ||
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
SPDX-License-Identifier: MIT-0 | ||
""" | ||
|
||
from __future__ import annotations | ||
|
||
import logging | ||
from collections import deque | ||
from ipaddress import IPv4Network, IPv6Network, ip_network | ||
from typing import Any, Iterator | ||
|
||
from cfnlint.context import Path | ||
from cfnlint.jsonschema import ValidationError, ValidationResult, Validator | ||
from cfnlint.rules.helpers import get_value_from_path | ||
from cfnlint.rules.jsonschema.CfnLintKeyword import CfnLintKeyword | ||
|
||
LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
class VpcSubnetCidr(CfnLintKeyword): | ||
id = "E3059" | ||
shortdesc = "Validate subnet CIDRs are within the CIDRs of the VPC" | ||
description = ( | ||
"When specifying subnet CIDRs for a VPC the subnet CIDRs " | ||
"most be within the VPC CIDRs" | ||
) | ||
source_url = "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-subnet.html" | ||
tags = ["resources", "ec2", "vpc", "subnet"] | ||
|
||
def __init__(self) -> None: | ||
super().__init__( | ||
keywords=[ | ||
"Resources/AWS::EC2::VPC/Properties", | ||
], | ||
) | ||
|
||
def _validate_subnets( | ||
self, | ||
source: IPv4Network | IPv6Network, | ||
destination: IPv4Network | IPv6Network, | ||
) -> bool: | ||
if isinstance(source, IPv4Network) and isinstance(destination, IPv4Network): | ||
if source.subnet_of(destination): | ||
return True | ||
return False | ||
elif isinstance(source, IPv6Network) and isinstance(destination, IPv6Network): | ||
if source.subnet_of(destination): | ||
return True | ||
return False | ||
return False | ||
|
||
def _create_network(self, cidr: Any) -> IPv4Network | IPv6Network | None: | ||
if not isinstance(cidr, str): | ||
return None | ||
|
||
try: | ||
return ip_network(cidr) | ||
except Exception as e: | ||
LOGGER.debug(f"Unable to create network from {cidr}", e) | ||
|
||
return None | ||
|
||
def _get_vpc_cidrs( | ||
self, validator: Validator, instance: dict[str, Any] | ||
) -> Iterator[tuple[IPv4Network | IPv6Network | None, Validator]]: | ||
for key in [ | ||
"Ipv4IpamPoolId", | ||
"Ipv6IpamPoolId", | ||
"Ipv6Pool", | ||
"AmazonProvidedIpv6CidrBlock", | ||
]: | ||
for value, value_validator in get_value_from_path( | ||
validator, | ||
instance, | ||
deque([key]), | ||
): | ||
if value is None: | ||
continue | ||
yield None, value_validator | ||
|
||
for key in ["CidrBlock", "Ipv6CidrBlock"]: | ||
for cidr, cidr_validator in get_value_from_path( | ||
validator, | ||
instance, | ||
deque([key]), | ||
): | ||
|
||
if cidr is None: | ||
continue | ||
yield self._create_network(cidr), cidr_validator | ||
|
||
def validate( | ||
self, validator: Validator, keywords: Any, instance: Any, schema: dict[str, Any] | ||
) -> ValidationResult: | ||
|
||
if not validator.cfn.graph: | ||
return | ||
|
||
vpc_ipv4_networks: list[IPv4Network] = [] | ||
vpc_ipv6_networks: list[IPv6Network] = [] | ||
for vpc_network, _ in self._get_vpc_cidrs(validator, instance): | ||
if not vpc_network: | ||
return | ||
if isinstance(vpc_network, IPv4Network): | ||
vpc_ipv4_networks.append(vpc_network) | ||
# you can't specify IPV6 networks on a VPC | ||
|
||
template_validator = validator.evolve( | ||
context=validator.context.evolve(path=Path()) | ||
) | ||
|
||
# dynamic vpc network (using IPAM or AWS provided) | ||
# allows to validate subnet overlapping even if using | ||
# dynamic networks | ||
has_dynamic_network = False | ||
|
||
for source, _ in validator.cfn.graph.graph.in_edges( | ||
validator.context.path.path[1] | ||
): | ||
if ( | ||
validator.cfn.graph.graph.nodes[source].get("resource_type") | ||
== "AWS::EC2::VPCCidrBlock" | ||
): | ||
for cidr_props, cidr_validator in get_value_from_path( | ||
template_validator, | ||
validator.cfn.template, | ||
deque(["Resources", source, "Properties"]), | ||
): | ||
for cidr_network, _ in self._get_vpc_cidrs( | ||
cidr_validator, cidr_props | ||
): | ||
if not cidr_network: | ||
has_dynamic_network = True | ||
continue | ||
if isinstance(cidr_network, IPv4Network): | ||
vpc_ipv4_networks.append(cidr_network) | ||
else: | ||
vpc_ipv6_networks.append(cidr_network) | ||
|
||
subnets: list[tuple[IPv4Network | IPv6Network, deque]] = [] | ||
for source, _ in validator.cfn.graph.graph.in_edges( | ||
validator.context.path.path[1] | ||
): | ||
if ( | ||
validator.cfn.graph.graph.nodes[source].get("resource_type") | ||
== "AWS::EC2::Subnet" | ||
): | ||
for subnet_props, source_validator in get_value_from_path( | ||
template_validator, | ||
validator.cfn.template, | ||
deque(["Resources", source, "Properties"]), | ||
): | ||
for subnet_network, subnet_validator in self._get_vpc_cidrs( | ||
source_validator, subnet_props | ||
): | ||
if not subnet_network: | ||
continue | ||
|
||
subnets.append( | ||
(subnet_network, subnet_validator.context.path.path) | ||
) | ||
if has_dynamic_network: | ||
continue | ||
if not any( | ||
self._validate_subnets( | ||
subnet_network, | ||
vpc_network, | ||
) | ||
for vpc_network in vpc_ipv4_networks + vpc_ipv6_networks | ||
): | ||
if isinstance(subnet_network, IPv4Network): | ||
# Every VPC has to have a ipv4 network | ||
# we continue if there isn't one | ||
if not vpc_ipv4_networks: | ||
continue | ||
reprs = ( | ||
"is not a valid subnet of " | ||
f"{[f'{str(v)}' for v in vpc_ipv4_networks]!r}" | ||
) | ||
else: | ||
if not vpc_ipv6_networks: | ||
reprs = ( | ||
"is specified on a VPC that has " | ||
"no ipv6 networks defined" | ||
) | ||
else: | ||
reprs = ( | ||
"is not a valid subnet of " | ||
f"{[f'{str(v)}' for v in vpc_ipv6_networks]!r}" | ||
) | ||
yield ValidationError( | ||
(f"{str(subnet_network)!r} {reprs}"), | ||
rule=self, | ||
path_override=subnet_validator.context.path.path, | ||
) | ||
continue |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
""" | ||
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
SPDX-License-Identifier: MIT-0 | ||
""" | ||
|
||
from __future__ import annotations | ||
|
||
import logging | ||
from collections import deque | ||
from ipaddress import IPv4Network, IPv6Network, ip_network | ||
from typing import Any | ||
|
||
from cfnlint.context.conditions import Unsatisfiable | ||
from cfnlint.helpers import ensure_list, is_function | ||
from cfnlint.jsonschema import ValidationError, ValidationResult, Validator | ||
from cfnlint.rules.helpers import get_value_from_path | ||
from cfnlint.rules.jsonschema.CfnLintKeyword import CfnLintKeyword | ||
|
||
LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
class VpcSubnetOverlap(CfnLintKeyword): | ||
id = "E3060" | ||
shortdesc = "Validate subnet CIDRs do not overlap with other subnets" | ||
description = ( | ||
"When specifying subnet CIDRs for a VPC the subnet CIDRs " | ||
"most not overlap with eachother" | ||
) | ||
source_url = "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-subnet.html" | ||
tags = ["resources", "ec2", "vpc", "subnet"] | ||
|
||
def __init__(self) -> None: | ||
super().__init__( | ||
keywords=[ | ||
"Resources/AWS::EC2::Subnet/Properties", | ||
], | ||
) | ||
self._subnets: dict[str, list[tuple[IPv4Network | IPv6Network, dict]]] = {} | ||
|
||
def initialize(self, cfn): | ||
self._subnets = {} | ||
return super().initialize(cfn) | ||
|
||
def _validate_subnets( | ||
self, | ||
source: IPv4Network | IPv6Network, | ||
destination: IPv4Network | IPv6Network, | ||
) -> bool: | ||
if isinstance(source, IPv4Network) and isinstance(destination, IPv4Network): | ||
if source.overlaps(destination): | ||
return True | ||
return False | ||
elif isinstance(source, IPv6Network) and isinstance(destination, IPv6Network): | ||
if source.overlaps(destination): | ||
return True | ||
return False | ||
return False | ||
|
||
def _create_network(self, cidr: Any) -> IPv4Network | IPv6Network | None: | ||
if not isinstance(cidr, str): | ||
return None | ||
|
||
try: | ||
return ip_network(cidr) | ||
except Exception as e: | ||
LOGGER.debug(f"Unable to create network from {cidr}", e) | ||
|
||
return None | ||
|
||
def validate( | ||
self, validator: Validator, keywords: Any, instance: Any, schema: dict[str, Any] | ||
) -> ValidationResult: | ||
|
||
for vpc_id, vpc_validator in get_value_from_path( | ||
validator=validator, instance=instance, path=deque(["VpcId"]) | ||
): | ||
|
||
if not isinstance(vpc_id, (str, dict)): | ||
return | ||
|
||
fn_k, fn_v = is_function(vpc_id) | ||
if fn_k == "Fn::GetAtt": | ||
vpc_id = ensure_list(fn_v)[0].split(".")[0] | ||
elif fn_k == "Ref": | ||
vpc_id = fn_v | ||
elif fn_k: | ||
# its a function that we can't resolve | ||
return | ||
|
||
if not validator.is_type(vpc_id, "string"): | ||
return | ||
if vpc_id not in self._subnets: | ||
self._subnets[vpc_id] = [] | ||
|
||
for key in ["CidrBlock", "Ipv6CidrBlock"]: | ||
for cidr_block, cidr_block_validator in get_value_from_path( | ||
validator=vpc_validator, instance=instance, path=deque([key]) | ||
): | ||
|
||
cidr_network = self._create_network(cidr_block) | ||
if not cidr_network: | ||
continue | ||
|
||
for saved_subnet, conditions in self._subnets[vpc_id]: | ||
# attempt to validate if the saved conditions comply | ||
# with these conditions | ||
try: | ||
cidr_block_validator.evolve( | ||
context=cidr_block_validator.context.evolve( | ||
conditions=cidr_block_validator.context.conditions.evolve( | ||
conditions | ||
) | ||
) | ||
) | ||
except Unsatisfiable: | ||
continue | ||
|
||
# now we can evaluate if they overlap | ||
if self._validate_subnets( | ||
cidr_network, | ||
saved_subnet, | ||
): | ||
yield ValidationError( | ||
( | ||
f"{str(cidr_network)!r} overlaps " | ||
f"with {str(saved_subnet)!r}" | ||
), | ||
rule=self, | ||
path=deque( | ||
list(cidr_block_validator.context.path.path)[1:] | ||
), | ||
) | ||
|
||
self._subnets[vpc_id].append( | ||
(cidr_network, cidr_block_validator.context.conditions.status) | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.