Skip to content

Commit

Permalink
REMOVEME: safe PCAPs
Browse files Browse the repository at this point in the history
  • Loading branch information
miri64 committed Jul 29, 2020
1 parent c83cc56 commit d1dde5b
Showing 1 changed file with 66 additions and 1 deletion.
67 changes: 66 additions & 1 deletion 10-icmpv6-error/test_spec10.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
import time

import pytest
Expand All @@ -6,7 +7,7 @@
ICMPv6EchoReply, ICMPv6EchoRequest, \
ICMPv6DestUnreach, ICMPv6PacketTooBig, \
ICMPv6TimeExceeded, ICMPv6ParamProblem, \
IPerror6, UDPerror
IPerror6, UDPerror, wrpcap

from riotctrl_shell.gnrc import GNRCIPv6NIB
from riotctrl_shell.netif import Ifconfig, IfconfigListParser
Expand Down Expand Up @@ -56,11 +57,21 @@ def test_task01(riot_ctrl, log_nodes):
ip_route_add(host_netif, "affe::/64", node_lladdr)
node.nib_route_add(node_netif, "beef::/64", host_lladdr)

sniffer = AsyncSniffer(
iface=host_netif,
)
sniffer.start()
time.sleep(.1)
pkt = srp1(Ether(dst=node_hwaddr) /
IPv6(src="beef::1", dst="affe::1") /
UDP(dport=48879),
iface=host_netif, timeout=5,
verbose=log_nodes)
pkts = sniffer.stop()
pcap = os.path.join(os.environ.get("GITHUB_WORKSPACE", os.getcwd()),
"task01.pcap")
print("Write to {}".format(pcap))
wrpcap(pcap, pkts)
assert ICMPv6DestUnreach in pkt
assert pkt[ICMPv6DestUnreach].code == 0
assert pkt[IPv6].src == node_lladdr
Expand Down Expand Up @@ -144,12 +155,22 @@ def test_task04(riot_ctrl, log_nodes):
node.nib_route_add(node_netif, "beef::/64", host_lladdr)
node.nib_route_add(node_netif, "affe::/64", "fe80::ab25:f123")

sniffer = AsyncSniffer(
iface=host_netif,
)
sniffer.start()
time.sleep(.1)
pkt = srp1(Ether(dst=node_hwaddr) /
IPv6(src="beef::1", dst="affe::1") /
UDP(dport=48879),
iface=host_netif,
timeout=10, # needs to wait for address resolution
verbose=log_nodes)
pkts = sniffer.stop()
pcap = os.path.join(os.environ.get("GITHUB_WORKSPACE", os.getcwd()),
"task04.pcap")
print("Write to {}".format(pcap))
wrpcap(pcap, pkts)
assert ICMPv6DestUnreach in pkt
assert pkt[ICMPv6DestUnreach].code == 3
assert pkt[IPv6].src == node_lladdr
Expand All @@ -173,11 +194,21 @@ def test_task05(riot_ctrl, log_nodes):
for addr in node_netifs[node_netif]["ipv6_addrs"]
if addr["scope"] == "link"][0]

sniffer = AsyncSniffer(
iface=host_netif,
)
sniffer.start()
time.sleep(.1)
pkt = srp1(Ether(dst=node_hwaddr) /
IPv6(src=host_lladdr, dst=node_lladdr) /
UDP(dport=48879),
iface=host_netif, timeout=5,
verbose=log_nodes)
pkts = sniffer.stop()
pcap = os.path.join(os.environ.get("GITHUB_WORKSPACE", os.getcwd()),
"task05.pcap")
print("Write to {}".format(pcap))
wrpcap(pcap, pkts)
assert ICMPv6DestUnreach in pkt
assert pkt[ICMPv6DestUnreach].code == 4
assert pkt[IPv6].src == node_lladdr
Expand All @@ -201,11 +232,21 @@ def test_task06(riot_ctrl, log_nodes):
for addr in node_netifs[node_netif]["ipv6_addrs"]
if addr["scope"] == "link"][0]

sniffer = AsyncSniffer(
iface=host_netif,
)
sniffer.start()
time.sleep(.1)
pkt = srp1(Ether(dst=node_hwaddr) /
IPv6(src=host_lladdr, dst=node_lladdr) /
UDP(dport=48879) / ("x" * 1452),
iface=host_netif, timeout=5,
verbose=log_nodes)
pkts = sniffer.stop()
pcap = os.path.join(os.environ.get("GITHUB_WORKSPACE", os.getcwd()),
"task06.pcap")
print("Write to {}".format(pcap))
wrpcap(pcap, pkts)
assert ICMPv6DestUnreach in pkt
assert pkt[ICMPv6DestUnreach].code == 4
assert pkt[IPv6].src == node_lladdr
Expand Down Expand Up @@ -241,11 +282,21 @@ def test_task07(riot_ctrl, log_nodes):
node.nib_route_add(node_netif, "beef::/64", host_lladdr)
node.nib_route_add(fwd_netif, "affe::/64", "fe80::ab25:f123")

sniffer = AsyncSniffer(
iface=host_netif,
)
sniffer.start()
time.sleep(.1)
pkt = srp1(Ether(dst=node_hwaddr) /
IPv6(src="beef::1", dst="affe::1") /
UDP(dport=48879) / ("x" * 1452),
iface=host_netif, timeout=5,
verbose=log_nodes)
pkts = sniffer.stop()
pcap = os.path.join(os.environ.get("GITHUB_WORKSPACE", os.getcwd()),
"task07.pcap")
print("Write to {}".format(pcap))
wrpcap(pcap, pkts)
assert ICMPv6PacketTooBig in pkt
assert pkt[ICMPv6PacketTooBig].code == 0
assert node_netifs[fwd_netif]["mtu"] == pkt[ICMPv6PacketTooBig].mtu == 1280
Expand Down Expand Up @@ -307,12 +358,22 @@ def test_task09(riot_ctrl, log_nodes):
node.nib_route_add(node_netif, "beef::/64", host_lladdr)
node.nib_route_add(node_netif, "affe::/64", "fe80::ab25:f123")

sniffer = AsyncSniffer(
iface=host_netif,
)
sniffer.start()
time.sleep(.1)
pkt = srp1(Ether(dst=node_hwaddr) /
IPv6(src="beef::1", dst="affe::1", plen=20) /
UDP(dport=48879),
iface=host_netif,
timeout=5,
verbose=log_nodes)
pkts = sniffer.stop()
pcap = os.path.join(os.environ.get("GITHUB_WORKSPACE", os.getcwd()),
"task09.pcap")
print("Write to {}".format(pcap))
wrpcap(pcap, pkts)
assert ICMPv6ParamProblem in pkt
assert pkt[ICMPv6ParamProblem].code == 0
err_bytes = raw(pkt[IPerror6])
Expand Down Expand Up @@ -381,6 +442,10 @@ def test_task10(riot_ctrl, log_nodes):
ICMPv6EchoRequest(id=stop_id), iface=host_netif, verbose=log_nodes)
sniffer.join()
pkts = sniffer.results
pcap = os.path.join(os.environ.get("GITHUB_WORKSPACE", os.getcwd()),
"task10.pcap")
print("Write to {}".format(pcap))
wrpcap(pcap, pkts)
requests = [p for p in pkts if ICMPv6EchoRequest in p and
p[ICMPv6EchoRequest].id == test_id]
replies = [p for p in pkts if ICMPv6EchoReply in p and
Expand Down

0 comments on commit d1dde5b

Please # to comment.