xref: /dpdk/examples/ipsec-secgw/test/trs_ipv6opts.py (revision 1dc48bce518de1d8cb73f961404a47311c4a248d)
1#!/usr/bin/env python3
2# SPDX-License-Identifier: BSD-3-Clause
3
4from scapy.all import *
5import unittest
6import pkttest
7
8
# Endpoint addresses placed in the crafted IPv6 packets.
SRC_ADDR = "1111:0000:0000:0000:0000:0000:0000:0001"
DST_ADDR = "2222:0000:0000:0000:0000:0000:0000:0001"

# /64 prefixes that config() substitutes into the policy and routing rules.
SRC_NET = "1111:0000:0000:0000:0000:0000:0000:0000/64"
DST_NET = "2222:0000:0000:0000:0000:0000:0000:0000/64"
13
14
def config():
    """Return the configuration text used by this test.

    The text declares an outbound IPv6 ESP policy (SRC_NET -> DST_NET)
    protected by SA 5, the mirrored inbound policy protected by SA 6,
    both SAs in transport mode with null cipher and null auth, and one
    route per network (SRC_NET via port 1, DST_NET via port 0).
    """
    # {0} is the source network, {1} the destination network.
    template = """
sp ipv6 out esp protect 5 pri 1 \\
src {0} \\
dst {1} \\
sport 0:65535 dport 0:65535

sp ipv6 in esp protect 6 pri 1 \\
src {1} \\
dst {0} \\
sport 0:65535 dport 0:65535

sa out 5 cipher_algo null auth_algo null mode transport
sa in 6 cipher_algo null auth_algo null mode transport

rt ipv6 dst {0} port 1
rt ipv6 dst {1} port 0
"""
    return template.format(SRC_NET, DST_NET)
