From d1dde5bbf8b3de065790ef708639bd54b753d9de Mon Sep 17 00:00:00 2001 From: Martine Lenders Date: Wed, 29 Jul 2020 09:38:39 +0200 Subject: [PATCH] REMOVEME: safe PCAPs --- 10-icmpv6-error/test_spec10.py | 67 +++++++++++++++++++++++++++++++++- 1 file changed, 66 insertions(+), 1 deletion(-) diff --git a/10-icmpv6-error/test_spec10.py b/10-icmpv6-error/test_spec10.py index a8dc228d..f5b7dc41 100644 --- a/10-icmpv6-error/test_spec10.py +++ b/10-icmpv6-error/test_spec10.py @@ -1,3 +1,4 @@ +import os import time import pytest @@ -6,7 +7,7 @@ ICMPv6EchoReply, ICMPv6EchoRequest, \ ICMPv6DestUnreach, ICMPv6PacketTooBig, \ ICMPv6TimeExceeded, ICMPv6ParamProblem, \ - IPerror6, UDPerror + IPerror6, UDPerror, wrpcap from riotctrl_shell.gnrc import GNRCIPv6NIB from riotctrl_shell.netif import Ifconfig, IfconfigListParser @@ -56,11 +57,21 @@ def test_task01(riot_ctrl, log_nodes): ip_route_add(host_netif, "affe::/64", node_lladdr) node.nib_route_add(node_netif, "beef::/64", host_lladdr) + sniffer = AsyncSniffer( + iface=host_netif, + ) + sniffer.start() + time.sleep(.1) pkt = srp1(Ether(dst=node_hwaddr) / IPv6(src="beef::1", dst="affe::1") / UDP(dport=48879), iface=host_netif, timeout=5, verbose=log_nodes) + pkts = sniffer.stop() + pcap = os.path.join(os.environ.get("GITHUB_WORKSPACE", os.getcwd()), + "task01.pcap") + print("Write to {}".format(pcap)) + wrpcap(pcap, pkts) assert ICMPv6DestUnreach in pkt assert pkt[ICMPv6DestUnreach].code == 0 assert pkt[IPv6].src == node_lladdr @@ -144,12 +155,22 @@ def test_task04(riot_ctrl, log_nodes): node.nib_route_add(node_netif, "beef::/64", host_lladdr) node.nib_route_add(node_netif, "affe::/64", "fe80::ab25:f123") + sniffer = AsyncSniffer( + iface=host_netif, + ) + sniffer.start() + time.sleep(.1) pkt = srp1(Ether(dst=node_hwaddr) / IPv6(src="beef::1", dst="affe::1") / UDP(dport=48879), iface=host_netif, timeout=10, # needs to wait for address resolution verbose=log_nodes) + pkts = sniffer.stop() + pcap = os.path.join(os.environ.get("GITHUB_WORKSPACE", os.getcwd()), + "task04.pcap") + print("Write to {}".format(pcap)) + wrpcap(pcap, pkts) assert ICMPv6DestUnreach in pkt assert pkt[ICMPv6DestUnreach].code == 3 assert pkt[IPv6].src == node_lladdr @@ -173,11 +194,21 @@ def test_task05(riot_ctrl, log_nodes): for addr in node_netifs[node_netif]["ipv6_addrs"] if addr["scope"] == "link"][0] + sniffer = AsyncSniffer( + iface=host_netif, + ) + sniffer.start() + time.sleep(.1) pkt = srp1(Ether(dst=node_hwaddr) / IPv6(src=host_lladdr, dst=node_lladdr) / UDP(dport=48879), iface=host_netif, timeout=5, verbose=log_nodes) + pkts = sniffer.stop() + pcap = os.path.join(os.environ.get("GITHUB_WORKSPACE", os.getcwd()), + "task05.pcap") + print("Write to {}".format(pcap)) + wrpcap(pcap, pkts) assert ICMPv6DestUnreach in pkt assert pkt[ICMPv6DestUnreach].code == 4 assert pkt[IPv6].src == node_lladdr @@ -201,11 +232,21 @@ def test_task06(riot_ctrl, log_nodes): for addr in node_netifs[node_netif]["ipv6_addrs"] if addr["scope"] == "link"][0] + sniffer = AsyncSniffer( + iface=host_netif, + ) + sniffer.start() + time.sleep(.1) pkt = srp1(Ether(dst=node_hwaddr) / IPv6(src=host_lladdr, dst=node_lladdr) / UDP(dport=48879) / ("x" * 1452), iface=host_netif, timeout=5, verbose=log_nodes) + pkts = sniffer.stop() + pcap = os.path.join(os.environ.get("GITHUB_WORKSPACE", os.getcwd()), + "task06.pcap") + print("Write to {}".format(pcap)) + wrpcap(pcap, pkts) assert ICMPv6DestUnreach in pkt assert pkt[ICMPv6DestUnreach].code == 4 assert pkt[IPv6].src == node_lladdr @@ -241,11 +282,21 @@ def test_task07(riot_ctrl, log_nodes): node.nib_route_add(node_netif, "beef::/64", host_lladdr) node.nib_route_add(fwd_netif, "affe::/64", "fe80::ab25:f123") + sniffer = AsyncSniffer( + iface=host_netif, + ) + sniffer.start() + time.sleep(.1) pkt = srp1(Ether(dst=node_hwaddr) / IPv6(src="beef::1", dst="affe::1") / UDP(dport=48879) / ("x" * 1452), iface=host_netif, timeout=5, verbose=log_nodes) + pkts = sniffer.stop() + pcap = os.path.join(os.environ.get("GITHUB_WORKSPACE", os.getcwd()), + "task07.pcap") + print("Write to {}".format(pcap)) + wrpcap(pcap, pkts) assert ICMPv6PacketTooBig in pkt assert pkt[ICMPv6PacketTooBig].code == 0 assert node_netifs[fwd_netif]["mtu"] == pkt[ICMPv6PacketTooBig].mtu == 1280 @@ -307,12 +358,22 @@ def test_task09(riot_ctrl, log_nodes): node.nib_route_add(node_netif, "beef::/64", host_lladdr) node.nib_route_add(node_netif, "affe::/64", "fe80::ab25:f123") + sniffer = AsyncSniffer( + iface=host_netif, + ) + sniffer.start() + time.sleep(.1) pkt = srp1(Ether(dst=node_hwaddr) / IPv6(src="beef::1", dst="affe::1", plen=20) / UDP(dport=48879), iface=host_netif, timeout=5, verbose=log_nodes) + pkts = sniffer.stop() + pcap = os.path.join(os.environ.get("GITHUB_WORKSPACE", os.getcwd()), + "task09.pcap") + print("Write to {}".format(pcap)) + wrpcap(pcap, pkts) assert ICMPv6ParamProblem in pkt assert pkt[ICMPv6ParamProblem].code == 0 err_bytes = raw(pkt[IPerror6]) @@ -381,6 +442,10 @@ def test_task10(riot_ctrl, log_nodes): ICMPv6EchoRequest(id=stop_id), iface=host_netif, verbose=log_nodes) sniffer.join() pkts = sniffer.results + pcap = os.path.join(os.environ.get("GITHUB_WORKSPACE", os.getcwd()), + "task10.pcap") + print("Write to {}".format(pcap)) + wrpcap(pcap, pkts) requests = [p for p in pkts if ICMPv6EchoRequest in p and p[ICMPv6EchoRequest].id == test_id] replies = [p for p in pkts if ICMPv6EchoReply in p and