xref: /freebsd-src/tests/sys/netpfil/pf/frag6.py (revision 7f846fc0e7ce30c80e3265c957a138d7192af397)
1import pytest
2import logging
3import threading
4import time
5import random
6logging.getLogger("scapy").setLevel(logging.CRITICAL)
7from atf_python.sys.net.tools import ToolsHelper
8from atf_python.sys.net.vnet import VnetTestTemplate
9
class DelayedSend(threading.Thread):
    """Background thread that transmits a packet after a one second pause.

    The thread starts itself from the constructor, so simply creating an
    instance schedules the transmission.
    """

    def __init__(self, packet):
        """Store *packet* (whatever scapy's send() is to be given) and start."""
        super().__init__()
        self._packet = packet

        self.start()

    def run(self):
        """Thread body: import scapy, wait one second, send the packet."""
        # Scapy is imported here, in the thread body, not at module level.
        from scapy.all import send
        time.sleep(1)
        send(self._packet)
21
class TestFrag6(VnetTestTemplate):
    # Kernel modules the tests rely on: pf itself, and dummymbuf which
    # vnet2_handler() links into the inet6 pfil head.
    REQUIRED_MODULES = ["pf", "dummymbuf"]
    # Two vnets sharing one link ("if1") with addresses from 2001:db8::/64.
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1"]},
        "vnet2": {"ifaces": ["if1"]},
        "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]},
    }

    def vnet2_handler(self, vnet):
        """Configure the vnet2 side: pf with reassembly, plus dummymbuf.

        vnet -- the vnet object for vnet2; only used to look up the real
                name of the "if1" interface.
        """
        ifname = vnet.iface_alias_map["if1"].name
        ToolsHelper.print_output("/sbin/pfctl -e")
        # Echo requests are blocked, so none of the tests in this class
        # should ever observe an echo reply.
        ToolsHelper.pf_rules([
            "scrub fragment reassemble min-ttl 10",
            "pass",
            "block in inet6 proto icmp6 icmp6-type echoreq",
        ])
        ToolsHelper.print_output("/sbin/pfilctl link -i dummymbuf:inet6 inet6")
        # Install a dummymbuf "enlarge 3000" rule for inbound inet6 packets
        # on our interface.
        ToolsHelper.print_output(
            '/sbin/sysctl net.dummymbuf.rules="inet6 in %s enlarge 3000;"' % ifname)

    def check_ping_reply(self, packet):
        """Print *packet* and return False.

        NOTE(review): nothing in this class calls this helper.
        """
        print(packet)
        return False

    @pytest.mark.require_user("root")
    def test_dup_frag_hdr(self):
        "Test packets with duplicate fragment headers"

        # Import in the correct vnet, so as to not confuse Scapy
        import scapy.all as sp

        # An echo request carried behind two stacked fragment headers.
        packet = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") \
            / sp.IPv6ExtHdrFragment(offset=0, m=0) \
            / sp.IPv6ExtHdrFragment(offset=0, m=0) \
            / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f00f') * 128))

        # Delay the send so the sniffer is running when we transmit.
        sender = DelayedSend(packet)

        packets = sp.sniff(iface=self.vnet.iface_alias_map["if1"].name,
            timeout=3)
        # Make sure the sender thread is finished before the test returns.
        sender.join()

        for p in packets:
            assert not p.getlayer(sp.ICMPv6EchoReply)

    @pytest.mark.require_user("root")
    def test_overlong(self):
        "Test overly long fragmented packet"

        # Import in the correct vnet, so as to not confuse Scapy
        import scapy.all as sp

        frag_id = random.randint(0, 0xffffffff)
        gran = 1200      # payload bytes per fragment (a multiple of 8)
        limit = 65535    # fragments start at every multiple of gran up to this
        pkts = []

        # NOTE: bytes([i] * size) requires i < 256; with these constants the
        # loop runs 55 times, so that holds.
        for i, curr in enumerate(range(0, limit + 1, gran)):
            last = curr + gran > limit
            # The final fragment is trimmed so that the data ends at byte
            # 65530 and carries no "more fragments" flag.
            size = 65530 - curr if last else gran

            pkt = sp.IPv6(src="2001:db8::1", dst="2001:db8::2")
            if i == 0:
                # Only the first fragment carries a hop-by-hop header.
                pkt = pkt / sp.IPv6ExtHdrHopByHop(
                    options=[sp.PadN(optlen=2), sp.Pad1()])
            # The fragment offset field counts 8-octet units.
            pkt = pkt \
                / sp.IPv6ExtHdrFragment(id=frag_id, offset=curr // 8,
                    m=not last) \
                / bytes([i] * size)
            pkts.append(pkt)

        # No assertions follow: presumably the test passes as long as
        # sending the fragments does not bring the receiver down -- TODO
        # confirm.
        sp.send(pkts, inter=0.1)
97
class TestFrag6_Overlap(VnetTestTemplate):
    REQUIRED_MODULES = ["pf"]
    # Two vnets sharing one link ("if1") with addresses from 2001:db8::/64.
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1"]},
        "vnet2": {"ifaces": ["if1"]},
        "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]},
    }

    def vnet2_handler(self, vnet):
        """Configure the vnet2 side: pf enabled, reassembling, passing all.

        vnet -- the vnet object for vnet2 (unused here).
        """
        ToolsHelper.print_output("/sbin/pfctl -e")
        # Raise pf's debug level to "loud".
        ToolsHelper.print_output("/sbin/pfctl -x loud")
        ToolsHelper.pf_rules([
            "scrub fragment reassemble",
            "pass",
        ])

    @pytest.mark.require_user("root")
    def test_overlap(self):
        "Ensure we discard packets with overlapping fragments"

        # Import in the correct vnet, so as to not confuse Scapy
        import scapy.all as sp

        packet = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") \
            / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f00f') * 90))
        frags = sp.fragment6(packet, 128)
        assert len(frags) == 3

        # Reuse the id and next-header of the genuine fragments so the
        # extra fragment is matched to the same packet.
        f = frags[0].getlayer(sp.IPv6ExtHdrFragment)
        # Fragment with overlap (the offset field counts 8-octet units)
        overlap = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") \
            / sp.IPv6ExtHdrFragment(offset=4, m=1, id=f.id, nh=f.nh) \
            / sp.raw(bytes.fromhex('f00f') * 4)
        # Slip the overlapping fragment in before the final one.
        frags = [frags[0], frags[1], overlap, frags[2]]

        # Delay the send so the sniffer is running when we transmit.
        sender = DelayedSend(frags)

        packets = sp.sniff(iface=self.vnet.iface_alias_map["if1"].name,
            timeout=3)
        # Make sure the sender thread is finished before the test returns.
        sender.join()

        for p in packets:
            p.show()
            assert not p.getlayer(sp.ICMPv6EchoReply)
141