#!/usr/bin/env python3
# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2019 Intel Corporation

from scapy.all import *
# Imported explicitly so socket.IPPROTO_* does not depend on whatever the
# scapy star-import happens to re-export.
import socket
import unittest
import pkttest

#{ipv4{ipv4}} test
SRC_ADDR_IPV4_1 = "192.168.1.1"
DST_ADDR_IPV4_1 = "192.168.2.1"

#{ipv6{ipv6}} test
SRC_ADDR_IPV6_1 = "1111:0000:0000:0000:0000:0000:0000:0001"
DST_ADDR_IPV6_1 = "2222:0000:0000:0000:0000:0000:0000:0001"

#{ipv4{ipv6}} test
SRC_ADDR_IPV4_2 = "192.168.11.1"
DST_ADDR_IPV4_2 = "192.168.12.1"
SRC_ADDR_IPV6_2 = "1111:0000:0000:0000:0000:0000:0001:0001"
DST_ADDR_IPV6_2 = "2222:0000:0000:0000:0000:0000:0001:0001"

#{ipv6{ipv4}} test
SRC_ADDR_IPV4_3 = "192.168.21.1"
DST_ADDR_IPV4_3 = "192.168.22.1"
SRC_ADDR_IPV6_3 = "1111:0000:0000:0000:0000:0001:0001:0001"
DST_ADDR_IPV6_3 = "2222:0000:0000:0000:0000:0001:0001:0001"


def config():
    """Return the SP/SA/route configuration text for the four tunnel scenarios.

    The template defines, for each outer/inner address-family combination,
    one outbound and one inbound ESP policy and SA (null cipher, null auth,
    tunnel mode) plus two routes. SPIs are 5/6 (v4 in v4), 7/8 (v6 in v6),
    9/10 (v6 in v4) and 11/12 (v4 in v6); odd SPIs are outbound, even SPIs
    are inbound.

    NOTE(review): presumably consumed by the pkttest harness to configure
    the application under test -- confirm against pkttest.
    """
    return """
#outter-ipv4 inner-ipv4 tunnel mode test
sp ipv4 out esp protect 5 pri 1 \\
src {0}/32 \\
dst {1}/32 \\
sport 0:65535 dport 0:65535

sp ipv4 in esp protect 6 pri 1 \\
src {1}/32 \\
dst {0}/32 \\
sport 0:65535 dport 0:65535

sa out 5 cipher_algo null auth_algo null mode ipv4-tunnel \\
src {0} dst {1}
sa in 6 cipher_algo null auth_algo null mode ipv4-tunnel \\
src {1} dst {0}

rt ipv4 dst {0}/32 port 1
rt ipv4 dst {1}/32 port 0

#outter-ipv6 inner-ipv6 tunnel mode test
sp ipv6 out esp protect 7 pri 1 \\
src {2}/128 \\
dst {3}/128 \\
sport 0:65535 dport 0:65535

sp ipv6 in esp protect 8 pri 1 \\
src {3}/128 \\
dst {2}/128 \\
sport 0:65535 dport 0:65535

sa out 7 cipher_algo null auth_algo null mode ipv6-tunnel \\
src {2} dst {3}
sa in 8 cipher_algo null auth_algo null mode ipv6-tunnel \\
src {3} dst {2}

rt ipv6 dst {2}/128 port 1
rt ipv6 dst {3}/128 port 0

#outter-ipv4 inner-ipv6 tunnel mode test
sp ipv6 out esp protect 9 pri 1 \\
src {4}/128 \\
dst {5}/128 \\
sport 0:65535 dport 0:65535

sp ipv6 in esp protect 10 pri 1 \\
src {5}/128 \\
dst {4}/128 \\
sport 0:65535 dport 0:65535

sa out 9 cipher_algo null auth_algo null mode ipv4-tunnel \\
src {6} dst {7}
sa in 10 cipher_algo null auth_algo null mode ipv4-tunnel \\
src {7} dst {6}

rt ipv6 dst {4}/128 port 1
rt ipv4 dst {7}/32 port 0

#outter-ipv6 inner-ipv4 tunnel mode test
sp ipv4 out esp protect 11 pri 1 \\
src {8}/32 \\
dst {9}/32 \\
sport 0:65535 dport 0:65535

sp ipv4 in esp protect 12 pri 1 \\
src {9}/32 \\
dst {8}/32 \\
sport 0:65535 dport 0:65535

sa out 11 cipher_algo null auth_algo null mode ipv6-tunnel \\
src {10} dst {11}
sa in 12 cipher_algo null auth_algo null mode ipv6-tunnel \\
src {11} dst {10}

rt ipv4 dst {8}/32 port 1
rt ipv6 dst {11}/128 port 0
""".format(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1,
           SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1,
           SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2, SRC_ADDR_IPV4_2, DST_ADDR_IPV4_2,
           SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3, SRC_ADDR_IPV6_3, DST_ADDR_IPV6_3)


