# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2024 University of New Hampshire

"""DPDK checksum offload testing suite.

This suite verifies L3/L4 checksum offload features of the Poll Mode Driver.
On the Rx side, IPv4 and UDP/TCP checksum by hardware is checked to ensure
checksum flags match expected flags. On the Tx side, IPv4/UDP, IPv4/TCP,
IPv6/UDP, and IPv6/TCP insertion by hardware is checked to ensure checksum flags
match expected flags.

"""

from typing import List

from scapy.layers.inet import IP, TCP, UDP  # type: ignore[import-untyped]
from scapy.layers.inet6 import IPv6  # type: ignore[import-untyped]
from scapy.layers.l2 import Dot1Q, Ether  # type: ignore[import-untyped]
from scapy.layers.sctp import SCTP  # type: ignore[import-untyped]
from scapy.packet import Packet, Raw  # type: ignore[import-untyped]

from framework.params.testpmd import SimpleForwardingModes
from framework.remote_session.testpmd_shell import (
    ChecksumOffloadOptions,
    PacketOffloadFlag,
    TestPmdShell,
)
from framework.test_suite import TestSuite, func_test
from framework.testbed_model.capability import NicCapability, TopologyType, requires


@requires(topology_type=TopologyType.two_links)
@requires(NicCapability.RX_OFFLOAD_CHECKSUM)
@requires(NicCapability.RX_OFFLOAD_IPV4_CKSUM)
@requires(NicCapability.RX_OFFLOAD_UDP_CKSUM)
@requires(NicCapability.RX_OFFLOAD_TCP_CKSUM)
class TestChecksumOffload(TestSuite):
    """Checksum offload test suite.

    This suite consists of 7 test cases:
    1. Insert checksum on transmit packet
    2. Do not insert checksum on transmit packet
    3. Hardware checksum check L4 Rx
    4. Hardware checksum check L3 Rx
    5. Validate Rx checksum valid flags
    6. Checksum offload with VLAN
    7. Checksum offload with SCTP

    """

    def send_packets_and_verify(
        self, packet_list: List[Packet], load: str, should_receive: bool
    ) -> None:
        """Send each packet in a list and verify whether it is received.

        Every packet is sent and captured on its own; the capture is then searched
        for a packet with a Raw layer whose load contains `load`.

        Args:
            packet_list: List of Scapy packets to send and verify.
            load: Raw layer load attribute in the sent packet.
            should_receive: Indicates whether the packet should be received
                by the traffic generator.
        """
        for sent_packet in packet_list:
            received_packets = self.send_packet_and_capture(packet=sent_packet)
            received = any(
                candidate.haslayer(Raw) and load in str(candidate.load)
                for candidate in received_packets
            )
            self.verify(
                received == should_receive,
                f"Packet was {'dropped' if should_receive else 'received'}",
            )

    def send_packet_and_verify_checksum(
        self, packet: Packet, goodL4: bool, goodIP: bool, testpmd: TestPmdShell, id: str
    ) -> None:
        """Send packet and verify verbose output matches expected output.

        Forwarding is started, the packet is sent, and forwarding is stopped again so that
        the verbose output produced for this packet can be parsed. The offload flags of
        the verbose entry whose destination MAC address equals `id` are then compared
        against the expectations. If several entries match, the last one is used.

        Args:
            packet: Scapy packet to send to DUT.
            goodL4: If :data:`True`, RTE_MBUF_F_RX_L4_CKSUM_GOOD must be present in the
                offload flags of the matching packet; if :data:`False`, it must be absent.
            goodIP: If :data:`True`, RTE_MBUF_F_RX_IP_CKSUM_GOOD must be present in the
                offload flags of the matching packet; if :data:`False`, it must be absent.
            testpmd: Testpmd shell session to analyze verbose output of.
            id: The destination mac address that matches the sent packet in verbose output.
        """
        testpmd.start()
        self.send_packet_and_capture(packet=packet)
        verbose_output = testpmd.extract_verbose_output(testpmd.stop())

        # Start from "not found" so that a missing packet is reported as a verification
        # failure instead of surfacing as an unbound local variable error below.
        is_ip_good = None
        is_l4_good = None
        for entry in verbose_output:
            if entry.dst_mac == id:
                is_ip_good = PacketOffloadFlag.RTE_MBUF_F_RX_IP_CKSUM_GOOD in entry.ol_flags
                is_l4_good = PacketOffloadFlag.RTE_MBUF_F_RX_L4_CKSUM_GOOD in entry.ol_flags

        self.verify(
            is_ip_good is not None and is_l4_good is not None,
            f"No packet with destination MAC address {id} was found in the verbose output.",
        )
        self.verify(
            is_l4_good == goodL4, "Layer 4 checksum flag did not match expected checksum flag."
        )
        self.verify(is_ip_good == goodIP, "IP checksum flag did not match expected checksum flag.")

    def setup_hw_offload(self, testpmd: TestPmdShell) -> None:
        """Sets IP, UDP, and TCP layers to hardware offload.

        All ports are stopped while the offload is configured on port 0 and
        started again afterwards.

        Args:
            testpmd: Testpmd shell to configure.
        """
        testpmd.stop_all_ports()
        offloads = (
            ChecksumOffloadOptions.ip | ChecksumOffloadOptions.udp | ChecksumOffloadOptions.tcp
        )
        testpmd.csum_set_hw(layers=offloads, port_id=0)
        testpmd.start_all_ports()

    @func_test
    def test_insert_checksums(self) -> None:
        """Enable checksum offload insertion and verify packet reception.

        Hardware offload is configured before the packets are sent; every packet must be
        received and reported with good IP and L4 checksum flags.
        """
        mac_id = "00:00:00:00:00:01"
        payload = "xxxxx"
        packet_list = [
            Ether(dst=mac_id) / IP() / UDP() / Raw(payload),
            Ether(dst=mac_id) / IP() / TCP() / Raw(payload),
            Ether(dst=mac_id) / IPv6(src="::1") / UDP() / Raw(payload),
            Ether(dst=mac_id) / IPv6(src="::1") / TCP() / Raw(payload),
        ]
        with TestPmdShell(node=self.sut_node, enable_rx_cksum=True) as testpmd:
            testpmd.set_forward_mode(SimpleForwardingModes.csum)
            testpmd.set_verbose(level=1)
            self.setup_hw_offload(testpmd=testpmd)
            testpmd.start()
            self.send_packets_and_verify(packet_list=packet_list, load=payload, should_receive=True)
            for packet in packet_list:
                self.send_packet_and_verify_checksum(
                    packet=packet, goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
                )

    @func_test
    def test_no_insert_checksums(self) -> None:
        """Disable checksum offload insertion and verify packet reception.

        No hardware offload is configured in this case; every packet must still be
        received and reported with good IP and L4 checksum flags.
        """
        mac_id = "00:00:00:00:00:01"
        payload = "xxxxx"
        packet_list = [
            Ether(dst=mac_id) / IP() / UDP() / Raw(payload),
            Ether(dst=mac_id) / IP() / TCP() / Raw(payload),
            Ether(dst=mac_id) / IPv6(src="::1") / UDP() / Raw(payload),
            Ether(dst=mac_id) / IPv6(src="::1") / TCP() / Raw(payload),
        ]
        with TestPmdShell(node=self.sut_node, enable_rx_cksum=True) as testpmd:
            testpmd.set_forward_mode(SimpleForwardingModes.csum)
            testpmd.set_verbose(level=1)
            testpmd.start()
            self.send_packets_and_verify(packet_list=packet_list, load=payload, should_receive=True)
            for packet in packet_list:
                self.send_packet_and_verify_checksum(
                    packet=packet, goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
                )

    @func_test
    def test_l4_rx_checksum(self) -> None:
        """Tests L4 Rx checksum in a variety of scenarios.

        The first two packets carry default L4 checksums and are expected to be flagged
        as good; the last two carry an explicitly set L4 checksum and are expected not to be.
        """
        mac_id = "00:00:00:00:00:01"
        packet_list = [
            Ether(dst=mac_id) / IP() / UDP(),
            Ether(dst=mac_id) / IP() / TCP(),
            Ether(dst=mac_id) / IP() / UDP(chksum=0xF),
            Ether(dst=mac_id) / IP() / TCP(chksum=0xF),
        ]
        with TestPmdShell(node=self.sut_node, enable_rx_cksum=True) as testpmd:
            testpmd.set_forward_mode(SimpleForwardingModes.csum)
            testpmd.set_verbose(level=1)
            self.setup_hw_offload(testpmd=testpmd)
            for packet in packet_list[:2]:
                self.send_packet_and_verify_checksum(
                    packet=packet, goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
                )
            for packet in packet_list[2:4]:
                self.send_packet_and_verify_checksum(
                    packet=packet, goodL4=False, goodIP=True, testpmd=testpmd, id=mac_id
                )

    @func_test
    def test_l3_rx_checksum(self) -> None:
        """Tests L3 Rx checksum hardware offload.

        The first two packets carry default IP checksums and are expected to be flagged
        as good; the last two carry an explicitly set IP checksum and are expected not to be.
        """
        mac_id = "00:00:00:00:00:01"
        packet_list = [
            Ether(dst=mac_id) / IP() / UDP(),
            Ether(dst=mac_id) / IP() / TCP(),
            Ether(dst=mac_id) / IP(chksum=0xF) / UDP(),
            Ether(dst=mac_id) / IP(chksum=0xF) / TCP(),
        ]
        with TestPmdShell(node=self.sut_node, enable_rx_cksum=True) as testpmd:
            testpmd.set_forward_mode(SimpleForwardingModes.csum)
            testpmd.set_verbose(level=1)
            self.setup_hw_offload(testpmd=testpmd)
            for packet in packet_list[:2]:
                self.send_packet_and_verify_checksum(
                    packet=packet, goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
                )
            for packet in packet_list[2:4]:
                self.send_packet_and_verify_checksum(
                    packet=packet, goodL4=True, goodIP=False, testpmd=testpmd, id=mac_id
                )

    @func_test
    def test_validate_rx_checksum(self) -> None:
        """Verify verbose output of Rx packets matches expected behavior.

        Packets 0-3 use default checksums (good IP and L4 expected), packets 4-5 set both
        the IPv4 and the L4 checksum explicitly (neither flag expected), and packets 6-7
        are IPv6 packets with an explicitly set L4 checksum (only the IP flag expected).
        """
        mac_id = "00:00:00:00:00:01"
        packet_list = [
            Ether(dst=mac_id) / IP() / UDP(),
            Ether(dst=mac_id) / IP() / TCP(),
            Ether(dst=mac_id) / IPv6(src="::1") / UDP(),
            Ether(dst=mac_id) / IPv6(src="::1") / TCP(),
            Ether(dst=mac_id) / IP(chksum=0x0) / UDP(chksum=0xF),
            Ether(dst=mac_id) / IP(chksum=0x0) / TCP(chksum=0xF),
            Ether(dst=mac_id) / IPv6(src="::1") / UDP(chksum=0xF),
            Ether(dst=mac_id) / IPv6(src="::1") / TCP(chksum=0xF),
        ]
        with TestPmdShell(node=self.sut_node, enable_rx_cksum=True) as testpmd:
            testpmd.set_forward_mode(SimpleForwardingModes.csum)
            testpmd.set_verbose(level=1)
            self.setup_hw_offload(testpmd=testpmd)
            for packet in packet_list[:4]:
                self.send_packet_and_verify_checksum(
                    packet=packet, goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
                )
            for packet in packet_list[4:6]:
                self.send_packet_and_verify_checksum(
                    packet=packet, goodL4=False, goodIP=False, testpmd=testpmd, id=mac_id
                )
            for packet in packet_list[6:8]:
                self.send_packet_and_verify_checksum(
                    packet=packet, goodL4=False, goodIP=True, testpmd=testpmd, id=mac_id
                )

    @requires(NicCapability.RX_OFFLOAD_VLAN)
    @func_test
    def test_vlan_checksum(self) -> None:
        """Test VLAN Rx checksum hardware offload and verify packet reception.

        All packets are VLAN tagged and must be received. The IPv4 packets set both the IP
        and the L4 checksum explicitly (neither flag expected); the IPv6 packets set only
        the L4 checksum explicitly (only the IP flag expected).
        """
        mac_id = "00:00:00:00:00:01"
        payload = "xxxxx"
        packet_list = [
            Ether(dst=mac_id) / Dot1Q(vlan=1) / IP(chksum=0x0) / UDP(chksum=0xF) / Raw(payload),
            Ether(dst=mac_id) / Dot1Q(vlan=1) / IP(chksum=0x0) / TCP(chksum=0xF) / Raw(payload),
            Ether(dst=mac_id) / Dot1Q(vlan=1) / IPv6(src="::1") / UDP(chksum=0xF) / Raw(payload),
            Ether(dst=mac_id) / Dot1Q(vlan=1) / IPv6(src="::1") / TCP(chksum=0xF) / Raw(payload),
        ]
        with TestPmdShell(node=self.sut_node, enable_rx_cksum=True) as testpmd:
            testpmd.set_forward_mode(SimpleForwardingModes.csum)
            testpmd.set_verbose(level=1)
            self.setup_hw_offload(testpmd=testpmd)
            testpmd.start()
            self.send_packets_and_verify(packet_list=packet_list, load=payload, should_receive=True)
            for packet in packet_list[:2]:
                self.send_packet_and_verify_checksum(
                    packet=packet, goodL4=False, goodIP=False, testpmd=testpmd, id=mac_id
                )
            for packet in packet_list[2:4]:
                self.send_packet_and_verify_checksum(
                    packet=packet, goodL4=False, goodIP=True, testpmd=testpmd, id=mac_id
                )

    @requires(NicCapability.RX_OFFLOAD_SCTP_CKSUM)
    @func_test
    def test_validate_sctp_checksum(self) -> None:
        """Test SCTP Rx checksum hardware offload and verify packet reception.

        The first packet carries a default SCTP checksum (good L4 expected); the second
        carries an explicitly set SCTP checksum (good L4 flag not expected).
        """
        mac_id = "00:00:00:00:00:01"
        packet_list = [
            Ether(dst=mac_id) / IP() / SCTP(),
            Ether(dst=mac_id) / IP() / SCTP(chksum=0xF),
        ]
        with TestPmdShell(node=self.sut_node, enable_rx_cksum=True) as testpmd:
            testpmd.set_forward_mode(SimpleForwardingModes.csum)
            testpmd.set_verbose(level=1)
            # Mirror setup_hw_offload: configure the offload on port 0 while the ports
            # are stopped, then bring them back up.
            # NOTE(review): the original call passed no port_id and did not stop the
            # ports; confirm against the csum_set_hw signature in testpmd_shell.
            testpmd.stop_all_ports()
            testpmd.csum_set_hw(layers=ChecksumOffloadOptions.sctp, port_id=0)
            testpmd.start_all_ports()
            testpmd.start()
            self.send_packet_and_verify_checksum(
                packet=packet_list[0], goodL4=True, goodIP=True, testpmd=testpmd, id=mac_id
            )
            self.send_packet_and_verify_checksum(
                packet=packet_list[1], goodL4=False, goodIP=True, testpmd=testpmd, id=mac_id
            )