xref: /dpdk/examples/ipsec-secgw/test/tun_null_header_reconstruct.py (revision f6feb39c20cec3b520eaaef2dde436379e48330a)
1#!/usr/bin/env python3
2# SPDX-License-Identifier: BSD-3-Clause
3# Copyright(c) 2019 Intel Corporation
4
5from scapy.all import *
6import unittest
7import pkttest
8
9#{ipv4{ipv4}} test
10SRC_ADDR_IPV4_1 = "192.168.1.1"
11DST_ADDR_IPV4_1 = "192.168.2.1"
12
13#{ipv6{ipv6}} test
14SRC_ADDR_IPV6_1 = "1111:0000:0000:0000:0000:0000:0000:0001"
15DST_ADDR_IPV6_1 = "2222:0000:0000:0000:0000:0000:0000:0001"
16
17#{ipv4{ipv6}} test
18SRC_ADDR_IPV4_2 = "192.168.11.1"
19DST_ADDR_IPV4_2 = "192.168.12.1"
20SRC_ADDR_IPV6_2 = "1111:0000:0000:0000:0000:0000:0001:0001"
21DST_ADDR_IPV6_2 = "2222:0000:0000:0000:0000:0000:0001:0001"
22
23#{ipv6{ipv4}} test
24SRC_ADDR_IPV4_3 = "192.168.21.1"
25DST_ADDR_IPV4_3 = "192.168.22.1"
26SRC_ADDR_IPV6_3 = "1111:0000:0000:0000:0000:0001:0001:0001"
27DST_ADDR_IPV6_3 = "2222:0000:0000:0000:0000:0001:0001:0001"
28
29def config():
30    return """
31#outter-ipv4 inner-ipv4 tunnel mode test
32sp ipv4 out esp protect 5 pri 1 \\
33src {0}/32 \\
34dst {1}/32 \\
35sport 0:65535 dport 0:65535
36
37sp ipv4 in esp protect 6 pri 1 \\
38src {1}/32 \\
39dst {0}/32 \\
40sport 0:65535 dport 0:65535
41
42sa out 5 cipher_algo null auth_algo null mode ipv4-tunnel \\
43src {0} dst {1}
44sa in 6 cipher_algo null auth_algo null mode ipv4-tunnel \\
45src {1} dst {0}
46
47rt ipv4 dst {0}/32 port 1
48rt ipv4 dst {1}/32 port 0
49
50#outter-ipv6 inner-ipv6 tunnel mode test
51sp ipv6 out esp protect 7 pri 1 \\
52src {2}/128 \\
53dst {3}/128 \\
54sport 0:65535 dport 0:65535
55
56sp ipv6 in esp protect 8 pri 1 \\
57src {3}/128 \\
58dst {2}/128 \\
59sport 0:65535 dport 0:65535
60
61sa out 7 cipher_algo null auth_algo null mode ipv6-tunnel \\
62src {2} dst {3}
63sa in 8 cipher_algo null auth_algo null mode ipv6-tunnel \\
64src {3} dst {2}
65
66rt ipv6 dst {2}/128 port 1
67rt ipv6 dst {3}/128 port 0
68
69#outter-ipv4 inner-ipv6 tunnel mode test
70sp ipv6 out esp protect 9 pri 1 \\
71src {4}/128 \\
72dst {5}/128 \\
73sport 0:65535 dport 0:65535
74
75sp ipv6 in esp protect 10 pri 1 \\
76src {5}/128 \\
77dst {4}/128 \\
78sport 0:65535 dport 0:65535
79
80sa out 9 cipher_algo null auth_algo null mode ipv4-tunnel \\
81src {6} dst {7}
82sa in 10 cipher_algo null auth_algo null mode ipv4-tunnel \\
83src {7} dst {6}
84
85rt ipv6 dst {4}/128 port 1
86rt ipv4 dst {7}/32 port 0
87
88#outter-ipv6 inner-ipv4 tunnel mode test
89sp ipv4 out esp protect 11 pri 1 \\
90src {8}/32 \\
91dst {9}/32 \\
92sport 0:65535 dport 0:65535
93
94sp ipv4 in esp protect 12 pri 1 \\
95src {9}/32 \\
96dst {8}/32 \\
97sport 0:65535 dport 0:65535
98
99sa out 11 cipher_algo null auth_algo null mode ipv6-tunnel \\
100src {10} dst {11}
101sa in 12 cipher_algo null auth_algo null mode ipv6-tunnel \\
102src {11} dst {10}
103
104rt ipv4 dst {8}/32 port 1
105rt ipv6 dst {11}/128 port 0
106""".format(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1,
107           SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1,
108           SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2, SRC_ADDR_IPV4_2, DST_ADDR_IPV4_2,
109           SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3, SRC_ADDR_IPV6_3, DST_ADDR_IPV6_3)
110
111ECN_ECT0    = 0x02
112ECN_ECT1    = 0x01
113ECN_CE      = 0x03
114DSCP_1      = 0x04
115DSCP_3F     = 0xFC
116
117class TestTunnelHeaderReconstruct(unittest.TestCase):
118    def setUp(self):
119        self.px = pkttest.PacketXfer()
120        th = IP(src=DST_ADDR_IPV4_1, dst=SRC_ADDR_IPV4_1)
121        self.sa_ipv4v4 = SecurityAssociation(ESP, spi=6, tunnel_header = th)
122
123        th = IPv6(src=DST_ADDR_IPV6_1, dst=SRC_ADDR_IPV6_1)
124        self.sa_ipv6v6 = SecurityAssociation(ESP, spi=8, tunnel_header = th)
125
126        th = IP(src=DST_ADDR_IPV4_2, dst=SRC_ADDR_IPV4_2)
127        self.sa_ipv4v6 = SecurityAssociation(ESP, spi=10, tunnel_header = th)
128
129        th = IPv6(src=DST_ADDR_IPV6_3, dst=SRC_ADDR_IPV6_3)
130        self.sa_ipv6v4 = SecurityAssociation(ESP, spi=12, tunnel_header = th)
131
132    def gen_pkt_plain_ipv4(self, src, dst, tos):
133        pkt = IP(src=src, dst=dst, tos=tos)
134        pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
135        return pkt
136
137    def gen_pkt_plain_ipv6(self, src, dst, tc):
138        pkt = IPv6(src=src, dst=dst, tc=tc)
139        pkt /= UDP(sport=123,dport=456)/Raw(load="abc")
140        return pkt
141
142    def gen_pkt_tun_ipv4v4(self, tos_outter, tos_inner):
143        pkt = self.gen_pkt_plain_ipv4(DST_ADDR_IPV4_1, SRC_ADDR_IPV4_1,
144                                      tos_inner)
145        pkt = self.sa_ipv4v4.encrypt(pkt)
146        self.assertEqual(pkt[IP].proto, socket.IPPROTO_ESP)
147        self.assertEqual(pkt[ESP].spi, 6)
148        pkt[IP].tos = tos_outter
149        return pkt
150
151    def gen_pkt_tun_ipv6v6(self, tc_outter, tc_inner):
152        pkt = self.gen_pkt_plain_ipv6(DST_ADDR_IPV6_1, SRC_ADDR_IPV6_1,
153                                      tc_inner)
154        pkt = self.sa_ipv6v6.encrypt(pkt)
155        self.assertEqual(pkt[IPv6].nh, socket.IPPROTO_ESP)
156        self.assertEqual(pkt[ESP].spi, 8)
157        pkt[IPv6].tc = tc_outter
158        return pkt
159
160    def gen_pkt_tun_ipv4v6(self, tos_outter, tc_inner):
161        pkt = self.gen_pkt_plain_ipv6(DST_ADDR_IPV6_2, SRC_ADDR_IPV6_2,
162                                      tc_inner)
163        pkt = self.sa_ipv4v6.encrypt(pkt)
164        self.assertEqual(pkt[IP].proto, socket.IPPROTO_ESP)
165        self.assertEqual(pkt[ESP].spi, 10)
166        pkt[IP].tos = tos_outter
167        return pkt
168
169    def gen_pkt_tun_ipv6v4(self, tc_outter, tos_inner):
170        pkt = self.gen_pkt_plain_ipv4(DST_ADDR_IPV4_3, SRC_ADDR_IPV4_3,
171                                      tos_inner)
172        pkt = self.sa_ipv6v4.encrypt(pkt)
173        self.assertEqual(pkt[IPv6].nh, socket.IPPROTO_ESP)
174        self.assertEqual(pkt[ESP].spi, 12)
175        pkt[IPv6].tc = tc_outter
176        return pkt
177
178#RFC4301 5.1.2.1 & 5.1.2.2, outbound packets shall be copied ECN field
179    def test_outb_ipv4v4_ecn(self):
180        pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1,
181                                      ECN_ECT1)
182        resp = self.px.xfer_unprotected(pkt)
183        self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP)
184        self.assertEqual(resp[ESP].spi, 5)
185        self.assertEqual(resp[IP].tos, ECN_ECT1)
186
187        pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1,
188                                      ECN_ECT0)
189        resp = self.px.xfer_unprotected(pkt)
190        self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP)
191        self.assertEqual(resp[ESP].spi, 5)
192        self.assertEqual(resp[IP].tos, ECN_ECT0)
193
194        pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1,
195                                      ECN_CE)
196        resp = self.px.xfer_unprotected(pkt)
197        self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP)
198        self.assertEqual(resp[ESP].spi, 5)
199        self.assertEqual(resp[IP].tos, ECN_CE)
200
201    def test_outb_ipv6v6_ecn(self):
202        pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1,
203                                      ECN_ECT1)
204        resp = self.px.xfer_unprotected(pkt)
205        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
206        self.assertEqual(resp[IPv6].tc, ECN_ECT1)
207
208        pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1,
209                                      ECN_ECT0)
210        resp = self.px.xfer_unprotected(pkt)
211        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
212        self.assertEqual(resp[ESP].spi, 7)
213        self.assertEqual(resp[IPv6].tc, ECN_ECT0)
214
215        pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1,
216                                      ECN_CE)
217        resp = self.px.xfer_unprotected(pkt)
218        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
219        self.assertEqual(resp[ESP].spi, 7)
220        self.assertEqual(resp[IPv6].tc, ECN_CE)
221
222    def test_outb_ipv4v6_ecn(self):
223        pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2,
224                                      ECN_ECT1)
225        resp = self.px.xfer_unprotected(pkt)
226        self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP)
227        self.assertEqual(resp[IP].tos, ECN_ECT1)
228
229        pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2,
230                                      ECN_ECT0)
231        resp = self.px.xfer_unprotected(pkt)
232        self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP)
233        self.assertEqual(resp[IP].tos, ECN_ECT0)
234
235        pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2,
236                                      ECN_CE)
237        resp = self.px.xfer_unprotected(pkt)
238        self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP)
239        self.assertEqual(resp[IP].tos, ECN_CE)
240
241    def test_outb_ipv6v4_ecn(self):
242        pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3,
243                                      ECN_ECT1)
244        resp = self.px.xfer_unprotected(pkt)
245        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
246        self.assertEqual(resp[IPv6].tc, ECN_ECT1)
247
248        pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3,
249                                      ECN_ECT0)
250        resp = self.px.xfer_unprotected(pkt)
251        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
252        self.assertEqual(resp[IPv6].tc, ECN_ECT0)
253
254        pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3,
255                                      ECN_CE)
256        resp = self.px.xfer_unprotected(pkt)
257        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
258        self.assertEqual(resp[IPv6].tc, ECN_CE)
259
260#RFC4301 5.1.2.1 & 5.1.2.2, if outbound packets ECN is CE (0x3), inbound packets
261#ECN is overwritten to CE, otherwise no change
262
263#Outter header not CE, Inner header should be no change
264    def test_inb_ipv4v4_ecn_inner_no_change(self):
265        pkt = self.gen_pkt_tun_ipv4v4(ECN_ECT1, ECN_ECT0)
266        resp = self.px.xfer_protected(pkt)
267        self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP)
268        self.assertEqual(resp[IP].tos, ECN_ECT0)
269
270        pkt = self.gen_pkt_tun_ipv4v4(ECN_ECT0, ECN_ECT1)
271        resp = self.px.xfer_protected(pkt)
272        self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP)
273        self.assertEqual(resp[IP].tos, ECN_ECT1)
274
275        pkt = self.gen_pkt_tun_ipv4v4(ECN_ECT1, ECN_CE)
276        resp = self.px.xfer_protected(pkt)
277        self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP)
278        self.assertEqual(resp[IP].tos, ECN_CE)
279
280    def test_inb_ipv6v6_ecn_inner_no_change(self):
281        pkt = self.gen_pkt_tun_ipv6v6(ECN_ECT1, ECN_ECT0)
282        resp = self.px.xfer_protected(pkt)
283        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
284        self.assertEqual(resp[IPv6].tc, ECN_ECT0)
285
286        pkt = self.gen_pkt_tun_ipv6v6(ECN_ECT0, ECN_ECT1)
287        resp = self.px.xfer_protected(pkt)
288        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
289        self.assertEqual(resp[IPv6].tc, ECN_ECT1)
290
291        pkt = self.gen_pkt_tun_ipv6v6(ECN_ECT1, ECN_CE)
292        resp = self.px.xfer_protected(pkt)
293        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
294        self.assertEqual(resp[IPv6].tc, ECN_CE)
295
296    def test_inb_ipv4v6_ecn_inner_no_change(self):
297        pkt = self.gen_pkt_tun_ipv4v6(ECN_ECT1, ECN_ECT0)
298        resp = self.px.xfer_protected(pkt)
299        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
300        self.assertEqual(resp[IPv6].tc, ECN_ECT0)
301
302        pkt = self.gen_pkt_tun_ipv4v6(ECN_ECT0, ECN_ECT1)
303        resp = self.px.xfer_protected(pkt)
304        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
305        self.assertEqual(resp[IPv6].tc, ECN_ECT1)
306
307        pkt = self.gen_pkt_tun_ipv4v6(ECN_ECT1, ECN_CE)
308        resp = self.px.xfer_protected(pkt)
309        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
310        self.assertEqual(resp[IPv6].tc, ECN_CE)
311
312    def test_inb_ipv6v4_ecn_inner_no_change(self):
313        pkt = self.gen_pkt_tun_ipv6v4(ECN_ECT1, ECN_ECT0)
314        resp = self.px.xfer_protected(pkt)
315        self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP)
316        self.assertEqual(resp[IP].tos, ECN_ECT0)
317
318        pkt = self.gen_pkt_tun_ipv6v4(ECN_ECT0, ECN_ECT1)
319        resp = self.px.xfer_protected(pkt)
320        self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP)
321        self.assertEqual(resp[IP].tos, ECN_ECT1)
322
323        pkt = self.gen_pkt_tun_ipv6v4(ECN_ECT1, ECN_CE)
324        resp = self.px.xfer_protected(pkt)
325        self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP)
326        self.assertEqual(resp[IP].tos, ECN_CE)
327
328#Outter header CE, Inner header should be changed to CE
329    def test_inb_ipv4v4_ecn_inner_change(self):
330        pkt = self.gen_pkt_tun_ipv4v4(ECN_CE, ECN_ECT0)
331        resp = self.px.xfer_protected(pkt)
332        self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP)
333        self.assertEqual(resp[IP].tos, ECN_CE)
334
335        pkt = self.gen_pkt_tun_ipv4v4(ECN_CE, ECN_ECT1)
336        resp = self.px.xfer_protected(pkt)
337        self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP)
338        self.assertEqual(resp[IP].tos, ECN_CE)
339
340    def test_inb_ipv6v6_ecn_inner_change(self):
341        pkt = self.gen_pkt_tun_ipv6v6(ECN_CE, ECN_ECT0)
342        resp = self.px.xfer_protected(pkt)
343        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
344        self.assertEqual(resp[IPv6].tc, ECN_CE)
345
346        pkt = self.gen_pkt_tun_ipv6v6(ECN_CE, ECN_ECT1)
347        resp = self.px.xfer_protected(pkt)
348        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
349        self.assertEqual(resp[IPv6].tc, ECN_CE)
350
351    def test_inb_ipv4v6_ecn_inner_change(self):
352        pkt = self.gen_pkt_tun_ipv4v6(ECN_CE, ECN_ECT0)
353        resp = self.px.xfer_protected(pkt)
354        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
355        self.assertEqual(resp[IPv6].tc, ECN_CE)
356
357        pkt = self.gen_pkt_tun_ipv4v6(ECN_CE, ECN_ECT1)
358        resp = self.px.xfer_protected(pkt)
359        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
360        self.assertEqual(resp[IPv6].tc, ECN_CE)
361
362    def test_inb_ipv6v4_ecn_inner_change(self):
363        pkt = self.gen_pkt_tun_ipv6v4(ECN_CE, ECN_ECT0)
364        resp = self.px.xfer_protected(pkt)
365        self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP)
366        self.assertEqual(resp[IP].tos, ECN_CE)
367
368        pkt = self.gen_pkt_tun_ipv6v4(ECN_CE, ECN_ECT1)
369        resp = self.px.xfer_protected(pkt)
370        self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP)
371        self.assertEqual(resp[IP].tos, ECN_CE)
372
373#RFC4301 5.1.2.1.5 Outer DS field should be copied from Inner DS field
374    def test_outb_ipv4v4_dscp(self):
375        pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1,
376                                      DSCP_1)
377        resp = self.px.xfer_unprotected(pkt)
378        self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP)
379        self.assertEqual(resp[ESP].spi, 5)
380        self.assertEqual(resp[IP].tos, DSCP_1)
381
382        pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1,
383                                      DSCP_3F)
384        resp = self.px.xfer_unprotected(pkt)
385        self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP)
386        self.assertEqual(resp[ESP].spi, 5)
387        self.assertEqual(resp[IP].tos, DSCP_3F)
388
389    def test_outb_ipv6v6_dscp(self):
390        pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1,
391                                      DSCP_1)
392        resp = self.px.xfer_unprotected(pkt)
393        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
394        self.assertEqual(resp[ESP].spi, 7)
395        self.assertEqual(resp[IPv6].tc, DSCP_1)
396
397        pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1,
398                                      DSCP_3F)
399        resp = self.px.xfer_unprotected(pkt)
400        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
401        self.assertEqual(resp[ESP].spi, 7)
402        self.assertEqual(resp[IPv6].tc, DSCP_3F)
403
404    def test_outb_ipv4v6_dscp(self):
405        pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2,
406                                      DSCP_1)
407        resp = self.px.xfer_unprotected(pkt)
408        self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP)
409        self.assertEqual(resp[ESP].spi, 9)
410        self.assertEqual(resp[IP].tos, DSCP_1)
411
412        pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2,
413                                      DSCP_3F)
414        resp = self.px.xfer_unprotected(pkt)
415        self.assertEqual(resp[IP].proto, socket.IPPROTO_ESP)
416        self.assertEqual(resp[ESP].spi, 9)
417        self.assertEqual(resp[IP].tos, DSCP_3F)
418
419    def test_outb_ipv6v4_dscp(self):
420        pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3,
421                                      DSCP_1)
422        resp = self.px.xfer_unprotected(pkt)
423        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
424        self.assertEqual(resp[ESP].spi, 11)
425        self.assertEqual(resp[IPv6].tc, DSCP_1)
426
427        pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3,
428                                      DSCP_3F)
429        resp = self.px.xfer_unprotected(pkt)
430        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_ESP)
431        self.assertEqual(resp[ESP].spi, 11)
432        self.assertEqual(resp[IPv6].tc, DSCP_3F)
433
434#RFC4301 5.1.2.1.5 Inner DS field should not be affected by Outer DS field
435    def test_inb_ipv4v4_dscp(self):
436        pkt = self.gen_pkt_tun_ipv4v4(DSCP_3F, DSCP_1)
437        resp = self.px.xfer_protected(pkt)
438        self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP)
439        self.assertEqual(resp[IP].tos, DSCP_1)
440
441        pkt = self.gen_pkt_tun_ipv4v4(DSCP_1, DSCP_3F)
442        resp = self.px.xfer_protected(pkt)
443        self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP)
444        self.assertEqual(resp[IP].tos, DSCP_3F)
445
446    def test_inb_ipv6v6_dscp(self):
447        pkt = self.gen_pkt_tun_ipv6v6(DSCP_3F, DSCP_1)
448        resp = self.px.xfer_protected(pkt)
449        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
450        self.assertEqual(resp[IPv6].tc, DSCP_1)
451
452        pkt = self.gen_pkt_tun_ipv6v6(DSCP_1, DSCP_3F)
453        resp = self.px.xfer_protected(pkt)
454        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
455        self.assertEqual(resp[IPv6].tc, DSCP_3F)
456
457    def test_inb_ipv4v6_dscp(self):
458        pkt = self.gen_pkt_tun_ipv4v6(DSCP_3F, DSCP_1)
459        resp = self.px.xfer_protected(pkt)
460        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
461        self.assertEqual(resp[IPv6].tc, DSCP_1)
462
463        pkt = self.gen_pkt_tun_ipv4v6(DSCP_1, DSCP_3F)
464        resp = self.px.xfer_protected(pkt)
465        self.assertEqual(resp[IPv6].nh, socket.IPPROTO_UDP)
466        self.assertEqual(resp[IPv6].tc, DSCP_3F)
467
468    def test_inb_ipv6v4_dscp(self):
469        pkt = self.gen_pkt_tun_ipv6v4(DSCP_3F, DSCP_1)
470        resp = self.px.xfer_protected(pkt)
471        self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP)
472        self.assertEqual(resp[IP].tos, DSCP_1)
473
474        pkt = self.gen_pkt_tun_ipv6v4(DSCP_1, DSCP_3F)
475        resp = self.px.xfer_protected(pkt)
476        self.assertEqual(resp[IP].proto, socket.IPPROTO_UDP)
477        self.assertEqual(resp[IP].tos, DSCP_3F)
478
479pkttest.pkttest()
480