# Values written to the whole IPv4 TOS / IPv6 Traffic Class byte.
# ECN occupies the two low-order bits, DSCP the six high-order bits.
ECN_ECT0 = 0x02
ECN_ECT1 = 0x01
ECN_CE = 0x03
DSCP_1 = 0x04       # DSCP value 0x01 shifted past the ECN bits
DSCP_3F = 0xFC      # DSCP value 0x3F shifted past the ECN bits


class TestTunnelHeaderReconstruct(unittest.TestCase):
    """Check ECN/DSCP handling when tunnel headers are built and removed.

    Outbound tests send a plain packet and inspect the outer header of the
    ESP packet that comes back. Inbound tests send a locally built ESP
    tunnel packet and inspect the inner header of the plain packet that
    comes back.
    """

    # ECN values sent in the outbound tests.
    _ECN_VALUES = (ECN_ECT1, ECN_ECT0, ECN_CE)
    # (outer ECN, inner ECN) pairs where the outer header is not CE, so the
    # inner ECN is expected to come back unchanged.
    _ECN_OUTER_NOT_CE = (
        (ECN_ECT1, ECN_ECT0),
        (ECN_ECT0, ECN_ECT1),
        (ECN_ECT1, ECN_CE),
    )
    # Inner ECN values sent with an outer CE; the inner header is expected
    # to come back as CE.
    _ECN_INNER_UNDER_CE = (ECN_ECT0, ECN_ECT1)
    # DS values sent in the outbound tests.
    _DSCP_VALUES = (DSCP_1, DSCP_3F)
    # (outer DS, inner DS) pairs for the inbound tests; the inner DS is
    # expected to come back unchanged.
    _DSCP_OUTER_INNER = (
        (DSCP_3F, DSCP_1),
        (DSCP_1, DSCP_3F),
    )

    def setUp(self):
        """Create the packet transfer helper and one scapy SA per inbound SPI.

        The tunnel headers use the DST_* address as source and the SRC_*
        address as destination, matching the "sa in" entries (SPIs 6, 8, 10
        and 12) in config().
        """
        self.px = pkttest.PacketXfer()
        th = IP(src=DST_ADDR_IPV4_1, dst=SRC_ADDR_IPV4_1)
        self.sa_ipv4v4 = SecurityAssociation(ESP, spi=6, tunnel_header=th)

        th = IPv6(src=DST_ADDR_IPV6_1, dst=SRC_ADDR_IPV6_1)
        self.sa_ipv6v6 = SecurityAssociation(ESP, spi=8, tunnel_header=th)

        th = IP(src=DST_ADDR_IPV4_2, dst=SRC_ADDR_IPV4_2)
        self.sa_ipv4v6 = SecurityAssociation(ESP, spi=10, tunnel_header=th)

        th = IPv6(src=DST_ADDR_IPV6_3, dst=SRC_ADDR_IPV6_3)
        self.sa_ipv6v4 = SecurityAssociation(ESP, spi=12, tunnel_header=th)

    # ---- packet builders -------------------------------------------------

    def gen_pkt_plain_ipv4(self, src, dst, tos):
        """Return an IPv4/UDP packet with the given addresses and TOS byte."""
        pkt = IP(src=src, dst=dst, tos=tos)
        pkt /= UDP(sport=123, dport=456)/Raw(load="abc")
        return pkt

    def gen_pkt_plain_ipv6(self, src, dst, tc):
        """Return an IPv6/UDP packet with the given addresses and TC byte."""
        pkt = IPv6(src=src, dst=dst, tc=tc)
        pkt /= UDP(sport=123, dport=456)/Raw(load="abc")
        return pkt

    def gen_pkt_tun_ipv4v4(self, tos_outter, tos_inner):
        """Return an IPv4-in-IPv4 ESP packet (SPI 6) with the given TOS bytes.

        The outer TOS is overwritten after encryption, so it is set
        independently of the inner one.
        """
        pkt = self.gen_pkt_plain_ipv4(DST_ADDR_IPV4_1, SRC_ADDR_IPV4_1,
                                      tos_inner)
        pkt = self.sa_ipv4v4.encrypt(pkt)
        self.assertEqual(pkt[IP].proto, socket.IPPROTO_ESP)
        self.assertEqual(pkt[ESP].spi, 6)
        pkt[IP].tos = tos_outter
        return pkt

    def gen_pkt_tun_ipv6v6(self, tc_outter, tc_inner):
        """Return an IPv6-in-IPv6 ESP packet (SPI 8) with the given TC bytes.

        The outer TC is overwritten after encryption, so it is set
        independently of the inner one.
        """
        pkt = self.gen_pkt_plain_ipv6(DST_ADDR_IPV6_1, SRC_ADDR_IPV6_1,
                                      tc_inner)
        pkt = self.sa_ipv6v6.encrypt(pkt)
        self.assertEqual(pkt[IPv6].nh, socket.IPPROTO_ESP)
        self.assertEqual(pkt[ESP].spi, 8)
        pkt[IPv6].tc = tc_outter
        return pkt

    def gen_pkt_tun_ipv4v6(self, tos_outter, tc_inner):
        """Return an IPv6-in-IPv4 ESP packet (SPI 10): outer TOS, inner TC.

        The outer TOS is overwritten after encryption, so it is set
        independently of the inner TC.
        """
        pkt = self.gen_pkt_plain_ipv6(DST_ADDR_IPV6_2, SRC_ADDR_IPV6_2,
                                      tc_inner)
        pkt = self.sa_ipv4v6.encrypt(pkt)
        self.assertEqual(pkt[IP].proto, socket.IPPROTO_ESP)
        self.assertEqual(pkt[ESP].spi, 10)
        pkt[IP].tos = tos_outter
        return pkt

    def gen_pkt_tun_ipv6v4(self, tc_outter, tos_inner):
        """Return an IPv4-in-IPv6 ESP packet (SPI 12): outer TC, inner TOS.

        The outer TC is overwritten after encryption, so it is set
        independently of the inner TOS.
        """
        pkt = self.gen_pkt_plain_ipv4(DST_ADDR_IPV4_3, SRC_ADDR_IPV4_3,
                                      tos_inner)
        pkt = self.sa_ipv6v4.encrypt(pkt)
        self.assertEqual(pkt[IPv6].nh, socket.IPPROTO_ESP)
        self.assertEqual(pkt[ESP].spi, 12)
        pkt[IPv6].tc = tc_outter
        return pkt

    # ---- shared checks ---------------------------------------------------

    @staticmethod
    def _next_proto(pkt, layer):
        """Return the next-protocol field of pkt's IP or IPv6 header."""
        hdr = pkt[layer]
        return hdr.proto if layer is IP else hdr.nh

    @staticmethod
    def _tos_or_tc(pkt, layer):
        """Return the TOS (IP) or Traffic Class (IPv6) byte of pkt's header."""
        hdr = pkt[layer]
        return hdr.tos if layer is IP else hdr.tc

    def _check_outbound(self, pkt, outer, spi, expected):
        """Send a plain packet and verify the ESP packet that comes back.

        outer is the scapy layer class (IP or IPv6) of the expected tunnel
        header, spi the expected ESP SPI, and expected the TOS/TC byte the
        tunnel header must carry.
        """
        resp = self.px.xfer_unprotected(pkt)
        self.assertEqual(self._next_proto(resp, outer), socket.IPPROTO_ESP)
        self.assertEqual(resp[ESP].spi, spi)
        self.assertEqual(self._tos_or_tc(resp, outer), expected)

    def _check_inbound(self, pkt, inner, expected):
        """Send an ESP tunnel packet and verify the plain packet that comes back.

        inner is the scapy layer class (IP or IPv6) of the expected plain
        packet and expected the TOS/TC byte it must carry.
        """
        resp = self.px.xfer_protected(pkt)
        self.assertEqual(self._next_proto(resp, inner), socket.IPPROTO_UDP)
        self.assertEqual(self._tos_or_tc(resp, inner), expected)

