xref: /dpdk/examples/ipsec-secgw/test/trs_ipv6opts.py (revision 089e5ed727a15da2729cfee9b63533dd120bd04c)
1#!/usr/bin/env python3
2
3from scapy.all import *
4import unittest
5import pkttest
6
7
8SRC_ADDR  = "1111:0000:0000:0000:0000:0000:0000:0001"
9DST_ADDR  = "2222:0000:0000:0000:0000:0000:0000:0001"
10SRC_NET   = "1111:0000:0000:0000:0000:0000:0000:0000/64"
11DST_NET   = "2222:0000:0000:0000:0000:0000:0000:0000/64"
12
13
14def config():
15    return """
16sp ipv6 out esp protect 5 pri 1 \\
17src {0} \\
18dst {1} \\
19sport 0:65535 dport 0:65535
20
21sp ipv6 in esp protect 6 pri 1 \\
22src {1} \\
23dst {0} \\
24sport 0:65535 dport 0:65535
25
26sa out 5 cipher_algo null auth_algo null mode transport
27sa in 6 cipher_algo null auth_algo null mode transport
28
29rt ipv6 dst {0} port 1
30rt ipv6 dst {1} port 0
31""".format(SRC_NET, DST_NET)
32
33
34class TestTransportWithIPv6Ext(unittest.TestCase):
35    # There is a bug in the IPsec Scapy implementation
36    # which causes invalid packet reconstruction after
37    # successful decryption. This method is a workaround.
38    @staticmethod
39    def decrypt(pkt, sa):
40        esp = pkt[ESP]
41
42        # decrypt dummy packet with no extensions
43        d = sa.decrypt(IPv6()/esp)
44
45        # fix 'next header' in the preceding header of the original
46        # packet and remove ESP
47        pkt[ESP].underlayer.nh = d[IPv6].nh
48        pkt[ESP].underlayer.remove_payload()
49
50        # combine L3 header with decrypted payload
51        npkt = pkt/d[IPv6].payload
52
53        # fix length
54        npkt[IPv6].plen = d[IPv6].plen + len(pkt[IPv6].payload)
55
56        return npkt
57
58    def setUp(self):
59        self.px = pkttest.PacketXfer()
60        self.outb_sa = SecurityAssociation(ESP, spi=5)
61        self.inb_sa = SecurityAssociation(ESP, spi=6)
62
63    def test_outb_ipv6_noopt(self):
64        pkt = IPv6(src=SRC_ADDR, dst=DST_ADDR)
65        pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
66
67        # send and check response
68        resp = self.px.xfer_unprotected(pkt)
69        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
70        self.assertEqual(resp[ESP].spi, 5)
71
72        # decrypt response, check packet after decryption
73        d = TestTransportWithIPv6Ext.decrypt(resp[IPv6], self.outb_sa)
74        self.assertEqual(d[IPv6].nh, socket.IPPROTO_UDP)
75        self.assertEqual(d[UDP].sport, 123)
76        self.assertEqual(d[UDP].dport, 456)
77        self.assertEqual(bytes(d[UDP].payload), b'abc')
78
79    def test_outb_ipv6_opt(self):
80        hoptions = []
81        hoptions.append(RouterAlert(value=2))
82        hoptions.append(Jumbo(jumboplen=5000))
83        hoptions.append(Pad1())
84
85        doptions = []
86        doptions.append(HAO(hoa="1234::4321"))
87
88        pkt = IPv6(src=SRC_ADDR, dst=DST_ADDR)
89        pkt /= IPv6ExtHdrHopByHop(options=hoptions)
90        pkt /= IPv6ExtHdrRouting(addresses=["3333::3","4444::4"])
91        pkt /= IPv6ExtHdrDestOpt(options=doptions)
92        pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
93
94        # send and check response
95        resp = self.px.xfer_unprotected(pkt)
96        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_HOPOPTS)
97
98        # check extensions
99        self.assertEqual(resp[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
100        self.assertEqual(resp[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
101        self.assertEqual(resp[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_ESP)
102
103        # check ESP
104        self.assertEqual(resp[ESP].spi, 5)
105
106        # decrypt response, check packet after decryption
107        d = TestTransportWithIPv6Ext.decrypt(resp[IPv6], self.outb_sa)
108        self.assertEqual(d[IPv6].nh, socket.IPPROTO_HOPOPTS)
109        self.assertEqual(d[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
110        self.assertEqual(d[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
111        self.assertEqual(d[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_UDP)
112
113        # check UDP
114        self.assertEqual(d[UDP].sport, 123)
115        self.assertEqual(d[UDP].dport, 456)
116        self.assertEqual(bytes(d[UDP].payload), b'abc')
117
118    def test_inb_ipv6_noopt(self):
119        # encrypt and send raw UDP packet
120        pkt = IPv6(src=DST_ADDR, dst=SRC_ADDR)
121        pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
122        e = self.inb_sa.encrypt(pkt)
123
124        # send and check response
125        resp = self.px.xfer_protected(e)
126        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
127
128        # check UDP packet
129        self.assertEqual(resp[UDP].sport, 123)
130        self.assertEqual(resp[UDP].dport, 456)
131        self.assertEqual(bytes(resp[UDP].payload), b'abc')
132
133    def test_inb_ipv6_opt(self):
134        hoptions = []
135        hoptions.append(RouterAlert(value=2))
136        hoptions.append(Jumbo(jumboplen=5000))
137        hoptions.append(Pad1())
138
139        doptions = []
140        doptions.append(HAO(hoa="1234::4321"))
141
142        # prepare packet with options
143        pkt = IPv6(src=DST_ADDR, dst=SRC_ADDR)
144        pkt /= IPv6ExtHdrHopByHop(options=hoptions)
145        pkt /= IPv6ExtHdrRouting(addresses=["3333::3","4444::4"])
146        pkt /= IPv6ExtHdrDestOpt(options=doptions)
147        pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
148        e = self.inb_sa.encrypt(pkt)
149
150        # self encrypted packet and check response
151        resp = self.px.xfer_protected(e)
152        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_HOPOPTS)
153        self.assertEqual(resp[IPv6ExtHdrHopByHop].nh, socket.IPPROTO_ROUTING)
154        self.assertEqual(resp[IPv6ExtHdrRouting].nh, socket.IPPROTO_DSTOPTS)
155        self.assertEqual(resp[IPv6ExtHdrDestOpt].nh, socket.IPPROTO_UDP)
156
157        # check UDP
158        self.assertEqual(resp[UDP].sport, 123)
159        self.assertEqual(resp[UDP].dport, 456)
160        self.assertEqual(bytes(resp[UDP].payload), b'abc')
161
162    def test_inb_ipv6_frag(self):
163        # prepare ESP payload
164        pkt = IPv6()/UDP(sport=123,dport=456)/Raw(load="abc")
165        e = self.inb_sa.encrypt(pkt)
166
167        # craft and send inbound packet
168        e = IPv6(src=DST_ADDR, dst=SRC_ADDR)/IPv6ExtHdrFragment()/e[IPv6].payload
169        resp = self.px.xfer_protected(e)
170
171        # check response
172        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_FRAGMENT)
173        self.assertEqual(resp[IPv6ExtHdrFragment].nh, socket.IPPROTO_UDP)
174
175        # check UDP
176        self.assertEqual(resp[UDP].sport, 123)
177        self.assertEqual(resp[UDP].dport, 456)
178        self.assertEqual(bytes(resp[UDP].payload), b'abc')
179
180
181pkttest.pkttest()
182