1import pytest 2import logging 3import threading 4import time 5import random 6logging.getLogger("scapy").setLevel(logging.CRITICAL) 7from atf_python.sys.net.tools import ToolsHelper 8from atf_python.sys.net.vnet import VnetTestTemplate 9 10class DelayedSend(threading.Thread): 11 def __init__(self, packet): 12 threading.Thread.__init__(self) 13 self._packet = packet 14 15 self.start() 16 17 def run(self): 18 import scapy.all as sp 19 time.sleep(1) 20 sp.send(self._packet) 21 22class TestFrag6(VnetTestTemplate): 23 REQUIRED_MODULES = ["pf", "dummymbuf"] 24 TOPOLOGY = { 25 "vnet1": {"ifaces": ["if1"]}, 26 "vnet2": {"ifaces": ["if1"]}, 27 "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]}, 28 } 29 30 def vnet2_handler(self, vnet): 31 ifname = vnet.iface_alias_map["if1"].name 32 ToolsHelper.print_output("/sbin/pfctl -e") 33 ToolsHelper.pf_rules([ 34 "scrub fragment reassemble min-ttl 10", 35 "pass", 36 "block in inet6 proto icmp6 icmp6-type echoreq", 37 ]) 38 ToolsHelper.print_output("/sbin/pfilctl link -i dummymbuf:inet6 inet6") 39 ToolsHelper.print_output("/sbin/sysctl net.dummymbuf.rules=\"inet6 in %s enlarge 3000;\"" % ifname) 40 41 def check_ping_reply(self, packet): 42 print(packet) 43 return False 44 45 @pytest.mark.require_user("root") 46 def test_dup_frag_hdr(self): 47 "Test packets with duplicate fragment headers" 48 srv_vnet = self.vnet_map["vnet2"] 49 50 # Import in the correct vnet, so at to not confuse Scapy 51 import scapy.all as sp 52 53 packet = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") \ 54 / sp.IPv6ExtHdrFragment(offset = 0, m = 0) \ 55 / sp.IPv6ExtHdrFragment(offset = 0, m = 0) \ 56 / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f00f') * 128)) 57 58 # Delay the send so the sniffer is running when we transmit. 59 s = DelayedSend(packet) 60 61 packets = sp.sniff(iface=self.vnet.iface_alias_map["if1"].name, 62 timeout=3) 63 for p in packets: 64 assert not p.getlayer(sp.ICMPv6EchoReply) 65 66 @pytest.mark.require_user("root") 67 def test_overlong(self): 68 "Test overly long fragmented packet" 69 70 # Import in the correct vnet, so at to not confuse Scapy 71 import scapy.all as sp 72 73 curr = 0 74 pkts = [] 75 76 frag_id = random.randint(0,0xffffffff) 77 gran = 1200 78 79 i = 0 80 while curr <= 65535: 81 ipv61 = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") 82 more = True 83 g = gran 84 if curr + gran > 65535: 85 more = False 86 g = 65530 - curr 87 if i == 0: 88 pkt = ipv61 / sp.IPv6ExtHdrHopByHop(options=[sp.PadN(optlen=2), sp.Pad1()]) / \ 89 sp.IPv6ExtHdrFragment(id = frag_id, offset = curr // 8, m = more) / bytes([i] * g) 90 else: 91 pkt = ipv61 / sp.IPv6ExtHdrFragment(id = frag_id, offset = curr // 8, m = more) / bytes([i] * g) 92 pkts.append(pkt) 93 curr += gran 94 i += 1 95 96 sp.send(pkts, inter = 0.1) 97 98class TestFrag6_Overlap(VnetTestTemplate): 99 REQUIRED_MODULES = ["pf"] 100 TOPOLOGY = { 101 "vnet1": {"ifaces": ["if1"]}, 102 "vnet2": {"ifaces": ["if1"]}, 103 "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]}, 104 } 105 106 def vnet2_handler(self, vnet): 107 ToolsHelper.print_output("/sbin/pfctl -e") 108 ToolsHelper.print_output("/sbin/pfctl -x loud") 109 ToolsHelper.pf_rules([ 110 "scrub fragment reassemble", 111 "pass", 112 ]) 113 114 @pytest.mark.require_user("root") 115 def test_overlap(self): 116 "Ensure we discard packets with overlapping fragments" 117 118 # Import in the correct vnet, so at to not confuse Scapy 119 import scapy.all as sp 120 121 packet = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") \ 122 / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f00f') * 90)) 123 frags = sp.fragment6(packet, 128) 124 assert len(frags) == 3 125 126 f = frags[0].getlayer(sp.IPv6ExtHdrFragment) 127 # Fragment with overlap 128 overlap = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") \ 129 / sp.IPv6ExtHdrFragment(offset = 4, m = 1, id = f.id, nh = f.nh) \ 130 / sp.raw(bytes.fromhex('f00f') * 4) 131 frags = [ frags[0], frags[1], overlap, frags[2] ] 132 133 # Delay the send so the sniffer is running when we transmit. 134 s = DelayedSend(frags) 135 136 packets = sp.sniff(iface=self.vnet.iface_alias_map["if1"].name, 137 timeout=3) 138 for p in packets: 139 p.show() 140 assert not p.getlayer(sp.ICMPv6EchoReply) 141