xref: /dpdk/examples/ipsec-secgw/test/trs_ipv6opts.py (revision 1dc48bce518de1d8cb73f961404a47311c4a248d)
#!/usr/bin/env python3
# SPDX-License-Identifier: BSD-3-Clause
39a18283aSMarcin Smoczynski
import socket
import unittest

from scapy.all import *

import pkttest
79a18283aSMarcin Smoczynski
89a18283aSMarcin Smoczynski
# Host addresses placed in the IPv6 src/dst fields of the test packets
# (the inbound tests use them swapped).
SRC_ADDR = "1111:0000:0000:0000:0000:0000:0000:0001"
DST_ADDR = "2222:0000:0000:0000:0000:0000:0000:0001"

# /64 prefixes covering the addresses above; config() substitutes them
# into the security-policy and routing rules.
SRC_NET = "1111:0000:0000:0000:0000:0000:0000:0000/64"
DST_NET = "2222:0000:0000:0000:0000:0000:0000:0000/64"
139a18283aSMarcin Smoczynski
149a18283aSMarcin Smoczynski
def config():
    """Return the ipsec-secgw configuration text for this test.

    The text declares, in order:

    * an outbound IPv6 ESP policy (SA 5) for traffic SRC_NET -> DST_NET,
    * an inbound IPv6 ESP policy (SA 6) for traffic DST_NET -> SRC_NET,
    * the two transport-mode SAs, both with null cipher and null auth,
    * IPv6 routes sending SRC_NET to port 1 and DST_NET to port 0.

    Each ``\\\\`` in the literal becomes a single backslash at the end of
    the emitted line (presumably a line continuation for the config
    parser -- confirm against the ipsec-secgw config syntax).
    """
    # {0} is SRC_NET and {1} is DST_NET; the inbound policy swaps them.
    template = """
sp ipv6 out esp protect 5 pri 1 \\
src {0} \\
dst {1} \\
sport 0:65535 dport 0:65535

sp ipv6 in esp protect 6 pri 1 \\
src {1} \\
dst {0} \\
sport 0:65535 dport 0:65535

sa out 5 cipher_algo null auth_algo null mode transport
sa in 6 cipher_algo null auth_algo null mode transport

rt ipv6 dst {0} port 1
rt ipv6 dst {1} port 0
"""
    return template.format(SRC_NET, DST_NET)
