19a18283aSMarcin Smoczynski#!/usr/bin/env python3 2*1dc48bceSStephen Hemminger# SPDX-License-Identifier: BSD-3-Clause 39a18283aSMarcin Smoczynski 49a18283aSMarcin Smoczynskifrom scapy.all import * 59a18283aSMarcin Smoczynskiimport unittest 69a18283aSMarcin Smoczynskiimport pkttest 79a18283aSMarcin Smoczynski 89a18283aSMarcin Smoczynski 99a18283aSMarcin SmoczynskiSRC_ADDR = "1111:0000:0000:0000:0000:0000:0000:0001" 109a18283aSMarcin SmoczynskiDST_ADDR = "2222:0000:0000:0000:0000:0000:0000:0001" 119a18283aSMarcin SmoczynskiSRC_NET = "1111:0000:0000:0000:0000:0000:0000:0000/64" 129a18283aSMarcin SmoczynskiDST_NET = "2222:0000:0000:0000:0000:0000:0000:0000/64" 139a18283aSMarcin Smoczynski 149a18283aSMarcin Smoczynski 159a18283aSMarcin Smoczynskidef config(): 169a18283aSMarcin Smoczynski return """ 179a18283aSMarcin Smoczynskisp ipv6 out esp protect 5 pri 1 \\ 189a18283aSMarcin Smoczynskisrc {0} \\ 199a18283aSMarcin Smoczynskidst {1} \\ 209a18283aSMarcin Smoczynskisport 0:65535 dport 0:65535 219a18283aSMarcin Smoczynski 229a18283aSMarcin Smoczynskisp ipv6 in esp protect 6 pri 1 \\ 239a18283aSMarcin Smoczynskisrc {1} \\ 249a18283aSMarcin Smoczynskidst {0} \\ 259a18283aSMarcin Smoczynskisport 0:65535 dport 0:65535 269a18283aSMarcin Smoczynski 279a18283aSMarcin Smoczynskisa out 5 cipher_algo null auth_algo null mode transport 289a18283aSMarcin Smoczynskisa in 6 cipher_algo null auth_algo null mode transport 299a18283aSMarcin Smoczynski 309a18283aSMarcin Smoczynskirt ipv6 dst {0} port 1 319a18283aSMarcin Smoczynskirt ipv6 dst {1} port 0 329a18283aSMarcin Smoczynski""".format(SRC_NET, DST_NET) 339a18283aSMarcin Smoczynski 349a18283aSMarcin Smoczynski 359a18283aSMarcin Smoczynskiclass TestTransportWithIPv6Ext(unittest.TestCase): 369a18283aSMarcin Smoczynski # There is a bug in the IPsec Scapy implementation 379a18283aSMarcin Smoczynski # which causes invalid packet reconstruction after 389a18283aSMarcin Smoczynski # successful decryption. This method is a workaround. 399a18283aSMarcin Smoczynski @staticmethod 409a18283aSMarcin Smoczynski def decrypt(pkt, sa): 419a18283aSMarcin Smoczynski esp = pkt[ESP] 429a18283aSMarcin Smoczynski 439a18283aSMarcin Smoczynski # decrypt dummy packet with no extensions 449a18283aSMarcin Smoczynski d = sa.decrypt(IPv6()/esp) 459a18283aSMarcin Smoczynski 469a18283aSMarcin Smoczynski # fix 'next header' in the preceding header of the original 479a18283aSMarcin Smoczynski # packet and remove ESP 489a18283aSMarcin Smoczynski pkt[ESP].underlayer.nh = d[IPv6].nh 499a18283aSMarcin Smoczynski pkt[ESP].underlayer.remove_payload() 509a18283aSMarcin Smoczynski 519a18283aSMarcin Smoczynski # combine L3 header with decrypted payload 529a18283aSMarcin Smoczynski npkt = pkt/d[IPv6].payload 539a18283aSMarcin Smoczynski 549a18283aSMarcin Smoczynski # fix length 559a18283aSMarcin Smoczynski npkt[IPv6].plen = d[IPv6].plen + len(pkt[IPv6].payload) 569a18283aSMarcin Smoczynski 579a18283aSMarcin Smoczynski return npkt 589a18283aSMarcin Smoczynski 599a18283aSMarcin Smoczynski def setUp(self): 609a18283aSMarcin Smoczynski self.px = pkttest.PacketXfer() 619a18283aSMarcin Smoczynski self.outb_sa = SecurityAssociation(ESP, spi=5) 629a18283aSMarcin Smoczynski self.inb_sa = SecurityAssociation(ESP, spi=6) 639a18283aSMarcin Smoczynski 649a18283aSMarcin Smoczynski def test_outb_ipv6_noopt(self): 659a18283aSMarcin Smoczynski pkt = IPv6(src=SRC_ADDR, dst=DST_ADDR) 669a18283aSMarcin Smoczynski pkt /= UDP(sport=123,dport=456)/Raw(load="abc") 679a18283aSMarcin Smoczynski 689a18283aSMarcin Smoczynski # send and check response 699a18283aSMarcin Smoczynski resp = self.px.xfer_unprotected(pkt) 709a18283aSMarcin Smoczynski self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP) 719a18283aSMarcin Smoczynski self.assertEqual(resp[ESP].spi, 5) 729a18283aSMarcin Smoczynski 739a18283aSMarcin Smoczynski # decrypt response, check packet after decryption 749a18283aSMarcin Smoczynski d = TestTransportWithIPv6Ext.decrypt(resp[IPv6], self.outb_sa) 759a18283aSMarcin Smoczynski self.assertEqual(d[IPv6].nh, socket.IPPROTO_UDP) 769a18283aSMarcin Smoczynski self.assertEqual(d[UDP].sport, 123) 779a18283aSMarcin Smoczynski self.assertEqual(d[UDP].dport, 456) 789a18283aSMarcin Smoczynski self.assertEqual(bytes(d[UDP].payload), b'abc') 799a18283aSMarcin Smoczynski 809a18283aSMarcin Smoczynski def test_outb_ipv6_opt(self): 819a18283aSMarcin Smoczynski hoptions = [] 829a18283aSMarcin Smoczynski hoptions.append(RouterAlert(value=2)) 839a18283aSMarcin Smoczynski hoptions.append(Jumbo(jumboplen=5000)) 849a18283aSMarcin Smoczynski hoptions.append(Pad1()) 859a18283aSMarcin Smoczynski 869a18283aSMarcin Smoczynski doptions = [] 879a18283aSMarcin Smoczynski doptions.append(HAO(hoa="1234::4321")) 889a18283aSMarcin Smoczynski 899a18283aSMarcin Smoczynski pkt = IPv6(src=SRC_ADDR, dst=DST_ADDR) 909a18283aSMarcin Smoczynski pkt /= IPv6ExtHdrHopByHop(options=hoptions) 919a18283aSMarcin Smoczynski pkt /= IPv6ExtHdrRouting(addresses=["3333::3","4444::4"]) 929a18283aSMarcin Smoczynski pkt /= IPv6ExtHdrDestOpt(options=doptions) 939a18283aSMarcin Smoczynski pkt /= UDP(sport=123,dport=456)/Raw(load="abc") 949a18283aSMarcin Smoczynski 959a18283aSMarcin Smoczynski # send and check response 969a18283aSMarcin Smoczynski resp = self.px.xfer_unprotected(pkt) 979a18283aSMarcin Smoczynski self.assertEqual(resp[IPv6].nh, socket.IPPROTO_HOPOPTS) 989a18283aSMarcin Smoczynski 999a18283aSMarcin Smoczynski # check extensions 1009a18283aSMarcin Smoczynski self.assertEqual(resp[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING) 1019a18283aSMarcin Smoczynski self.assertEqual(resp[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS) 1029a18283aSMarcin Smoczynski self.assertEqual(resp[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_ESP) 1039a18283aSMarcin Smoczynski 1049a18283aSMarcin Smoczynski # check ESP 1059a18283aSMarcin Smoczynski self.assertEqual(resp[ESP].spi, 5) 1069a18283aSMarcin Smoczynski 1079a18283aSMarcin Smoczynski # decrypt response, check packet after decryption 1089a18283aSMarcin Smoczynski d = TestTransportWithIPv6Ext.decrypt(resp[IPv6], self.outb_sa) 1099a18283aSMarcin Smoczynski self.assertEqual(d[IPv6].nh, socket.IPPROTO_HOPOPTS) 1109a18283aSMarcin Smoczynski self.assertEqual(d[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING) 1119a18283aSMarcin Smoczynski self.assertEqual(d[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS) 1129a18283aSMarcin Smoczynski self.assertEqual(d[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_UDP) 1139a18283aSMarcin Smoczynski 1149a18283aSMarcin Smoczynski # check UDP 1159a18283aSMarcin Smoczynski self.assertEqual(d[UDP].sport, 123) 1169a18283aSMarcin Smoczynski self.assertEqual(d[UDP].dport, 456) 1179a18283aSMarcin Smoczynski self.assertEqual(bytes(d[UDP].payload), b'abc') 1189a18283aSMarcin Smoczynski 1199a18283aSMarcin Smoczynski def test_inb_ipv6_noopt(self): 1209a18283aSMarcin Smoczynski # encrypt and send raw UDP packet 1219a18283aSMarcin Smoczynski pkt = IPv6(src=DST_ADDR, dst=SRC_ADDR) 1229a18283aSMarcin Smoczynski pkt /= UDP(sport=123,dport=456)/Raw(load="abc") 1239a18283aSMarcin Smoczynski e = self.inb_sa.encrypt(pkt) 1249a18283aSMarcin Smoczynski 1259a18283aSMarcin Smoczynski # send and check response 1269a18283aSMarcin Smoczynski resp = self.px.xfer_protected(e) 1279a18283aSMarcin Smoczynski self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP) 1289a18283aSMarcin Smoczynski 1299a18283aSMarcin Smoczynski # check UDP packet 1309a18283aSMarcin Smoczynski self.assertEqual(resp[UDP].sport, 123) 1319a18283aSMarcin Smoczynski self.assertEqual(resp[UDP].dport, 456) 1329a18283aSMarcin Smoczynski self.assertEqual(bytes(resp[UDP].payload), b'abc') 1339a18283aSMarcin Smoczynski 1349a18283aSMarcin Smoczynski def test_inb_ipv6_opt(self): 1359a18283aSMarcin Smoczynski hoptions = [] 1369a18283aSMarcin Smoczynski hoptions.append(RouterAlert(value=2)) 1379a18283aSMarcin Smoczynski hoptions.append(Jumbo(jumboplen=5000)) 1389a18283aSMarcin Smoczynski hoptions.append(Pad1()) 1399a18283aSMarcin Smoczynski 1409a18283aSMarcin Smoczynski doptions = [] 1419a18283aSMarcin Smoczynski doptions.append(HAO(hoa="1234::4321")) 1429a18283aSMarcin Smoczynski 1439a18283aSMarcin Smoczynski # prepare packet with options 1449a18283aSMarcin Smoczynski pkt = IPv6(src=DST_ADDR, dst=SRC_ADDR) 1459a18283aSMarcin Smoczynski pkt /= IPv6ExtHdrHopByHop(options=hoptions) 1469a18283aSMarcin Smoczynski pkt /= IPv6ExtHdrRouting(addresses=["3333::3","4444::4"]) 1479a18283aSMarcin Smoczynski pkt /= IPv6ExtHdrDestOpt(options=doptions) 1489a18283aSMarcin Smoczynski pkt /= UDP(sport=123,dport=456)/Raw(load="abc") 1499a18283aSMarcin Smoczynski e = self.inb_sa.encrypt(pkt) 1509a18283aSMarcin Smoczynski 1519a18283aSMarcin Smoczynski # self encrypted packet and check response 1529a18283aSMarcin Smoczynski resp = self.px.xfer_protected(e) 1539a18283aSMarcin Smoczynski self.assertEqual(resp[IPv6].nh, socket.IPPROTO_HOPOPTS) 1549a18283aSMarcin Smoczynski self.assertEqual(resp[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING) 1559a18283aSMarcin Smoczynski self.assertEqual(resp[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS) 1569a18283aSMarcin Smoczynski self.assertEqual(resp[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_UDP) 1579a18283aSMarcin Smoczynski 1589a18283aSMarcin Smoczynski # check UDP 1599a18283aSMarcin Smoczynski self.assertEqual(resp[UDP].sport, 123) 1609a18283aSMarcin Smoczynski self.assertEqual(resp[UDP].dport, 456) 1619a18283aSMarcin Smoczynski self.assertEqual(bytes(resp[UDP].payload), b'abc') 1629a18283aSMarcin Smoczynski 1639a18283aSMarcin Smoczynski def test_inb_ipv6_frag(self): 1649a18283aSMarcin Smoczynski # prepare ESP payload 1659a18283aSMarcin Smoczynski pkt = IPv6()/UDP(sport=123,dport=456)/Raw(load="abc") 1669a18283aSMarcin Smoczynski e = self.inb_sa.encrypt(pkt) 1679a18283aSMarcin Smoczynski 1689a18283aSMarcin Smoczynski # craft and send inbound packet 1699a18283aSMarcin Smoczynski e = IPv6(src=DST_ADDR, dst=SRC_ADDR)/IPv6ExtHdrFragment()/e[IPv6].payload 1709a18283aSMarcin Smoczynski resp = self.px.xfer_protected(e) 1719a18283aSMarcin Smoczynski 1729a18283aSMarcin Smoczynski # check response 1739a18283aSMarcin Smoczynski self.assertEqual(resp[IPv6].nh, socket.IPPROTO_FRAGMENT) 1749a18283aSMarcin Smoczynski self.assertEqual(resp[IPv6ExtHdrFragment].nh, socket.IPPROTO_UDP) 1759a18283aSMarcin Smoczynski 1769a18283aSMarcin Smoczynski # check UDP 1779a18283aSMarcin Smoczynski self.assertEqual(resp[UDP].sport, 123) 1789a18283aSMarcin Smoczynski self.assertEqual(resp[UDP].dport, 456) 1799a18283aSMarcin Smoczynski self.assertEqual(bytes(resp[UDP].payload), b'abc') 1809a18283aSMarcin Smoczynski 1819a18283aSMarcin Smoczynski 1829a18283aSMarcin Smoczynskipkttest.pkttest() 183