#!/usr/bin/env python3
"""IPsec ESP transport-mode tests for IPv6 packets with extension headers.

Each test builds a packet with Scapy, passes it through a
``pkttest.PacketXfer`` instance and checks the packet that comes back.
``config()`` supplies the matching SP/SA/route rules (presumably consumed
by the application under test via ``pkttest`` -- confirm against pkttest).
"""

# NOTE: the wildcard import is kept because the Scapy names used below
# (IPv6, UDP, ESP, SecurityAssociation, extension headers, options, ...)
# all come from it.  The explicit imports follow it so that a name exported
# by Scapy can never shadow them.
from scapy.all import *
import socket
import unittest
import pkttest


SRC_ADDR = "1111:0000:0000:0000:0000:0000:0000:0001"
DST_ADDR = "2222:0000:0000:0000:0000:0000:0000:0001"
SRC_NET = "1111:0000:0000:0000:0000:0000:0000:0000/64"
DST_NET = "2222:0000:0000:0000:0000:0000:0000:0000/64"


def config():
    """Return the SP/SA/route configuration text used by these tests.

    The outbound policy protects SRC_NET -> DST_NET traffic with SA 5, the
    inbound policy protects DST_NET -> SRC_NET traffic with SA 6.  Both SAs
    use null cipher and null authentication in transport mode.
    """
    return """
sp ipv6 out esp protect 5 pri 1 \\
src {0} \\
dst {1} \\
sport 0:65535 dport 0:65535

sp ipv6 in esp protect 6 pri 1 \\
src {1} \\
dst {0} \\
sport 0:65535 dport 0:65535

sa out 5 cipher_algo null auth_algo null mode transport
sa in 6 cipher_algo null auth_algo null mode transport

rt ipv6 dst {0} port 1
rt ipv6 dst {1} port 0
""".format(SRC_NET, DST_NET)


