xref: /dpdk/dts/tests/TestSuite_checksum_offload.py (revision 8c9a7471a0e601b2f7a29a30daa4ec23cb484f77)
1# SPDX-License-Identifier: BSD-3-Clause
2# Copyright(c) 2024 University of New Hampshire
3
"""DPDK checksum offload testing suite.

This suite verifies L3/L4 checksum offload features of the Poll Mode Driver.
On the Rx side, IPv4 and UDP/TCP checksum validation by hardware is checked to
ensure checksum flags match expected flags. On the Tx side, IPv4/UDP, IPv4/TCP,
IPv6/UDP, and IPv6/TCP checksum insertion by hardware is checked to ensure
checksum flags match expected flags.

"""
13
14from typing import List
15
16from scapy.layers.inet import IP, TCP, UDP  # type: ignore[import-untyped]
17from scapy.layers.inet6 import IPv6  # type: ignore[import-untyped]
18from scapy.layers.l2 import Dot1Q, Ether  # type: ignore[import-untyped]
19from scapy.layers.sctp import SCTP  # type: ignore[import-untyped]
20from scapy.packet import Packet, Raw  # type: ignore[import-untyped]
21
22from framework.params.testpmd import SimpleForwardingModes
23from framework.remote_session.testpmd_shell import (
24    ChecksumOffloadOptions,
25    PacketOffloadFlag,
26    TestPmdShell,
27)
28from framework.test_suite import TestSuite, func_test
29from framework.testbed_model.capability import NicCapability, TopologyType, requires
30
31
@requires(topology_type=TopologyType.two_links)
@requires(NicCapability.RX_OFFLOAD_CHECKSUM)
@requires(NicCapability.RX_OFFLOAD_IPV4_CKSUM)
@requires(NicCapability.RX_OFFLOAD_UDP_CKSUM)
@requires(NicCapability.RX_OFFLOAD_TCP_CKSUM)
class TestChecksumOffload(TestSuite):
    """Checksum offload test suite.

    This suite consists of 7 test cases:
    1. Insert checksum on transmit packet
    2. Do not insert checksum on transmit packet
    3. Hardware checksum check L4 Rx
    4. Hardware checksum check L3 Rx
    5. Validate Rx checksum valid flags
    6. Checksum offload with VLAN
    7. Checksum offload with SCTP

    """

    def send_packets_and_verify(
        self, packet_list: List[Packet], load: str, should_receive: bool
    ) -> None:
        """Send each packet in a list and verify whether it is received.

        Every packet is sent and captured on its own, and the verification is
        performed once per sent packet.

        Args:
            packet_list: List of Scapy packets to send and verify.
            load: Raw layer load attribute in the sent packet.
            should_receive: Indicates whether the packet should be received
                by the traffic generator.
        """
        for sent_packet in packet_list:
            received_packets = self.send_packet_and_capture(packet=sent_packet)
            # A capture counts as a match when it carries a Raw layer whose
            # load contains the expected payload string.
            received = any(
                packet.haslayer(Raw) and load in str(packet.load) for packet in received_packets
            )
            self.verify(
                received == should_receive,
                f"Packet was {'dropped' if should_receive else 'received'}",
            )

    def send_packet_and_verify_checksum(
        self, packet: Packet, goodL4: bool, goodIP: bool, testpmd: TestPmdShell, id: str
    ) -> None:
        """Send packet and verify verbose output matches expected output.

        Forwarding is started, the packet is sent, and forwarding is stopped again;
        the verbose output is then searched for entries whose destination MAC address
        equals `id`. If several entries match, the last one determines the result.

        Args:
            packet: Scapy packet to send to DUT.
            goodL4: Whether RTE_MBUF_F_RX_L4_CKSUM_GOOD is expected to be present
                (:data:`True`) or absent (:data:`False`) in the packet's offload flags.
            goodIP: Whether RTE_MBUF_F_RX_IP_CKSUM_GOOD is expected to be present
                (:data:`True`) or absent (:data:`False`) in the packet's offload flags.
            testpmd: Testpmd shell session to analyze verbose output of.
            id: The destination mac address that matches the sent packet in verbose output.
        """
        testpmd.start()
        self.send_packet_and_capture(packet=packet)
        verbose_output = testpmd.extract_verbose_output(testpmd.stop())
        # None means "no matching packet seen"; without this default a missing
        # packet would raise UnboundLocalError instead of failing the test case.
        is_ip = is_l4 = None
        for verbose_packet in verbose_output:
            if verbose_packet.dst_mac == id:
                is_ip = PacketOffloadFlag.RTE_MBUF_F_RX_IP_CKSUM_GOOD in verbose_packet.ol_flags
                is_l4 = PacketOffloadFlag.RTE_MBUF_F_RX_L4_CKSUM_GOOD in verbose_packet.ol_flags
        self.verify(
            is_ip is not None and is_l4 is not None,
            "Test packet was not found in the testpmd verbose output.",
        )
        self.verify(is_l4 == goodL4, "Layer 4 checksum flag did not match expected checksum flag.")
        self.verify(is_ip == goodIP, "IP checksum flag did not match expected checksum flag.")

    def setup_hw_offload(self, testpmd: TestPmdShell) -> None:
        """Sets IP, UDP, and TCP layers to hardware offload.

        All ports are stopped before the offloads are configured on port 0 and
        started again afterwards.

        Args:
            testpmd: Testpmd shell to configure.
        """
        testpmd.stop_all_ports()
        offloads = (
            ChecksumOffloadOptions.ip | ChecksumOffloadOptions.udp | ChecksumOffloadOptions.tcp
        )
        testpmd.csum_set_hw(layers=offloads, port_id=0)
        testpmd.start_all_ports()

    @func_test
    def test_insert_checksums(self) -> None:
        """Enable checksum offload insertion and verify packet reception."""
        mac_id = "00:00:00:00:00:01"
        payload = "xxxxx"
        packet_list = [
            Ether(dst=mac_id) / IP() / UDP() / Raw(payload),
            Ether(dst=mac_id) / IP() / TCP() / Raw(payload),
            Ether(dst=mac_id) / IPv6(src="::1") / UDP() / Raw(payload),
            Ether(dst=mac_id) / IPv6(src="::1") / TCP() / Raw(payload),
        ]
        with TestPmdShell(node=self.sut_node, enable_rx_cksum=True) as testpmd:
            testpmd.set_forward_mode(SimpleForwardingModes.csum)
            testpmd.set_verbose(level=1)
            self.setup_hw_offload(testpmd=testpmd)
            testpmd.start()
            self.send_packets_and_verify(packet_list=packet_list, load=payload, should_receive=True)
            for packet in packet_list:
                self.send_packet_and_verify_checksum(
                    packet=packet, goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
                )

    @func_test
    def test_no_insert_checksums(self) -> None:
        """Disable checksum offload insertion and verify packet reception.

        Unlike the insertion test case, hardware offload is not configured here.
        """
        mac_id = "00:00:00:00:00:01"
        payload = "xxxxx"
        packet_list = [
            Ether(dst=mac_id) / IP() / UDP() / Raw(payload),
            Ether(dst=mac_id) / IP() / TCP() / Raw(payload),
            Ether(dst=mac_id) / IPv6(src="::1") / UDP() / Raw(payload),
            Ether(dst=mac_id) / IPv6(src="::1") / TCP() / Raw(payload),
        ]
        with TestPmdShell(node=self.sut_node, enable_rx_cksum=True) as testpmd:
            testpmd.set_forward_mode(SimpleForwardingModes.csum)
            testpmd.set_verbose(level=1)
            testpmd.start()
            self.send_packets_and_verify(packet_list=packet_list, load=payload, should_receive=True)
            for packet in packet_list:
                self.send_packet_and_verify_checksum(
                    packet=packet, goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
                )

    @func_test
    def test_l4_rx_checksum(self) -> None:
        """Tests L4 Rx checksum in a variety of scenarios."""
        mac_id = "00:00:00:00:00:01"
        # The first two packets keep Scapy's computed checksums; the last two
        # carry an explicitly overridden L4 checksum.
        packet_list = [
            Ether(dst=mac_id) / IP() / UDP(),
            Ether(dst=mac_id) / IP() / TCP(),
            Ether(dst=mac_id) / IP() / UDP(chksum=0xF),
            Ether(dst=mac_id) / IP() / TCP(chksum=0xF),
        ]
        with TestPmdShell(node=self.sut_node, enable_rx_cksum=True) as testpmd:
            testpmd.set_forward_mode(SimpleForwardingModes.csum)
            testpmd.set_verbose(level=1)
            self.setup_hw_offload(testpmd=testpmd)
            for packet in packet_list[:2]:
                self.send_packet_and_verify_checksum(
                    packet=packet, goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
                )
            for packet in packet_list[2:4]:
                self.send_packet_and_verify_checksum(
                    packet=packet, goodL4=False, goodIP=True, testpmd=testpmd, id=mac_id
                )

    @func_test
    def test_l3_rx_checksum(self) -> None:
        """Tests L3 Rx checksum hardware offload."""
        mac_id = "00:00:00:00:00:01"
        # The first two packets keep Scapy's computed checksums; the last two
        # carry an explicitly overridden IPv4 header checksum.
        packet_list = [
            Ether(dst=mac_id) / IP() / UDP(),
            Ether(dst=mac_id) / IP() / TCP(),
            Ether(dst=mac_id) / IP(chksum=0xF) / UDP(),
            Ether(dst=mac_id) / IP(chksum=0xF) / TCP(),
        ]
        with TestPmdShell(node=self.sut_node, enable_rx_cksum=True) as testpmd:
            testpmd.set_forward_mode(SimpleForwardingModes.csum)
            testpmd.set_verbose(level=1)
            self.setup_hw_offload(testpmd=testpmd)
            for packet in packet_list[:2]:
                self.send_packet_and_verify_checksum(
                    packet=packet, goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
                )
            for packet in packet_list[2:4]:
                self.send_packet_and_verify_checksum(
                    packet=packet, goodL4=True, goodIP=False, testpmd=testpmd, id=mac_id
                )

    @func_test
    def test_validate_rx_checksum(self) -> None:
        """Verify verbose output of Rx packets matches expected behavior."""
        mac_id = "00:00:00:00:00:01"
        # Packets 0-3 use Scapy's computed checksums, packets 4-5 override both
        # the IPv4 and the L4 checksum, packets 6-7 override only the L4 checksum.
        packet_list = [
            Ether(dst=mac_id) / IP() / UDP(),
            Ether(dst=mac_id) / IP() / TCP(),
            Ether(dst=mac_id) / IPv6(src="::1") / UDP(),
            Ether(dst=mac_id) / IPv6(src="::1") / TCP(),
            Ether(dst=mac_id) / IP(chksum=0x0) / UDP(chksum=0xF),
            Ether(dst=mac_id) / IP(chksum=0x0) / TCP(chksum=0xF),
            Ether(dst=mac_id) / IPv6(src="::1") / UDP(chksum=0xF),
            Ether(dst=mac_id) / IPv6(src="::1") / TCP(chksum=0xF),
        ]
        with TestPmdShell(node=self.sut_node, enable_rx_cksum=True) as testpmd:
            testpmd.set_forward_mode(SimpleForwardingModes.csum)
            testpmd.set_verbose(level=1)
            self.setup_hw_offload(testpmd=testpmd)
            for packet in packet_list[:4]:
                self.send_packet_and_verify_checksum(
                    packet=packet, goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
                )
            for packet in packet_list[4:6]:
                self.send_packet_and_verify_checksum(
                    packet=packet, goodL4=False, goodIP=False, testpmd=testpmd, id=mac_id
                )
            for packet in packet_list[6:8]:
                self.send_packet_and_verify_checksum(
                    packet=packet, goodL4=False, goodIP=True, testpmd=testpmd, id=mac_id
                )

    @requires(NicCapability.RX_OFFLOAD_VLAN)
    @func_test
    def test_vlan_checksum(self) -> None:
        """Test VLAN Rx checksum hardware offload and verify packet reception."""
        mac_id = "00:00:00:00:00:01"
        payload = "xxxxx"
        # Packets 0-1 override both the IPv4 and the L4 checksum; packets 2-3
        # are IPv6 and override only the L4 checksum.
        packet_list = [
            Ether(dst=mac_id) / Dot1Q(vlan=1) / IP(chksum=0x0) / UDP(chksum=0xF) / Raw(payload),
            Ether(dst=mac_id) / Dot1Q(vlan=1) / IP(chksum=0x0) / TCP(chksum=0xF) / Raw(payload),
            Ether(dst=mac_id) / Dot1Q(vlan=1) / IPv6(src="::1") / UDP(chksum=0xF) / Raw(payload),
            Ether(dst=mac_id) / Dot1Q(vlan=1) / IPv6(src="::1") / TCP(chksum=0xF) / Raw(payload),
        ]
        with TestPmdShell(node=self.sut_node, enable_rx_cksum=True) as testpmd:
            testpmd.set_forward_mode(SimpleForwardingModes.csum)
            testpmd.set_verbose(level=1)
            self.setup_hw_offload(testpmd=testpmd)
            testpmd.start()
            self.send_packets_and_verify(packet_list=packet_list, load=payload, should_receive=True)
            for packet in packet_list[:2]:
                self.send_packet_and_verify_checksum(
                    packet=packet, goodL4=False, goodIP=False, testpmd=testpmd, id=mac_id
                )
            for packet in packet_list[2:4]:
                self.send_packet_and_verify_checksum(
                    packet=packet, goodL4=False, goodIP=True, testpmd=testpmd, id=mac_id
                )

    @requires(NicCapability.RX_OFFLOAD_SCTP_CKSUM)
    @func_test
    def test_validate_sctp_checksum(self) -> None:
        """Test SCTP Rx checksum hardware offload and verify packet reception."""
        mac_id = "00:00:00:00:00:01"
        good_packet = Ether(dst=mac_id) / IP() / SCTP()
        bad_packet = Ether(dst=mac_id) / IP() / SCTP(chksum=0xF)
        with TestPmdShell(node=self.sut_node, enable_rx_cksum=True) as testpmd:
            testpmd.set_forward_mode(SimpleForwardingModes.csum)
            testpmd.set_verbose(level=1)
            # Same stop/configure/start sequence and explicit port as
            # setup_hw_offload, so the SCTP offload is configured the same way
            # as the IP/UDP/TCP offloads.
            testpmd.stop_all_ports()
            testpmd.csum_set_hw(layers=ChecksumOffloadOptions.sctp, port_id=0)
            testpmd.start_all_ports()
            testpmd.start()
            self.send_packet_and_verify_checksum(
                packet=good_packet, goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
            )
            self.send_packet_and_verify_checksum(
                packet=bad_packet, goodL4=False, goodIP=True, testpmd=testpmd, id=mac_id
            )
276