xref: /freebsd-src/tests/sys/netpfil/pf/frag6.py (revision 7f846fc0e7ce30c80e3265c957a138d7192af397)
1b23dbabbSKristof Provostimport pytest
2b23dbabbSKristof Provostimport logging
3b23dbabbSKristof Provostimport threading
4b23dbabbSKristof Provostimport time
5*7f846fc0SKristof Provostimport random
6b23dbabbSKristof Provostlogging.getLogger("scapy").setLevel(logging.CRITICAL)
7b23dbabbSKristof Provostfrom atf_python.sys.net.tools import ToolsHelper
8b23dbabbSKristof Provostfrom atf_python.sys.net.vnet import VnetTestTemplate
9b23dbabbSKristof Provost
class DelayedSend(threading.Thread):
    """Thread that transmits a packet (or list of packets) after a delay.

    The thread starts itself from the constructor, so simply creating an
    instance schedules the transmission.
    """

    def __init__(self, packet):
        """Store *packet* and start the thread right away."""
        super().__init__()
        self._packet = packet

        self.start()

    def run(self):
        """Sleep for one second, then pass the stored packet to Scapy's send()."""
        # Scapy is imported inside the thread body, not at module level.
        from scapy.all import send

        time.sleep(1)
        send(self._packet)
21b23dbabbSKristof Provost
class TestFrag6(VnetTestTemplate):
    """IPv6 fragment tests against pf with 'scrub fragment reassemble'.

    Test packets are sent from 2001:db8::1 to 2001:db8::2 over "if1".
    vnet2_handler() enables pf and dummymbuf; presumably the framework runs
    it inside the "vnet2" jail -- confirm against VnetTestTemplate.
    """

    REQUIRED_MODULES = ["pf", "dummymbuf"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1"]},
        "vnet2": {"ifaces": ["if1"]},
        "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]},
    }

    def vnet2_handler(self, vnet):
        """Enable pf, load the ruleset and attach a dummymbuf rule.

        vnet: object whose iface_alias_map["if1"].name provides the
        interface name inserted into the dummymbuf rule.
        """
        ifname = vnet.iface_alias_map["if1"].name
        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "scrub fragment reassemble min-ttl 10",
            "pass",
            "block in inet6 proto icmp6 icmp6-type echoreq",
        ])
        # Link dummymbuf into the inet6 pfil head, then install its rule for
        # inbound inet6 traffic on our interface.
        ToolsHelper.print_output("/sbin/pfilctl link -i dummymbuf:inet6 inet6")
        ToolsHelper.print_output("/sbin/sysctl net.dummymbuf.rules=\"inet6 in %s enlarge 3000;\"" % ifname)

    def check_ping_reply(self, packet):
        """Print *packet* and return False.

        Not referenced by the tests in this class.
        """
        print(packet)
        return False

    @pytest.mark.require_user("root")
    def test_dup_frag_hdr(self):
        """Test packets with duplicate fragment headers"""
        # Import in the correct vnet, so as not to confuse Scapy
        import scapy.all as sp

        # Echo request preceded by two consecutive fragment headers.
        packet = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") \
            / sp.IPv6ExtHdrFragment(offset=0, m=0) \
            / sp.IPv6ExtHdrFragment(offset=0, m=0) \
            / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f00f') * 128))

        # Delay the send so the sniffer is running when we transmit.
        sender = DelayedSend(packet)

        packets = sp.sniff(iface=self.vnet.iface_alias_map["if1"].name,
            timeout=3)
        # Wait for the sender thread so it cannot outlive this test.
        sender.join()

        # None of the captured packets may be an echo reply.
        for p in packets:
            assert not p.getlayer(sp.ICMPv6EchoReply)

    @pytest.mark.require_user("root")
    def test_overlong(self):
        """Test overly long fragmented packet"""
        # Import in the correct vnet, so as not to confuse Scapy
        import scapy.all as sp

        limit = 65535   # highest fragment start offset (in bytes) considered
        gran = 1200     # payload bytes carried by every non-final fragment
        frag_id = random.randint(0, 0xffffffff)

        pkts = []
        for i, curr in enumerate(range(0, limit + 1, gran)):
            # The final fragment is the one that would cross the limit; it
            # clears the "more fragments" flag and is trimmed so that the
            # fragment payloads end at byte 65530.
            last = curr + gran > limit
            size = 65530 - curr if last else gran

            pkt = sp.IPv6(src="2001:db8::1", dst="2001:db8::2")
            if i == 0:
                # Only the first fragment carries a hop-by-hop header.
                # NOTE(review): presumably this is what makes the reassembled
                # packet overly long -- confirm.
                pkt = pkt / sp.IPv6ExtHdrHopByHop(options=[sp.PadN(optlen=2), sp.Pad1()])
            # Fragment offsets are expressed in 8-byte units; each fragment's
            # payload is filled with its own index as the byte value.
            pkt = pkt / sp.IPv6ExtHdrFragment(id=frag_id, offset=curr // 8, m=not last) \
                / bytes([i] * size)
            pkts.append(pkt)

        # No assertions follow: the test only transmits the fragments.
        sp.send(pkts, inter=0.1)
97*7f846fc0SKristof Provost
class TestFrag6_Overlap(VnetTestTemplate):
    """Overlapping IPv6 fragment test against pf with fragment reassembly.

    Test packets are sent from 2001:db8::1 to 2001:db8::2 over "if1".
    vnet2_handler() enables pf; presumably the framework runs it inside the
    "vnet2" jail -- confirm against VnetTestTemplate.
    """

    REQUIRED_MODULES = ["pf"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1"]},
        "vnet2": {"ifaces": ["if1"]},
        "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]},
    }

    def vnet2_handler(self, vnet):
        """Enable pf with loud debugging and a scrub/reassemble ruleset."""
        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.print_output("/sbin/pfctl -x loud")
        ToolsHelper.pf_rules([
            "scrub fragment reassemble",
            "pass",
        ])

    @pytest.mark.require_user("root")
    def test_overlap(self):
        """Ensure we discard packets with overlapping fragments"""
        # Import in the correct vnet, so as not to confuse Scapy
        import scapy.all as sp

        # Build an echo request and split it; the rest of the test relies on
        # getting exactly three fragments.
        packet = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") \
            / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f00f') * 90))
        frags = sp.fragment6(packet, 128)
        assert len(frags) == 3

        first_hdr = frags[0].getlayer(sp.IPv6ExtHdrFragment)
        # Fragment with overlap: reuses the id and next-header of the first
        # fragment and is placed at offset 4 (fragment offsets are counted in
        # 8-octet units).
        overlap = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") \
            / sp.IPv6ExtHdrFragment(offset=4, m=1, id=first_hdr.id, nh=first_hdr.nh) \
            / sp.raw(bytes.fromhex('f00f') * 4)
        # Inject the overlapping fragment just before the final one.
        frags = [frags[0], frags[1], overlap, frags[2]]

        # Delay the send so the sniffer is running when we transmit.
        sender = DelayedSend(frags)

        packets = sp.sniff(iface=self.vnet.iface_alias_map["if1"].name,
            timeout=3)
        # Wait for the sender thread so it cannot outlive this test.
        sender.join()

        # Dump every captured packet; none of them may be an echo reply.
        for p in packets:
            p.show()
            assert not p.getlayer(sp.ICMPv6EchoReply)
141