class TestTransportWithIPv6Ext(unittest.TestCase):
    """ESP transport-mode checks with and without IPv6 extension headers."""

    # There is a bug in the IPsec Scapy implementation
    # which causes invalid packet reconstruction after
    # successful decryption. This method is a workaround.
    @staticmethod
    def decrypt(pkt, sa):
        """Decrypt the ESP part of *pkt* with *sa* and rebuild the packet.

        *pkt* is modified in place (its ESP layer is removed and the
        preceding header's ``nh`` is rewritten); the returned packet is
        *pkt* followed by the decrypted payload.
        """
        esp = pkt[ESP]

        # decrypt dummy packet with no extensions
        d = sa.decrypt(IPv6()/esp)

        # fix 'next header' in the preceding header of the original
        # packet and remove ESP
        pkt[ESP].underlayer.nh = d[IPv6].nh
        pkt[ESP].underlayer.remove_payload()

        # combine L3 header with decrypted payload
        npkt = pkt/d[IPv6].payload

        # fix length: decrypted payload length plus whatever still follows
        # the IPv6 header of the original packet (the extension headers)
        npkt[IPv6].plen = d[IPv6].plen + len(pkt[IPv6].payload)

        return npkt

    def setUp(self):
        """Create the packet transfer helper and one SA per direction."""
        self.px = pkttest.PacketXfer()
        self.outb_sa = SecurityAssociation(ESP, spi=5)
        self.inb_sa = SecurityAssociation(ESP, spi=6)

    def test_outb_ipv6_noopt(self):
        """Outbound: plain IPv6/UDP comes back ESP-protected with SPI 5."""
        pkt = IPv6(src=SRC_ADDR, dst=DST_ADDR)
        pkt /= UDP(sport=123, dport=456)/Raw(load="abc")

        # send and check response
        resp = self.px.xfer_unprotected(pkt)
        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
        self.assertEqual(resp[ESP].spi, 5)

        # decrypt response, check packet after decryption
        d = self.decrypt(resp[IPv6], self.outb_sa)
        self.assertEqual(d[IPv6].nh, socket.IPPROTO_UDP)
        self.assertEqual(d[UDP].sport, 123)
        self.assertEqual(d[UDP].dport, 456)
        self.assertEqual(bytes(d[UDP].payload), b'abc')

    def test_outb_ipv6_opt(self):
        """Outbound: ESP is inserted after the extension header chain."""
        hoptions = [
            RouterAlert(value=2),
            Jumbo(jumboplen=5000),
            Pad1(),
        ]
        doptions = [HAO(hoa="1234::4321")]

        pkt = IPv6(src=SRC_ADDR, dst=DST_ADDR)
        pkt /= IPv6ExtHdrHopByHop(options=hoptions)
        pkt /= IPv6ExtHdrRouting(addresses=["3333::3", "4444::4"])
        pkt /= IPv6ExtHdrDestOpt(options=doptions)
        pkt /= UDP(sport=123, dport=456)/Raw(load="abc")

        # send and check response
        resp = self.px.xfer_unprotected(pkt)
        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_HOPOPTS)

        # check extensions
        self.assertEqual(resp[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
        self.assertEqual(resp[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
        self.assertEqual(resp[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_ESP)

        # check ESP
        self.assertEqual(resp[ESP].spi, 5)

        # decrypt response, check packet after decryption
        d = self.decrypt(resp[IPv6], self.outb_sa)
        self.assertEqual(d[IPv6].nh, socket.IPPROTO_HOPOPTS)
        self.assertEqual(d[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
        self.assertEqual(d[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
        self.assertEqual(d[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_UDP)

        # check UDP
        self.assertEqual(d[UDP].sport, 123)
        self.assertEqual(d[UDP].dport, 456)
        self.assertEqual(bytes(d[UDP].payload), b'abc')

    def test_inb_ipv6_noopt(self):
        """Inbound: an ESP-encrypted IPv6/UDP packet comes back as UDP."""
        # encrypt and send raw UDP packet
        pkt = IPv6(src=DST_ADDR, dst=SRC_ADDR)
        pkt /= UDP(sport=123, dport=456)/Raw(load="abc")
        e = self.inb_sa.encrypt(pkt)

        # send and check response
        resp = self.px.xfer_protected(e)
        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)

        # check UDP packet
        self.assertEqual(resp[UDP].sport, 123)
        self.assertEqual(resp[UDP].dport, 456)
        self.assertEqual(bytes(resp[UDP].payload), b'abc')

    def test_inb_ipv6_opt(self):
        """Inbound: extension headers are kept and ESP is stripped."""
        hoptions = [
            RouterAlert(value=2),
            Jumbo(jumboplen=5000),
            Pad1(),
        ]
        doptions = [HAO(hoa="1234::4321")]

        # prepare packet with options
        pkt = IPv6(src=DST_ADDR, dst=SRC_ADDR)
        pkt /= IPv6ExtHdrHopByHop(options=hoptions)
        pkt /= IPv6ExtHdrRouting(addresses=["3333::3", "4444::4"])
        pkt /= IPv6ExtHdrDestOpt(options=doptions)
        pkt /= UDP(sport=123, dport=456)/Raw(load="abc")
        e = self.inb_sa.encrypt(pkt)

        # send encrypted packet and check response
        resp = self.px.xfer_protected(e)
        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_HOPOPTS)
        self.assertEqual(resp[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
        self.assertEqual(resp[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
        self.assertEqual(resp[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_UDP)

        # check UDP
        self.assertEqual(resp[UDP].sport, 123)
        self.assertEqual(resp[UDP].dport, 456)
        self.assertEqual(bytes(resp[UDP].payload), b'abc')

    def test_inb_ipv6_frag(self):
        """Inbound: ESP placed behind a fragment header is decrypted."""
        # prepare ESP payload
        pkt = IPv6()/UDP(sport=123, dport=456)/Raw(load="abc")
        e = self.inb_sa.encrypt(pkt)

        # craft and send inbound packet
        e = IPv6(src=DST_ADDR, dst=SRC_ADDR)/IPv6ExtHdrFragment()/e[IPv6].payload
        resp = self.px.xfer_protected(e)

        # check response
        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_FRAGMENT)
        self.assertEqual(resp[IPv6ExtHdrFragment].nh, socket.IPPROTO_UDP)

        # check UDP
        self.assertEqual(resp[UDP].sport, 123)
        self.assertEqual(resp[UDP].dport, 456)
        self.assertEqual(bytes(resp[UDP].payload), b'abc')


pkttest.pkttest()