import pytest
import logging
import threading
import time
import random
# Only let CRITICAL messages from the "scapy" logger through.  This is set
# before the remaining imports, as in the original file.
logging.getLogger("scapy").setLevel(logging.CRITICAL)
from atf_python.sys.net.tools import ToolsHelper
from atf_python.sys.net.vnet import VnetTestTemplate


class DelayedSend(threading.Thread):
    """Thread that sends a packet (or list of packets) after a one second delay.

    The thread starts itself from the constructor, so creating an instance is
    enough to schedule the transmission.  Callers may join() it afterwards.
    """

    def __init__(self, packet):
        """Remember `packet` (whatever scapy's send() accepts) and start."""
        super().__init__()
        self._packet = packet

        self.start()

    def run(self):
        """Sleep for one second, then transmit the stored packet(s)."""
        # Scapy is imported lazily, inside the running thread, mirroring the
        # function-local imports used by the tests.
        import scapy.all as sp
        time.sleep(1)
        sp.send(self._packet)


class TestFrag6(VnetTestTemplate):
    """IPv6 fragment tests with pf reassembly and dummymbuf enabled in vnet2."""

    REQUIRED_MODULES = ["pf", "dummymbuf"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1"]},
        "vnet2": {"ifaces": ["if1"]},
        "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]},
    }

    def vnet2_handler(self, vnet):
        """Enable pf, load the scrub/block rules and hook up dummymbuf."""
        ifname = vnet.iface_alias_map["if1"].name
        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "scrub fragment reassemble min-ttl 10",
            "pass",
            "block in inet6 proto icmp6 icmp6-type echoreq",
        ])
        ToolsHelper.print_output("/sbin/pfilctl link -i dummymbuf:inet6 inet6")
        # Install a dummymbuf rule for inbound inet6 traffic on our interface.
        ToolsHelper.print_output(
            '/sbin/sysctl net.dummymbuf.rules="inet6 in %s enlarge 3000;"' % ifname)

    def check_ping_reply(self, packet):
        """Print `packet` and report it as not matching (always False)."""
        print(packet)
        return False

    @pytest.mark.require_user("root")
    def test_dup_frag_hdr(self):
        "Test packets with duplicate fragment headers"
        # NOTE(review): the docstring above is kept verbatim; the test runner
        # may surface it as the test description.

        # Import in the correct vnet, so as to not confuse Scapy
        import scapy.all as sp

        # An echo request carried behind two consecutive fragment headers.
        packet = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") \
            / sp.IPv6ExtHdrFragment(offset=0, m=0) \
            / sp.IPv6ExtHdrFragment(offset=0, m=0) \
            / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f00f') * 128))

        # Delay the send so the sniffer is running when we transmit.
        sender = DelayedSend(packet)

        packets = sp.sniff(iface=self.vnet.iface_alias_map["if1"].name,
            timeout=3)
        # The sender sleeps for 1s and the sniff lasts 3s, so this should not
        # block; it just makes sure the thread does not outlive the test.
        sender.join()

        # No echo reply may come back for the malformed request.
        for p in packets:
            assert not p.getlayer(sp.ICMPv6EchoReply)

    @pytest.mark.require_user("root")
    def test_overlong(self):
        "Test overly long fragmented packet"
        # NOTE(review): the docstring above is kept verbatim; the test runner
        # may surface it as the test description.
        # This test makes no assertions; it only transmits the fragments.

        # Import in the correct vnet, so as to not confuse Scapy
        import scapy.all as sp

        pkts = []

        frag_id = random.randint(0, 0xffffffff)
        gran = 1200

        # One fragment per `gran` bytes, for every start offset up to 65535.
        for i, curr in enumerate(range(0, 65535 + 1, gran)):
            more = curr + gran <= 65535
            # The last fragment is cut so that the fragment data ends at byte
            # 65530.  Presumably the hop-by-hop header on the first fragment
            # then pushes the reassembled payload past 65535 bytes -- confirm.
            size = gran if more else 65530 - curr

            ipv61 = sp.IPv6(src="2001:db8::1", dst="2001:db8::2")
            # The fragment offset field counts 8-byte units.
            frag = sp.IPv6ExtHdrFragment(id=frag_id, offset=curr // 8, m=more)
            # Each fragment's payload is filled with its own index value.
            payload = bytes([i]) * size

            if i == 0:
                # Only the first fragment carries the hop-by-hop header.
                hbh = sp.IPv6ExtHdrHopByHop(options=[sp.PadN(optlen=2), sp.Pad1()])
                pkt = ipv61 / hbh / frag / payload
            else:
                pkt = ipv61 / frag / payload
            pkts.append(pkt)

        sp.send(pkts, inter=0.1)

class TestFrag6_Overlap(VnetTestTemplate):
    """Overlapping IPv6 fragment test with pf reassembly enabled in vnet2."""

    REQUIRED_MODULES = ["pf"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1"]},
        "vnet2": {"ifaces": ["if1"]},
        "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]},
    }

    def vnet2_handler(self, vnet):
        """Enable pf with loud debugging and a reassembling scrub rule."""
        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.print_output("/sbin/pfctl -x loud")
        ToolsHelper.pf_rules([
            "scrub fragment reassemble",
            "pass",
        ])

    @pytest.mark.require_user("root")
    def test_overlap(self):
        "Ensure we discard packets with overlapping fragments"
        # NOTE(review): the docstring above is kept verbatim; the test runner
        # may surface it as the test description.

        # Import in the correct vnet, so as to not confuse Scapy
        import scapy.all as sp

        packet = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") \
            / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f00f') * 90))
        frags = sp.fragment6(packet, 128)
        # The rest of the test indexes exactly three fragments.
        assert len(frags) == 3

        first_hdr = frags[0].getlayer(sp.IPv6ExtHdrFragment)
        # Fragment with overlap: it reuses the id and next-header of the real
        # fragments, at offset 4 (in 8-byte units) with "more fragments" set.
        overlap = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") \
            / sp.IPv6ExtHdrFragment(offset=4, m=1,
                                    id=first_hdr.id, nh=first_hdr.nh) \
            / sp.raw(bytes.fromhex('f00f') * 4)
        # Slip the overlapping fragment in before the final real fragment.
        frags = [frags[0], frags[1], overlap, frags[2]]

        # Delay the send so the sniffer is running when we transmit.
        sender = DelayedSend(frags)

        packets = sp.sniff(iface=self.vnet.iface_alias_map["if1"].name,
            timeout=3)
        # The sender sleeps for 1s and the sniff lasts 3s, so this should not
        # block; it just makes sure the thread does not outlive the test.
        sender.join()

        # No echo reply may come back for the overlapping fragments.
        for p in packets:
            p.show()
            assert not p.getlayer(sp.ICMPv6EchoReply)