339a18283aSMarcin Smoczynski
349a18283aSMarcin Smoczynski
class TestTransportWithIPv6Ext(unittest.TestCase):
    """Transport-mode ESP tests for IPv6 with and without extension headers.

    Outbound tests hand a plain packet to ``PacketXfer.xfer_unprotected``
    and check that the response carries ESP (SPI 5) after any extension
    headers.  Inbound tests encrypt a packet with the SPI 6 association,
    hand it to ``PacketXfer.xfer_protected`` and check that the response
    is the plain packet with its extension headers intact.

    NOTE(review): the exact semantics of the ``pkttest.PacketXfer``
    methods are defined outside this file -- confirm there.
    """

    # There is a bug in the IPsec Scapy implementation
    # which causes invalid packet reconstruction after
    # successful decryption. This method is a workaround.
    @staticmethod
    def decrypt(pkt, sa):
        """Decrypt the ESP part of *pkt* while keeping its extension headers.

        Parameters:
            pkt: packet starting at the IPv6 layer and containing an ESP
                layer, optionally preceded by IPv6 extension headers.
            sa:  SecurityAssociation used for decryption.

        Returns:
            A new packet made of the original IPv6 header and extension
            headers followed by the decrypted payload, with the 'next
            header' and payload-length fields fixed up.

        The packet passed in is not modified; all edits are done on a copy.
        """
        # Work on a private copy so the caller's packet keeps its ESP layer.
        pkt = pkt.copy()
        esp = pkt[ESP]

        # decrypt dummy packet with no extensions
        d = sa.decrypt(IPv6()/esp)

        # fix 'next header' in the header preceding ESP (the IPv6 header
        # itself or the last extension header) and remove ESP
        prev_hdr = pkt[ESP].underlayer
        prev_hdr.nh = d[IPv6].nh
        prev_hdr.remove_payload()

        # combine L3 header(s) with decrypted payload
        npkt = pkt/d[IPv6].payload

        # fix length: decrypted payload plus the remaining extension headers
        npkt[IPv6].plen = d[IPv6].plen + len(pkt[IPv6].payload)

        return npkt

    @staticmethod
    def _udp_payload():
        """Build the UDP datagram carried by every test packet."""
        return UDP(sport=123, dport=456)/Raw(load="abc")

    @classmethod
    def _ext_hdr_packet(cls, src, dst):
        """Build an IPv6 packet with hop-by-hop, routing and dest-opt headers."""
        hoptions = [RouterAlert(value=2), Jumbo(jumboplen=5000), Pad1()]
        doptions = [HAO(hoa="1234::4321")]

        pkt = IPv6(src=src, dst=dst)
        pkt /= IPv6ExtHdrHopByHop(options=hoptions)
        pkt /= IPv6ExtHdrRouting(addresses=["3333::3", "4444::4"])
        pkt /= IPv6ExtHdrDestOpt(options=doptions)
        pkt /= cls._udp_payload()
        return pkt

    def _assert_ext_chain(self, pkt, last_nh):
        """Check the next-header chain; *last_nh* follows the dest-opt header."""
        self.assertEqual(pkt[IPv6].nh, socket.IPPROTO_HOPOPTS)
        self.assertEqual(pkt[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
        self.assertEqual(pkt[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
        self.assertEqual(pkt[IPv6ExtHdrDestOpt].nh, last_nh)

    def _assert_udp(self, pkt):
        """Check that *pkt* carries the UDP datagram from _udp_payload()."""
        self.assertEqual(pkt[UDP].sport, 123)
        self.assertEqual(pkt[UDP].dport, 456)
        self.assertEqual(bytes(pkt[UDP].payload), b'abc')

    def setUp(self):
        """Create the packet transfer helper and one SA per direction."""
        self.px = pkttest.PacketXfer()
        # SPIs match the 'sa out 5' / 'sa in 6' entries emitted by config()
        self.outb_sa = SecurityAssociation(ESP, spi=5)
        self.inb_sa = SecurityAssociation(ESP, spi=6)

    def test_outb_ipv6_noopt(self):
        """Outbound packet without extension headers gets ESP right after IPv6."""
        pkt = IPv6(src=SRC_ADDR, dst=DST_ADDR)
        pkt /= self._udp_payload()

        # send and check response
        resp = self.px.xfer_unprotected(pkt)
        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
        self.assertEqual(resp[ESP].spi, 5)

        # decrypt response, check packet after decryption
        d = TestTransportWithIPv6Ext.decrypt(resp[IPv6], self.outb_sa)
        self.assertEqual(d[IPv6].nh, socket.IPPROTO_UDP)
        self._assert_udp(d)

    def test_outb_ipv6_opt(self):
        """Outbound packet keeps its extension headers in front of ESP."""
        pkt = self._ext_hdr_packet(SRC_ADDR, DST_ADDR)

        # send and check response: extensions first, ESP last
        resp = self.px.xfer_unprotected(pkt)
        self._assert_ext_chain(resp, socket.IPPROTO_ESP)

        # check ESP
        self.assertEqual(resp[ESP].spi, 5)

        # decrypt response, check packet after decryption
        d = TestTransportWithIPv6Ext.decrypt(resp[IPv6], self.outb_sa)
        self._assert_ext_chain(d, socket.IPPROTO_UDP)

        # check UDP
        self._assert_udp(d)

    def test_inb_ipv6_noopt(self):
        """Inbound ESP packet without extension headers is decrypted to UDP."""
        # encrypt and send raw UDP packet
        pkt = IPv6(src=DST_ADDR, dst=SRC_ADDR)
        pkt /= self._udp_payload()
        e = self.inb_sa.encrypt(pkt)

        # send and check response
        resp = self.px.xfer_protected(e)
        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)

        # check UDP packet
        self._assert_udp(resp)

    def test_inb_ipv6_opt(self):
        """Inbound ESP packet with extension headers keeps them after decryption."""
        # prepare packet with options
        pkt = self._ext_hdr_packet(DST_ADDR, SRC_ADDR)
        e = self.inb_sa.encrypt(pkt)

        # send encrypted packet and check response
        resp = self.px.xfer_protected(e)
        self._assert_ext_chain(resp, socket.IPPROTO_UDP)

        # check UDP
        self._assert_udp(resp)

    def test_inb_ipv6_frag(self):
        """Inbound ESP packet behind a fragment header is decrypted to UDP."""
        # prepare ESP payload
        pkt = IPv6()/self._udp_payload()
        e = self.inb_sa.encrypt(pkt)

        # craft and send inbound packet: put a fragment header in front of
        # the ESP payload produced above
        e = IPv6(src=DST_ADDR, dst=SRC_ADDR)/IPv6ExtHdrFragment()/e[IPv6].payload
        resp = self.px.xfer_protected(e)

        # check response
        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_FRAGMENT)
        self.assertEqual(resp[IPv6ExtHdrFragment].nh, socket.IPPROTO_UDP)

        # check UDP
        self._assert_udp(resp)
1809a18283aSMarcin Smoczynski
1819a18283aSMarcin Smoczynski
# Run only when executed as a script, so that merely importing this module
# (e.g. for test discovery) does not start a test run as a side effect.
# NOTE(review): pkttest.pkttest() is defined in the pkttest module, which is
# not shown here -- confirm nothing relies on it running at import time.
if __name__ == "__main__":
    pkttest.pkttest()
183