#RFC4301 5.1.2.1 & 5.1.2.2, the ECN field of outbound packets shall be copied
#from the inner header to the outer header
    def test_outb_ipv4v4_ecn(self):
        for ecn in self._ECN_VALUES:
            pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1,
                                          ecn)
            self._check_outbound(pkt, IP, 5, ecn)

    def test_outb_ipv6v6_ecn(self):
        for ecn in self._ECN_VALUES:
            pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1,
                                          ecn)
            self._check_outbound(pkt, IPv6, 7, ecn)

    def test_outb_ipv4v6_ecn(self):
        for ecn in self._ECN_VALUES:
            pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2,
                                          ecn)
            self._check_outbound(pkt, IP, 9, ecn)

    def test_outb_ipv6v4_ecn(self):
        for ecn in self._ECN_VALUES:
            pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3,
                                          ecn)
            self._check_outbound(pkt, IPv6, 11, ecn)

#RFC4301 5.1.2.1 & 5.1.2.2, if the outer header's ECN is CE (0x3), the inner
#header's ECN is overwritten with CE on inbound; otherwise it is not changed

#Outer header not CE, inner header should not change
    def test_inb_ipv4v4_ecn_inner_no_change(self):
        for outer, inner in self._ECN_OUTER_NOT_CE:
            pkt = self.gen_pkt_tun_ipv4v4(outer, inner)
            self._check_inbound(pkt, IP, inner)

    def test_inb_ipv6v6_ecn_inner_no_change(self):
        for outer, inner in self._ECN_OUTER_NOT_CE:
            pkt = self.gen_pkt_tun_ipv6v6(outer, inner)
            self._check_inbound(pkt, IPv6, inner)

    def test_inb_ipv4v6_ecn_inner_no_change(self):
        for outer, inner in self._ECN_OUTER_NOT_CE:
            pkt = self.gen_pkt_tun_ipv4v6(outer, inner)
            self._check_inbound(pkt, IPv6, inner)

    def test_inb_ipv6v4_ecn_inner_no_change(self):
        for outer, inner in self._ECN_OUTER_NOT_CE:
            pkt = self.gen_pkt_tun_ipv6v4(outer, inner)
            self._check_inbound(pkt, IP, inner)

