Skip to content

Commit

Permalink
Merge pull request #117 from languitar/bugfix/ipv6-to-ipv4-mapping
Browse files Browse the repository at this point in the history
fix(activity): detect ipv4 mapped ipv6 connections
  • Loading branch information
languitar authored Oct 26, 2021
2 parents f3db876 + a81e456 commit a00c8fb
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 2 deletions.
18 changes: 16 additions & 2 deletions src/autosuspend/checks/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,23 @@ def __init__(self, name: str, ports: Iterable[int]) -> None:
Activity.__init__(self, name)
self._ports = ports

def normalize_address(
self, family: socket.AddressFamily, address: str
) -> Tuple[socket.AddressFamily, str]:
if family == socket.AF_INET6:
# strip scope
return family, address.split("%")[0]
elif family == socket.AF_INET:
# convert to IPv6 to handle cases where an IPv4 address is targeted via IPv6
# to IPv4 mapping
return socket.AF_INET6, f"::ffff:{address}"
else:
return family, address

def check(self) -> Optional[str]:
# Find the addresses of the system
own_addresses = [
(item.family, item.address.split("%")[0])
self.normalize_address(item.family, item.address)
for sublist in psutil.net_if_addrs().values()
for item in sublist
]
Expand All @@ -97,7 +110,8 @@ def check(self) -> Optional[str]:
connection.laddr[1]
for connection in psutil.net_connections()
if (
(connection.family, connection.laddr[0]) in own_addresses
self.normalize_address(connection.family, connection.laddr[0])
in own_addresses
and connection.status == "ESTABLISHED"
and connection.laddr[1] in self._ports
)
Expand Down
11 changes: 11 additions & 0 deletions tests/test_checks_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,17 @@ def test_smoke(self) -> None:
"ESTABLISHED",
None,
),
# ipv6 with mapping to ipv4
# https://github.com/languitar/autosuspend/issues/116
psutil._common.sconn(
-1,
socket.AF_INET6,
socket.SOCK_STREAM,
(f"::ffff:{MY_ADDRESS}", MY_PORT),
("42.42.42.42", 42),
"ESTABLISHED",
None,
),
],
)
def test_connected(
Expand Down

0 comments on commit a00c8fb

Please # to comment.