1# SPDX-License-Identifier: BSD-3-Clause 2# Copyright(c) 2010-2014 Intel Corporation 3# Copyright(c) 2023 PANTHEON.tech s.r.o. 4 5"""Features common to all test suites. 6 7The module defines the :class:`TestSuite` class which doesn't contain any test cases, and as such 8must be extended by subclasses which add test cases. The :class:`TestSuite` contains the basics 9needed by subclasses: 10 11 * Testbed (SUT, TG) configuration, 12 * Packet sending and verification, 13 * Test case verification. 14""" 15 16from ipaddress import IPv4Interface, IPv6Interface, ip_interface 17from typing import ClassVar, Union 18 19from scapy.layers.inet import IP # type: ignore[import-untyped] 20from scapy.layers.l2 import Ether # type: ignore[import-untyped] 21from scapy.packet import Packet, Padding # type: ignore[import-untyped] 22 23from framework.testbed_model.port import Port, PortLink 24from framework.testbed_model.sut_node import SutNode 25from framework.testbed_model.tg_node import TGNode 26from framework.testbed_model.traffic_generator.capturing_traffic_generator import ( 27 PacketFilteringConfig, 28) 29 30from .exception import TestCaseVerifyError 31from .logger import DTSLogger, get_dts_logger 32from .utils import get_packet_summaries 33 34 35class TestSuite: 36 """The base class with building blocks needed by most test cases. 37 38 * Test suite setup/cleanup methods to override, 39 * Test case setup/cleanup methods to override, 40 * Test case verification, 41 * Testbed configuration, 42 * Traffic sending and verification. 43 44 Test cases are implemented by subclasses. Test cases are all methods starting with ``test_``, 45 further divided into performance test cases (starting with ``test_perf_``) 46 and functional test cases (all other test cases). 47 48 By default, all test cases will be executed. A list of testcase names may be specified 49 in the YAML test run configuration file and in the :option:`--test-suite` command line argument 50 or in the :envvar:`DTS_TESTCASES` environment variable to filter which test cases to run. 51 The union of both lists will be used. Any unknown test cases from the latter lists 52 will be silently ignored. 53 54 The methods named ``[set_up|tear_down]_[suite|test_case]`` should be overridden in subclasses 55 if the appropriate test suite/test case fixtures are needed. 56 57 The test suite is aware of the testbed (the SUT and TG) it's running on. From this, it can 58 properly choose the IP addresses and other configuration that must be tailored to the testbed. 59 60 Attributes: 61 sut_node: The SUT node where the test suite is running. 62 tg_node: The TG node where the test suite is running. 63 """ 64 65 sut_node: SutNode 66 tg_node: TGNode 67 #: Whether the test suite is blocking. A failure of a blocking test suite 68 #: will block the execution of all subsequent test suites in the current build target. 69 is_blocking: ClassVar[bool] = False 70 _logger: DTSLogger 71 _port_links: list[PortLink] 72 _sut_port_ingress: Port 73 _sut_port_egress: Port 74 _sut_ip_address_ingress: Union[IPv4Interface, IPv6Interface] 75 _sut_ip_address_egress: Union[IPv4Interface, IPv6Interface] 76 _tg_port_ingress: Port 77 _tg_port_egress: Port 78 _tg_ip_address_ingress: Union[IPv4Interface, IPv6Interface] 79 _tg_ip_address_egress: Union[IPv4Interface, IPv6Interface] 80 81 def __init__( 82 self, 83 sut_node: SutNode, 84 tg_node: TGNode, 85 ): 86 """Initialize the test suite testbed information and basic configuration. 87 88 Find links between ports and set up default IP addresses to be used when 89 configuring them. 90 91 Args: 92 sut_node: The SUT node where the test suite will run. 93 tg_node: The TG node where the test suite will run. 94 """ 95 self.sut_node = sut_node 96 self.tg_node = tg_node 97 self._logger = get_dts_logger(self.__class__.__name__) 98 self._port_links = [] 99 self._process_links() 100 self._sut_port_ingress, self._tg_port_egress = ( 101 self._port_links[0].sut_port, 102 self._port_links[0].tg_port, 103 ) 104 self._sut_port_egress, self._tg_port_ingress = ( 105 self._port_links[1].sut_port, 106 self._port_links[1].tg_port, 107 ) 108 self._sut_ip_address_ingress = ip_interface("192.168.100.2/24") 109 self._sut_ip_address_egress = ip_interface("192.168.101.2/24") 110 self._tg_ip_address_egress = ip_interface("192.168.100.3/24") 111 self._tg_ip_address_ingress = ip_interface("192.168.101.3/24") 112 113 def _process_links(self) -> None: 114 """Construct links between SUT and TG ports.""" 115 for sut_port in self.sut_node.ports: 116 for tg_port in self.tg_node.ports: 117 if (sut_port.identifier, sut_port.peer) == ( 118 tg_port.peer, 119 tg_port.identifier, 120 ): 121 self._port_links.append(PortLink(sut_port=sut_port, tg_port=tg_port)) 122 123 def set_up_suite(self) -> None: 124 """Set up test fixtures common to all test cases. 125 126 This is done before any test case has been run. 127 """ 128 129 def tear_down_suite(self) -> None: 130 """Tear down the previously created test fixtures common to all test cases. 131 132 This is done after all test have been run. 133 """ 134 135 def set_up_test_case(self) -> None: 136 """Set up test fixtures before each test case. 137 138 This is done before *each* test case. 139 """ 140 141 def tear_down_test_case(self) -> None: 142 """Tear down the previously created test fixtures after each test case. 143 144 This is done after *each* test case. 145 """ 146 147 def configure_testbed_ipv4(self, restore: bool = False) -> None: 148 """Configure IPv4 addresses on all testbed ports. 149 150 The configured ports are: 151 152 * SUT ingress port, 153 * SUT egress port, 154 * TG ingress port, 155 * TG egress port. 156 157 Args: 158 restore: If :data:`True`, will remove the configuration instead. 159 """ 160 delete = True if restore else False 161 enable = False if restore else True 162 self._configure_ipv4_forwarding(enable) 163 self.sut_node.configure_port_ip_address( 164 self._sut_ip_address_egress, self._sut_port_egress, delete 165 ) 166 self.sut_node.configure_port_state(self._sut_port_egress, enable) 167 self.sut_node.configure_port_ip_address( 168 self._sut_ip_address_ingress, self._sut_port_ingress, delete 169 ) 170 self.sut_node.configure_port_state(self._sut_port_ingress, enable) 171 self.tg_node.configure_port_ip_address( 172 self._tg_ip_address_ingress, self._tg_port_ingress, delete 173 ) 174 self.tg_node.configure_port_state(self._tg_port_ingress, enable) 175 self.tg_node.configure_port_ip_address( 176 self._tg_ip_address_egress, self._tg_port_egress, delete 177 ) 178 self.tg_node.configure_port_state(self._tg_port_egress, enable) 179 180 def _configure_ipv4_forwarding(self, enable: bool) -> None: 181 self.sut_node.configure_ipv4_forwarding(enable) 182 183 def send_packet_and_capture( 184 self, 185 packet: Packet, 186 filter_config: PacketFilteringConfig = PacketFilteringConfig(), 187 duration: float = 1, 188 ) -> list[Packet]: 189 """Send and receive `packet` using the associated TG. 190 191 Send `packet` through the appropriate interface and receive on the appropriate interface. 192 Modify the packet with l3/l2 addresses corresponding to the testbed and desired traffic. 193 194 Args: 195 packet: The packet to send. 196 filter_config: The filter to use when capturing packets. 197 duration: Capture traffic for this amount of time after sending `packet`. 198 199 Returns: 200 A list of received packets. 201 """ 202 packet = self._adjust_addresses(packet) 203 return self.tg_node.send_packet_and_capture( 204 packet, 205 self._tg_port_egress, 206 self._tg_port_ingress, 207 filter_config, 208 duration, 209 ) 210 211 def get_expected_packet(self, packet: Packet) -> Packet: 212 """Inject the proper L2/L3 addresses into `packet`. 213 214 Args: 215 packet: The packet to modify. 216 217 Returns: 218 `packet` with injected L2/L3 addresses. 219 """ 220 return self._adjust_addresses(packet, expected=True) 221 222 def _adjust_addresses(self, packet: Packet, expected: bool = False) -> Packet: 223 """L2 and L3 address additions in both directions. 224 225 Assumptions: 226 Two links between SUT and TG, one link is TG -> SUT, the other SUT -> TG. 227 228 Args: 229 packet: The packet to modify. 230 expected: If :data:`True`, the direction is SUT -> TG, 231 otherwise the direction is TG -> SUT. 232 """ 233 if expected: 234 # The packet enters the TG from SUT 235 # update l2 addresses 236 packet.src = self._sut_port_egress.mac_address 237 packet.dst = self._tg_port_ingress.mac_address 238 239 # The packet is routed from TG egress to TG ingress 240 # update l3 addresses 241 packet.payload.src = self._tg_ip_address_egress.ip.exploded 242 packet.payload.dst = self._tg_ip_address_ingress.ip.exploded 243 else: 244 # The packet leaves TG towards SUT 245 # update l2 addresses 246 packet.src = self._tg_port_egress.mac_address 247 packet.dst = self._sut_port_ingress.mac_address 248 249 # The packet is routed from TG egress to TG ingress 250 # update l3 addresses 251 packet.payload.src = self._tg_ip_address_egress.ip.exploded 252 packet.payload.dst = self._tg_ip_address_ingress.ip.exploded 253 254 return Ether(packet.build()) 255 256 def verify(self, condition: bool, failure_description: str) -> None: 257 """Verify `condition` and handle failures. 258 259 When `condition` is :data:`False`, raise an exception and log the last 10 commands 260 executed on both the SUT and TG. 261 262 Args: 263 condition: The condition to check. 264 failure_description: A short description of the failure 265 that will be stored in the raised exception. 266 267 Raises: 268 TestCaseVerifyError: `condition` is :data:`False`. 269 """ 270 if not condition: 271 self._fail_test_case_verify(failure_description) 272 273 def _fail_test_case_verify(self, failure_description: str) -> None: 274 self._logger.debug("A test case failed, showing the last 10 commands executed on SUT:") 275 for command_res in self.sut_node.main_session.remote_session.history[-10:]: 276 self._logger.debug(command_res.command) 277 self._logger.debug("A test case failed, showing the last 10 commands executed on TG:") 278 for command_res in self.tg_node.main_session.remote_session.history[-10:]: 279 self._logger.debug(command_res.command) 280 raise TestCaseVerifyError(failure_description) 281 282 def verify_packets(self, expected_packet: Packet, received_packets: list[Packet]) -> None: 283 """Verify that `expected_packet` has been received. 284 285 Go through `received_packets` and check that `expected_packet` is among them. 286 If not, raise an exception and log the last 10 commands 287 executed on both the SUT and TG. 288 289 Args: 290 expected_packet: The packet we're expecting to receive. 291 received_packets: The packets where we're looking for `expected_packet`. 292 293 Raises: 294 TestCaseVerifyError: `expected_packet` is not among `received_packets`. 295 """ 296 for received_packet in received_packets: 297 if self._compare_packets(expected_packet, received_packet): 298 break 299 else: 300 self._logger.debug( 301 f"The expected packet {get_packet_summaries(expected_packet)} " 302 f"not found among received {get_packet_summaries(received_packets)}" 303 ) 304 self._fail_test_case_verify("An expected packet not found among received packets.") 305 306 def _compare_packets(self, expected_packet: Packet, received_packet: Packet) -> bool: 307 self._logger.debug( 308 f"Comparing packets: \n{expected_packet.summary()}\n{received_packet.summary()}" 309 ) 310 311 l3 = IP in expected_packet.layers() 312 self._logger.debug("Found l3 layer") 313 314 received_payload = received_packet 315 expected_payload = expected_packet 316 while received_payload and expected_payload: 317 self._logger.debug("Comparing payloads:") 318 self._logger.debug(f"Received: {received_payload}") 319 self._logger.debug(f"Expected: {expected_payload}") 320 if received_payload.__class__ == expected_payload.__class__: 321 self._logger.debug("The layers are the same.") 322 if received_payload.__class__ == Ether: 323 if not self._verify_l2_frame(received_payload, l3): 324 return False 325 elif received_payload.__class__ == IP: 326 if not self._verify_l3_packet(received_payload, expected_payload): 327 return False 328 else: 329 # Different layers => different packets 330 return False 331 received_payload = received_payload.payload 332 expected_payload = expected_payload.payload 333 334 if expected_payload: 335 self._logger.debug(f"The expected packet did not contain {expected_payload}.") 336 return False 337 if received_payload and received_payload.__class__ != Padding: 338 self._logger.debug("The received payload had extra layers which were not padding.") 339 return False 340 return True 341 342 def _verify_l2_frame(self, received_packet: Ether, l3: bool) -> bool: 343 self._logger.debug("Looking at the Ether layer.") 344 self._logger.debug( 345 f"Comparing received dst mac '{received_packet.dst}' " 346 f"with expected '{self._tg_port_ingress.mac_address}'." 347 ) 348 if received_packet.dst != self._tg_port_ingress.mac_address: 349 return False 350 351 expected_src_mac = self._tg_port_egress.mac_address 352 if l3: 353 expected_src_mac = self._sut_port_egress.mac_address 354 self._logger.debug( 355 f"Comparing received src mac '{received_packet.src}' " 356 f"with expected '{expected_src_mac}'." 357 ) 358 if received_packet.src != expected_src_mac: 359 return False 360 361 return True 362 363 def _verify_l3_packet(self, received_packet: IP, expected_packet: IP) -> bool: 364 self._logger.debug("Looking at the IP layer.") 365 if received_packet.src != expected_packet.src or received_packet.dst != expected_packet.dst: 366 return False 367 return True 368