#Outer header CE, inner header should be changed to CE
    def test_inb_ipv4v4_ecn_inner_change(self):
        for inner in self._ECN_INNER_UNDER_CE:
            pkt = self.gen_pkt_tun_ipv4v4(ECN_CE, inner)
            self._check_inbound(pkt, IP, ECN_CE)

    def test_inb_ipv6v6_ecn_inner_change(self):
        for inner in self._ECN_INNER_UNDER_CE:
            pkt = self.gen_pkt_tun_ipv6v6(ECN_CE, inner)
            self._check_inbound(pkt, IPv6, ECN_CE)

    def test_inb_ipv4v6_ecn_inner_change(self):
        for inner in self._ECN_INNER_UNDER_CE:
            pkt = self.gen_pkt_tun_ipv4v6(ECN_CE, inner)
            self._check_inbound(pkt, IPv6, ECN_CE)

    def test_inb_ipv6v4_ecn_inner_change(self):
        for inner in self._ECN_INNER_UNDER_CE:
            pkt = self.gen_pkt_tun_ipv6v4(ECN_CE, inner)
            self._check_inbound(pkt, IP, ECN_CE)

#RFC4301 5.1.2.1.5 Outer DS field should be copied from Inner DS field
    def test_outb_ipv4v4_dscp(self):
        for dscp in self._DSCP_VALUES:
            pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_1, DST_ADDR_IPV4_1,
                                          dscp)
            self._check_outbound(pkt, IP, 5, dscp)

    def test_outb_ipv6v6_dscp(self):
        for dscp in self._DSCP_VALUES:
            pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_1, DST_ADDR_IPV6_1,
                                          dscp)
            self._check_outbound(pkt, IPv6, 7, dscp)

    def test_outb_ipv4v6_dscp(self):
        for dscp in self._DSCP_VALUES:
            pkt = self.gen_pkt_plain_ipv6(SRC_ADDR_IPV6_2, DST_ADDR_IPV6_2,
                                          dscp)
            self._check_outbound(pkt, IP, 9, dscp)

    def test_outb_ipv6v4_dscp(self):
        for dscp in self._DSCP_VALUES:
            pkt = self.gen_pkt_plain_ipv4(SRC_ADDR_IPV4_3, DST_ADDR_IPV4_3,
                                          dscp)
            self._check_outbound(pkt, IPv6, 11, dscp)

#RFC4301 5.1.2.1.5 Inner DS field should not be affected by Outer DS field
    def test_inb_ipv4v4_dscp(self):
        for outer, inner in self._DSCP_OUTER_INNER:
            pkt = self.gen_pkt_tun_ipv4v4(outer, inner)
            self._check_inbound(pkt, IP, inner)

    def test_inb_ipv6v6_dscp(self):
        for outer, inner in self._DSCP_OUTER_INNER:
            pkt = self.gen_pkt_tun_ipv6v6(outer, inner)
            self._check_inbound(pkt, IPv6, inner)

    def test_inb_ipv4v6_dscp(self):
        for outer, inner in self._DSCP_OUTER_INNER:
            pkt = self.gen_pkt_tun_ipv4v6(outer, inner)
            self._check_inbound(pkt, IPv6, inner)

    def test_inb_ipv6v4_dscp(self):
        for outer, inner in self._DSCP_OUTER_INNER:
            pkt = self.gen_pkt_tun_ipv6v4(outer, inner)
            self._check_inbound(pkt, IP, inner)


pkttest.pkttest()