#!/usr/bin/env python3
# SPDX-License-Identifier: BSD-3-Clause

# socket is imported explicitly: the tests use socket.IPPROTO_* and should
# not depend on the name leaking out of the scapy star import below.
import socket
import unittest

from scapy.all import *

import pkttest


SRC_ADDR = "1111:0000:0000:0000:0000:0000:0000:0001"
DST_ADDR = "2222:0000:0000:0000:0000:0000:0000:0001"
SRC_NET = "1111:0000:0000:0000:0000:0000:0000:0000/64"
DST_NET = "2222:0000:0000:0000:0000:0000:0000:0000/64"


def config():
    """Return the SP/SA/route configuration text used by these tests.

    The text declares one outbound (SPI 5) and one inbound (SPI 6) ESP
    transport-mode policy with null cipher/auth between SRC_NET and
    DST_NET, plus one route per network.
    NOTE(review): presumably consumed by the pkttest harness - confirm.
    """
    return """
sp ipv6 out esp protect 5 pri 1 \\
src {0} \\
dst {1} \\
sport 0:65535 dport 0:65535

sp ipv6 in esp protect 6 pri 1 \\
src {1} \\
dst {0} \\
sport 0:65535 dport 0:65535

sa out 5 cipher_algo null auth_algo null mode transport
sa in 6 cipher_algo null auth_algo null mode transport

rt ipv6 dst {0} port 1
rt ipv6 dst {1} port 0
""".format(SRC_NET, DST_NET)


class TestTransportWithIPv6Ext(unittest.TestCase):
    """ESP transport-mode tests for IPv6 packets with extension headers."""

    # There is a bug in the IPsec Scapy implementation
    # which causes invalid packet reconstruction after
    # successful decryption. This method is a workaround.
    @staticmethod
    def decrypt(pkt, sa):
        """Decrypt the ESP payload of pkt and re-attach it to pkt's headers.

        pkt is the IPv6 layer of an ESP-protected packet (extension headers
        may precede ESP); sa is the SecurityAssociation used to decrypt.
        Note that pkt is modified in place: its ESP layer is removed and the
        'next header' of the layer preceding ESP is rewritten.

        Returns a new packet made of pkt's remaining headers followed by the
        decrypted payload, with the IPv6 payload length fixed up.
        """
        esp = pkt[ESP]

        # decrypt dummy packet with no extensions
        d = sa.decrypt(IPv6()/esp)

        # fix 'next header' in the preceding header of the original
        # packet and remove ESP
        pkt[ESP].underlayer.nh = d[IPv6].nh
        pkt[ESP].underlayer.remove_payload()

        # combine L3 header with decrypted payload
        npkt = pkt/d[IPv6].payload

        # fix length: decrypted payload plus whatever extension headers
        # are still attached to the original IPv6 header
        npkt[IPv6].plen = d[IPv6].plen + len(pkt[IPv6].payload)

        return npkt

    @staticmethod
    def _ext_headers():
        """Build the hop-by-hop / routing / destination-options chain."""
        hoptions = [
            RouterAlert(value=2),
            Jumbo(jumboplen=5000),
            Pad1(),
        ]
        doptions = [HAO(hoa="1234::4321")]

        hdrs = IPv6ExtHdrHopByHop(options=hoptions)
        hdrs /= IPv6ExtHdrRouting(addresses=["3333::3", "4444::4"])
        hdrs /= IPv6ExtHdrDestOpt(options=doptions)
        return hdrs

    @staticmethod
    def _udp_payload():
        """Build the UDP datagram carried by every test packet."""
        return UDP(sport=123, dport=456)/Raw(load="abc")

    def _check_udp(self, pkt):
        """Assert that pkt carries the datagram built by _udp_payload()."""
        self.assertEqual(pkt[UDP].sport, 123)
        self.assertEqual(pkt[UDP].dport, 456)
        self.assertEqual(bytes(pkt[UDP].payload), b'abc')

    def setUp(self):
        """Create the packet transfer helper and one SA per direction."""
        self.px = pkttest.PacketXfer()
        self.outb_sa = SecurityAssociation(ESP, spi=5)
        self.inb_sa = SecurityAssociation(ESP, spi=6)

    def test_outb_ipv6_noopt(self):
        """Outbound: a plain IPv6/UDP packet comes back ESP-protected."""
        pkt = IPv6(src=SRC_ADDR, dst=DST_ADDR)
        pkt /= self._udp_payload()

        # send and check response
        resp = self.px.xfer_unprotected(pkt)
        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
        self.assertEqual(resp[ESP].spi, 5)

        # decrypt response, check packet after decryption
        d = TestTransportWithIPv6Ext.decrypt(resp[IPv6], self.outb_sa)
        self.assertEqual(d[IPv6].nh, socket.IPPROTO_UDP)
        self._check_udp(d)

    def test_outb_ipv6_opt(self):
        """Outbound: ESP is inserted after the IPv6 extension headers."""
        pkt = IPv6(src=SRC_ADDR, dst=DST_ADDR)
        pkt /= self._ext_headers()
        pkt /= self._udp_payload()

        # send and check response
        resp = self.px.xfer_unprotected(pkt)
        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_HOPOPTS)

        # check extensions
        self.assertEqual(resp[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
        self.assertEqual(resp[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
        self.assertEqual(resp[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_ESP)

        # check ESP
        self.assertEqual(resp[ESP].spi, 5)

        # decrypt response, check packet after decryption
        d = TestTransportWithIPv6Ext.decrypt(resp[IPv6], self.outb_sa)
        self.assertEqual(d[IPv6].nh, socket.IPPROTO_HOPOPTS)
        self.assertEqual(d[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
        self.assertEqual(d[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
        self.assertEqual(d[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_UDP)

        # check UDP
        self._check_udp(d)

    def test_inb_ipv6_noopt(self):
        """Inbound: an ESP-protected IPv6/UDP packet comes back in clear."""
        # encrypt and send raw UDP packet
        pkt = IPv6(src=DST_ADDR, dst=SRC_ADDR)
        pkt /= self._udp_payload()
        e = self.inb_sa.encrypt(pkt)

        # send and check response
        resp = self.px.xfer_protected(e)
        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)

        # check UDP packet
        self._check_udp(resp)

    def test_inb_ipv6_opt(self):
        """Inbound: extension headers are kept when ESP is removed."""
        # prepare packet with options
        pkt = IPv6(src=DST_ADDR, dst=SRC_ADDR)
        pkt /= self._ext_headers()
        pkt /= self._udp_payload()
        e = self.inb_sa.encrypt(pkt)

        # send encrypted packet and check response
        resp = self.px.xfer_protected(e)
        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_HOPOPTS)
        self.assertEqual(resp[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
        self.assertEqual(resp[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
        self.assertEqual(resp[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_UDP)

        # check UDP
        self._check_udp(resp)

    def test_inb_ipv6_frag(self):
        """Inbound: ESP placed behind a fragment header is still removed."""
        # prepare ESP payload
        pkt = IPv6()/self._udp_payload()
        e = self.inb_sa.encrypt(pkt)

        # craft and send inbound packet
        e = IPv6(src=DST_ADDR, dst=SRC_ADDR)/IPv6ExtHdrFragment()/e[IPv6].payload
        resp = self.px.xfer_protected(e)

        # check response
        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_FRAGMENT)
        self.assertEqual(resp[IPv6ExtHdrFragment].nh, socket.IPPROTO_UDP)

        # check UDP
        self._check_udp(resp)


# hand control to the pkttest harness (kept at module level, as in the
# original script)
pkttest.pkttest()