33
34
class TestTransportWithIPv6Ext(unittest.TestCase):
    """Transport-mode ESP tests for IPv6 with and without extension headers.

    Outbound cases send a plain packet and expect an ESP response carrying
    SPI 5; inbound cases send a packet encrypted with the SPI 6 SA and
    expect the plain packet back.
    """

    @staticmethod
    def decrypt(pkt, sa):
        """Decrypt the ESP part of *pkt* with *sa* and rebuild the packet.

        The Scapy IPsec implementation has a bug that yields an invalid
        packet after a successful decryption; this method works around it
        by decrypting the ESP layer behind a dummy IPv6 header and then
        stitching the result back onto the original headers.

        Note that *pkt* is modified in place: its ESP layer is removed and
        the 'nh' field of the header preceding ESP is rewritten.

        Returns the rebuilt packet (original headers + decrypted payload).
        """
        esp_layer = pkt[ESP]

        # decrypt a dummy packet that has no extension headers
        plain = sa.decrypt(IPv6() / esp_layer)
        plain_ip = plain[IPv6]

        # the header in front of ESP must now point at the decrypted
        # protocol; after that the ESP layer itself can be dropped
        before_esp = esp_layer.underlayer
        before_esp.nh = plain_ip.nh
        before_esp.remove_payload()

        # original L3 header(s) followed by the decrypted payload
        rebuilt = pkt / plain_ip.payload

        # payload length: decrypted part plus whatever still follows the
        # IPv6 header of the stripped original packet
        remaining_len = len(pkt[IPv6].payload)
        rebuilt[IPv6].plen = plain_ip.plen + remaining_len

        return rebuilt

    @staticmethod
    def _udp_payload():
        """Return the UDP datagram (123 -> 456, load "abc") used in the tests."""
        return UDP(sport=123, dport=456) / Raw(load="abc")

    @staticmethod
    def _with_ext_headers(src, dst):
        """Return an IPv6 packet with hop-by-hop, routing and dest. options.

        The extension headers are followed by the common UDP datagram.
        """
        hop_opts = [RouterAlert(value=2), Jumbo(jumboplen=5000), Pad1()]
        dst_opts = [HAO(hoa="1234::4321")]
        return (
            IPv6(src=src, dst=dst)
            / IPv6ExtHdrHopByHop(options=hop_opts)
            / IPv6ExtHdrRouting(addresses=["3333::3", "4444::4"])
            / IPv6ExtHdrDestOpt(options=dst_opts)
            / TestTransportWithIPv6Ext._udp_payload()
        )

    def _check_ext_chain(self, pkt, last_nh):
        """Assert the 'nh' chain IPv6 -> hop-by-hop -> routing -> dest. opts.

        *last_nh* is the protocol the destination options header must
        announce.
        """
        expected = (
            (IPv6, socket.IPPROTO_HOPOPTS),
            (IPv6ExtHdrHopByHop, socket.IPPROTO_ROUTING),
            (IPv6ExtHdrRouting, socket.IPPROTO_DSTOPTS),
            (IPv6ExtHdrDestOpt, last_nh),
        )
        for layer, next_header in expected:
            self.assertEqual(pkt[layer].nh, next_header)

    def _check_udp(self, pkt):
        """Assert that *pkt* carries the common UDP datagram unchanged."""
        self.assertEqual(pkt[UDP].sport, 123)
        self.assertEqual(pkt[UDP].dport, 456)
        self.assertEqual(bytes(pkt[UDP].payload), b'abc')

    def setUp(self):
        """Create the packet transfer helper and the SPI 5 / SPI 6 ESP SAs."""
        self.px = pkttest.PacketXfer()
        self.outb_sa = SecurityAssociation(ESP, spi=5)
        self.inb_sa = SecurityAssociation(ESP, spi=6)

    def test_outb_ipv6_noopt(self):
        """Outbound without extension headers: ESP directly follows IPv6."""
        plain = IPv6(src=SRC_ADDR, dst=DST_ADDR) / self._udp_payload()

        # send the packet and inspect the protected response
        reply = self.px.xfer_unprotected(plain)
        self.assertEqual(reply[IPv6].nh, socket.IPPROTO_ESP)
        self.assertEqual(reply[ESP].spi, 5)

        # the decrypted response must be the original UDP datagram
        decrypted = self.decrypt(reply[IPv6], self.outb_sa)
        self.assertEqual(decrypted[IPv6].nh, socket.IPPROTO_UDP)
        self._check_udp(decrypted)

    def test_outb_ipv6_opt(self):
        """Outbound with extension headers: ESP follows the last of them."""
        plain = self._with_ext_headers(SRC_ADDR, DST_ADDR)

        # send the packet; extension headers must precede ESP in the reply
        reply = self.px.xfer_unprotected(plain)
        self._check_ext_chain(reply, socket.IPPROTO_ESP)
        self.assertEqual(reply[ESP].spi, 5)

        # after decryption the same chain must end in UDP
        decrypted = self.decrypt(reply[IPv6], self.outb_sa)
        self._check_ext_chain(decrypted, socket.IPPROTO_UDP)
        self._check_udp(decrypted)

    def test_inb_ipv6_noopt(self):
        """Inbound without extension headers: plain UDP comes back."""
        plain = IPv6(src=DST_ADDR, dst=SRC_ADDR) / self._udp_payload()
        protected = self.inb_sa.encrypt(plain)

        # send the encrypted packet and inspect the response
        reply = self.px.xfer_protected(protected)
        self.assertEqual(reply[IPv6].nh, socket.IPPROTO_UDP)
        self._check_udp(reply)

    def test_inb_ipv6_opt(self):
        """Inbound with extension headers: they are kept, ESP is removed."""
        plain = self._with_ext_headers(DST_ADDR, SRC_ADDR)
        protected = self.inb_sa.encrypt(plain)

        # send the encrypted packet and inspect the response
        reply = self.px.xfer_protected(protected)
        self._check_ext_chain(reply, socket.IPPROTO_UDP)
        self._check_udp(reply)

    def test_inb_ipv6_frag(self):
        """Inbound ESP placed behind a fragment extension header."""
        # only the ESP payload of the encrypted packet is reused below
        encrypted = self.inb_sa.encrypt(IPv6() / self._udp_payload())

        # craft the inbound packet: IPv6 / fragment header / ESP payload
        outer = IPv6(src=DST_ADDR, dst=SRC_ADDR) / IPv6ExtHdrFragment()
        reply = self.px.xfer_protected(outer / encrypted[IPv6].payload)

        # the fragment header must stay and now announce UDP
        self.assertEqual(reply[IPv6].nh, socket.IPPROTO_FRAGMENT)
        self.assertEqual(reply[IPv6ExtHdrFragment].nh, socket.IPPROTO_UDP)
        self._check_udp(reply)
180
181
# Runs unconditionally when this module is executed: control is handed to
# the pkttest helper module.
# NOTE(review): pkttest.pkttest() is defined outside this file; presumably it
# parses the command line and runs the test cases above -- confirm in
# pkttest.py.
pkttest.pkttest()
183