1# SPDX-License-Identifier: BSD-3-Clause 2# Copyright(c) 2010-2014 Intel Corporation 3# Copyright(c) 2023 PANTHEON.tech s.r.o. 4 5"""Features common to all test suites. 6 7The module defines the :class:`TestSuite` class which doesn't contain any test cases, and as such 8must be extended by subclasses which add test cases. The :class:`TestSuite` contains the basics 9needed by subclasses: 10 11 * Test suite and test case execution flow, 12 * Testbed (SUT, TG) configuration, 13 * Packet sending and verification, 14 * Test case verification. 15 16The module also defines a function, :func:`get_test_suites`, 17for gathering test suites from a Python module. 18""" 19 20import importlib 21import inspect 22import re 23from ipaddress import IPv4Interface, IPv6Interface, ip_interface 24from types import MethodType 25from typing import Any, ClassVar, Union 26 27from scapy.layers.inet import IP # type: ignore[import] 28from scapy.layers.l2 import Ether # type: ignore[import] 29from scapy.packet import Packet, Padding # type: ignore[import] 30 31from .exception import ( 32 BlockingTestSuiteError, 33 ConfigurationError, 34 SSHTimeoutError, 35 TestCaseVerifyError, 36) 37from .logger import DTSLOG, getLogger 38from .settings import SETTINGS 39from .test_result import BuildTargetResult, Result, TestCaseResult, TestSuiteResult 40from .testbed_model import Port, PortLink, SutNode, TGNode 41from .utils import get_packet_summaries 42 43 44class TestSuite(object): 45 """The base class with methods for handling the basic flow of a test suite. 46 47 * Test case filtering and collection, 48 * Test suite setup/cleanup, 49 * Test setup/cleanup, 50 * Test case execution, 51 * Error handling and results storage. 52 53 Test cases are implemented by subclasses. Test cases are all methods starting with ``test_``, 54 further divided into performance test cases (starting with ``test_perf_``) 55 and functional test cases (all other test cases). 56 57 By default, all test cases will be executed. A list of testcase names may be specified 58 in the YAML test run configuration file and in the :option:`--test-cases` command line argument 59 or in the :envvar:`DTS_TESTCASES` environment variable to filter which test cases to run. 60 The union of both lists will be used. Any unknown test cases from the latter lists 61 will be silently ignored. 62 63 If the :option:`--re-run` command line argument or the :envvar:`DTS_RERUN` environment variable 64 is set, in case of a test case failure, the test case will be executed again until it passes 65 or it fails that many times in addition of the first failure. 66 67 The methods named ``[set_up|tear_down]_[suite|test_case]`` should be overridden in subclasses 68 if the appropriate test suite/test case fixtures are needed. 69 70 The test suite is aware of the testbed (the SUT and TG) it's running on. From this, it can 71 properly choose the IP addresses and other configuration that must be tailored to the testbed. 72 73 Attributes: 74 sut_node: The SUT node where the test suite is running. 75 tg_node: The TG node where the test suite is running. 76 """ 77 78 sut_node: SutNode 79 tg_node: TGNode 80 #: Whether the test suite is blocking. A failure of a blocking test suite 81 #: will block the execution of all subsequent test suites in the current build target. 82 is_blocking: ClassVar[bool] = False 83 _logger: DTSLOG 84 _test_cases_to_run: list[str] 85 _func: bool 86 _result: TestSuiteResult 87 _port_links: list[PortLink] 88 _sut_port_ingress: Port 89 _sut_port_egress: Port 90 _sut_ip_address_ingress: Union[IPv4Interface, IPv6Interface] 91 _sut_ip_address_egress: Union[IPv4Interface, IPv6Interface] 92 _tg_port_ingress: Port 93 _tg_port_egress: Port 94 _tg_ip_address_ingress: Union[IPv4Interface, IPv6Interface] 95 _tg_ip_address_egress: Union[IPv4Interface, IPv6Interface] 96 97 def __init__( 98 self, 99 sut_node: SutNode, 100 tg_node: TGNode, 101 test_cases: list[str], 102 func: bool, 103 build_target_result: BuildTargetResult, 104 ): 105 """Initialize the test suite testbed information and basic configuration. 106 107 Process what test cases to run, create the associated 108 :class:`~.test_result.TestSuiteResult`, find links between ports 109 and set up default IP addresses to be used when configuring them. 110 111 Args: 112 sut_node: The SUT node where the test suite will run. 113 tg_node: The TG node where the test suite will run. 114 test_cases: The list of test cases to execute. 115 If empty, all test cases will be executed. 116 func: Whether to run functional tests. 117 build_target_result: The build target result this test suite is run in. 118 """ 119 self.sut_node = sut_node 120 self.tg_node = tg_node 121 self._logger = getLogger(self.__class__.__name__) 122 self._test_cases_to_run = test_cases 123 self._test_cases_to_run.extend(SETTINGS.test_cases) 124 self._func = func 125 self._result = build_target_result.add_test_suite(self.__class__.__name__) 126 self._port_links = [] 127 self._process_links() 128 self._sut_port_ingress, self._tg_port_egress = ( 129 self._port_links[0].sut_port, 130 self._port_links[0].tg_port, 131 ) 132 self._sut_port_egress, self._tg_port_ingress = ( 133 self._port_links[1].sut_port, 134 self._port_links[1].tg_port, 135 ) 136 self._sut_ip_address_ingress = ip_interface("192.168.100.2/24") 137 self._sut_ip_address_egress = ip_interface("192.168.101.2/24") 138 self._tg_ip_address_egress = ip_interface("192.168.100.3/24") 139 self._tg_ip_address_ingress = ip_interface("192.168.101.3/24") 140 141 def _process_links(self) -> None: 142 """Construct links between SUT and TG ports.""" 143 for sut_port in self.sut_node.ports: 144 for tg_port in self.tg_node.ports: 145 if (sut_port.identifier, sut_port.peer) == ( 146 tg_port.peer, 147 tg_port.identifier, 148 ): 149 self._port_links.append(PortLink(sut_port=sut_port, tg_port=tg_port)) 150 151 def set_up_suite(self) -> None: 152 """Set up test fixtures common to all test cases. 153 154 This is done before any test case has been run. 155 """ 156 157 def tear_down_suite(self) -> None: 158 """Tear down the previously created test fixtures common to all test cases. 159 160 This is done after all test have been run. 161 """ 162 163 def set_up_test_case(self) -> None: 164 """Set up test fixtures before each test case. 165 166 This is done before *each* test case. 167 """ 168 169 def tear_down_test_case(self) -> None: 170 """Tear down the previously created test fixtures after each test case. 171 172 This is done after *each* test case. 173 """ 174 175 def configure_testbed_ipv4(self, restore: bool = False) -> None: 176 """Configure IPv4 addresses on all testbed ports. 177 178 The configured ports are: 179 180 * SUT ingress port, 181 * SUT egress port, 182 * TG ingress port, 183 * TG egress port. 184 185 Args: 186 restore: If :data:`True`, will remove the configuration instead. 187 """ 188 delete = True if restore else False 189 enable = False if restore else True 190 self._configure_ipv4_forwarding(enable) 191 self.sut_node.configure_port_ip_address( 192 self._sut_ip_address_egress, self._sut_port_egress, delete 193 ) 194 self.sut_node.configure_port_state(self._sut_port_egress, enable) 195 self.sut_node.configure_port_ip_address( 196 self._sut_ip_address_ingress, self._sut_port_ingress, delete 197 ) 198 self.sut_node.configure_port_state(self._sut_port_ingress, enable) 199 self.tg_node.configure_port_ip_address( 200 self._tg_ip_address_ingress, self._tg_port_ingress, delete 201 ) 202 self.tg_node.configure_port_state(self._tg_port_ingress, enable) 203 self.tg_node.configure_port_ip_address( 204 self._tg_ip_address_egress, self._tg_port_egress, delete 205 ) 206 self.tg_node.configure_port_state(self._tg_port_egress, enable) 207 208 def _configure_ipv4_forwarding(self, enable: bool) -> None: 209 self.sut_node.configure_ipv4_forwarding(enable) 210 211 def send_packet_and_capture(self, packet: Packet, duration: float = 1) -> list[Packet]: 212 """Send and receive `packet` using the associated TG. 213 214 Send `packet` through the appropriate interface and receive on the appropriate interface. 215 Modify the packet with l3/l2 addresses corresponding to the testbed and desired traffic. 216 217 Args: 218 packet: The packet to send. 219 duration: Capture traffic for this amount of time after sending `packet`. 220 221 Returns: 222 A list of received packets. 223 """ 224 packet = self._adjust_addresses(packet) 225 return self.tg_node.send_packet_and_capture( 226 packet, self._tg_port_egress, self._tg_port_ingress, duration 227 ) 228 229 def get_expected_packet(self, packet: Packet) -> Packet: 230 """Inject the proper L2/L3 addresses into `packet`. 231 232 Args: 233 packet: The packet to modify. 234 235 Returns: 236 `packet` with injected L2/L3 addresses. 237 """ 238 return self._adjust_addresses(packet, expected=True) 239 240 def _adjust_addresses(self, packet: Packet, expected: bool = False) -> Packet: 241 """L2 and L3 address additions in both directions. 242 243 Assumptions: 244 Two links between SUT and TG, one link is TG -> SUT, the other SUT -> TG. 245 246 Args: 247 packet: The packet to modify. 248 expected: If :data:`True`, the direction is SUT -> TG, 249 otherwise the direction is TG -> SUT. 250 """ 251 if expected: 252 # The packet enters the TG from SUT 253 # update l2 addresses 254 packet.src = self._sut_port_egress.mac_address 255 packet.dst = self._tg_port_ingress.mac_address 256 257 # The packet is routed from TG egress to TG ingress 258 # update l3 addresses 259 packet.payload.src = self._tg_ip_address_egress.ip.exploded 260 packet.payload.dst = self._tg_ip_address_ingress.ip.exploded 261 else: 262 # The packet leaves TG towards SUT 263 # update l2 addresses 264 packet.src = self._tg_port_egress.mac_address 265 packet.dst = self._sut_port_ingress.mac_address 266 267 # The packet is routed from TG egress to TG ingress 268 # update l3 addresses 269 packet.payload.src = self._tg_ip_address_egress.ip.exploded 270 packet.payload.dst = self._tg_ip_address_ingress.ip.exploded 271 272 return Ether(packet.build()) 273 274 def verify(self, condition: bool, failure_description: str) -> None: 275 """Verify `condition` and handle failures. 276 277 When `condition` is :data:`False`, raise an exception and log the last 10 commands 278 executed on both the SUT and TG. 279 280 Args: 281 condition: The condition to check. 282 failure_description: A short description of the failure 283 that will be stored in the raised exception. 284 285 Raises: 286 TestCaseVerifyError: `condition` is :data:`False`. 287 """ 288 if not condition: 289 self._fail_test_case_verify(failure_description) 290 291 def _fail_test_case_verify(self, failure_description: str) -> None: 292 self._logger.debug("A test case failed, showing the last 10 commands executed on SUT:") 293 for command_res in self.sut_node.main_session.remote_session.history[-10:]: 294 self._logger.debug(command_res.command) 295 self._logger.debug("A test case failed, showing the last 10 commands executed on TG:") 296 for command_res in self.tg_node.main_session.remote_session.history[-10:]: 297 self._logger.debug(command_res.command) 298 raise TestCaseVerifyError(failure_description) 299 300 def verify_packets(self, expected_packet: Packet, received_packets: list[Packet]) -> None: 301 """Verify that `expected_packet` has been received. 302 303 Go through `received_packets` and check that `expected_packet` is among them. 304 If not, raise an exception and log the last 10 commands 305 executed on both the SUT and TG. 306 307 Args: 308 expected_packet: The packet we're expecting to receive. 309 received_packets: The packets where we're looking for `expected_packet`. 310 311 Raises: 312 TestCaseVerifyError: `expected_packet` is not among `received_packets`. 313 """ 314 for received_packet in received_packets: 315 if self._compare_packets(expected_packet, received_packet): 316 break 317 else: 318 self._logger.debug( 319 f"The expected packet {get_packet_summaries(expected_packet)} " 320 f"not found among received {get_packet_summaries(received_packets)}" 321 ) 322 self._fail_test_case_verify("An expected packet not found among received packets.") 323 324 def _compare_packets(self, expected_packet: Packet, received_packet: Packet) -> bool: 325 self._logger.debug( 326 f"Comparing packets: \n{expected_packet.summary()}\n{received_packet.summary()}" 327 ) 328 329 l3 = IP in expected_packet.layers() 330 self._logger.debug("Found l3 layer") 331 332 received_payload = received_packet 333 expected_payload = expected_packet 334 while received_payload and expected_payload: 335 self._logger.debug("Comparing payloads:") 336 self._logger.debug(f"Received: {received_payload}") 337 self._logger.debug(f"Expected: {expected_payload}") 338 if received_payload.__class__ == expected_payload.__class__: 339 self._logger.debug("The layers are the same.") 340 if received_payload.__class__ == Ether: 341 if not self._verify_l2_frame(received_payload, l3): 342 return False 343 elif received_payload.__class__ == IP: 344 if not self._verify_l3_packet(received_payload, expected_payload): 345 return False 346 else: 347 # Different layers => different packets 348 return False 349 received_payload = received_payload.payload 350 expected_payload = expected_payload.payload 351 352 if expected_payload: 353 self._logger.debug(f"The expected packet did not contain {expected_payload}.") 354 return False 355 if received_payload and received_payload.__class__ != Padding: 356 self._logger.debug("The received payload had extra layers which were not padding.") 357 return False 358 return True 359 360 def _verify_l2_frame(self, received_packet: Ether, l3: bool) -> bool: 361 self._logger.debug("Looking at the Ether layer.") 362 self._logger.debug( 363 f"Comparing received dst mac '{received_packet.dst}' " 364 f"with expected '{self._tg_port_ingress.mac_address}'." 365 ) 366 if received_packet.dst != self._tg_port_ingress.mac_address: 367 return False 368 369 expected_src_mac = self._tg_port_egress.mac_address 370 if l3: 371 expected_src_mac = self._sut_port_egress.mac_address 372 self._logger.debug( 373 f"Comparing received src mac '{received_packet.src}' " 374 f"with expected '{expected_src_mac}'." 375 ) 376 if received_packet.src != expected_src_mac: 377 return False 378 379 return True 380 381 def _verify_l3_packet(self, received_packet: IP, expected_packet: IP) -> bool: 382 self._logger.debug("Looking at the IP layer.") 383 if received_packet.src != expected_packet.src or received_packet.dst != expected_packet.dst: 384 return False 385 return True 386 387 def run(self) -> None: 388 """Set up, execute and tear down the whole suite. 389 390 Test suite execution consists of running all test cases scheduled to be executed. 391 A test case run consists of setup, execution and teardown of said test case. 392 393 Record the setup and the teardown and handle failures. 394 395 The list of scheduled test cases is constructed when creating the :class:`TestSuite` object. 396 """ 397 test_suite_name = self.__class__.__name__ 398 399 try: 400 self._logger.info(f"Starting test suite setup: {test_suite_name}") 401 self.set_up_suite() 402 self._result.update_setup(Result.PASS) 403 self._logger.info(f"Test suite setup successful: {test_suite_name}") 404 except Exception as e: 405 self._logger.exception(f"Test suite setup ERROR: {test_suite_name}") 406 self._result.update_setup(Result.ERROR, e) 407 408 else: 409 self._execute_test_suite() 410 411 finally: 412 try: 413 self.tear_down_suite() 414 self.sut_node.kill_cleanup_dpdk_apps() 415 self._result.update_teardown(Result.PASS) 416 except Exception as e: 417 self._logger.exception(f"Test suite teardown ERROR: {test_suite_name}") 418 self._logger.warning( 419 f"Test suite '{test_suite_name}' teardown failed, " 420 f"the next test suite may be affected." 421 ) 422 self._result.update_setup(Result.ERROR, e) 423 if len(self._result.get_errors()) > 0 and self.is_blocking: 424 raise BlockingTestSuiteError(test_suite_name) 425 426 def _execute_test_suite(self) -> None: 427 """Execute all test cases scheduled to be executed in this suite.""" 428 if self._func: 429 for test_case_method in self._get_functional_test_cases(): 430 test_case_name = test_case_method.__name__ 431 test_case_result = self._result.add_test_case(test_case_name) 432 all_attempts = SETTINGS.re_run + 1 433 attempt_nr = 1 434 self._run_test_case(test_case_method, test_case_result) 435 while not test_case_result and attempt_nr < all_attempts: 436 attempt_nr += 1 437 self._logger.info( 438 f"Re-running FAILED test case '{test_case_name}'. " 439 f"Attempt number {attempt_nr} out of {all_attempts}." 440 ) 441 self._run_test_case(test_case_method, test_case_result) 442 443 def _get_functional_test_cases(self) -> list[MethodType]: 444 """Get all functional test cases defined in this TestSuite. 445 446 Returns: 447 The list of functional test cases of this TestSuite. 448 """ 449 return self._get_test_cases(r"test_(?!perf_)") 450 451 def _get_test_cases(self, test_case_regex: str) -> list[MethodType]: 452 """Return a list of test cases matching test_case_regex. 453 454 Returns: 455 The list of test cases matching test_case_regex of this TestSuite. 456 """ 457 self._logger.debug(f"Searching for test cases in {self.__class__.__name__}.") 458 filtered_test_cases = [] 459 for test_case_name, test_case in inspect.getmembers(self, inspect.ismethod): 460 if self._should_be_executed(test_case_name, test_case_regex): 461 filtered_test_cases.append(test_case) 462 cases_str = ", ".join((x.__name__ for x in filtered_test_cases)) 463 self._logger.debug(f"Found test cases '{cases_str}' in {self.__class__.__name__}.") 464 return filtered_test_cases 465 466 def _should_be_executed(self, test_case_name: str, test_case_regex: str) -> bool: 467 """Check whether the test case should be scheduled to be executed.""" 468 match = bool(re.match(test_case_regex, test_case_name)) 469 if self._test_cases_to_run: 470 return match and test_case_name in self._test_cases_to_run 471 472 return match 473 474 def _run_test_case( 475 self, test_case_method: MethodType, test_case_result: TestCaseResult 476 ) -> None: 477 """Setup, execute and teardown a test case in this suite. 478 479 Record the result of the setup and the teardown and handle failures. 480 """ 481 test_case_name = test_case_method.__name__ 482 483 try: 484 # run set_up function for each case 485 self.set_up_test_case() 486 test_case_result.update_setup(Result.PASS) 487 except SSHTimeoutError as e: 488 self._logger.exception(f"Test case setup FAILED: {test_case_name}") 489 test_case_result.update_setup(Result.FAIL, e) 490 except Exception as e: 491 self._logger.exception(f"Test case setup ERROR: {test_case_name}") 492 test_case_result.update_setup(Result.ERROR, e) 493 494 else: 495 # run test case if setup was successful 496 self._execute_test_case(test_case_method, test_case_result) 497 498 finally: 499 try: 500 self.tear_down_test_case() 501 test_case_result.update_teardown(Result.PASS) 502 except Exception as e: 503 self._logger.exception(f"Test case teardown ERROR: {test_case_name}") 504 self._logger.warning( 505 f"Test case '{test_case_name}' teardown failed, " 506 f"the next test case may be affected." 507 ) 508 test_case_result.update_teardown(Result.ERROR, e) 509 test_case_result.update(Result.ERROR) 510 511 def _execute_test_case( 512 self, test_case_method: MethodType, test_case_result: TestCaseResult 513 ) -> None: 514 """Execute one test case, record the result and handle failures.""" 515 test_case_name = test_case_method.__name__ 516 try: 517 self._logger.info(f"Starting test case execution: {test_case_name}") 518 test_case_method() 519 test_case_result.update(Result.PASS) 520 self._logger.info(f"Test case execution PASSED: {test_case_name}") 521 522 except TestCaseVerifyError as e: 523 self._logger.exception(f"Test case execution FAILED: {test_case_name}") 524 test_case_result.update(Result.FAIL, e) 525 except Exception as e: 526 self._logger.exception(f"Test case execution ERROR: {test_case_name}") 527 test_case_result.update(Result.ERROR, e) 528 except KeyboardInterrupt: 529 self._logger.error(f"Test case execution INTERRUPTED by user: {test_case_name}") 530 test_case_result.update(Result.SKIP) 531 raise KeyboardInterrupt("Stop DTS") 532 533 534def get_test_suites(testsuite_module_path: str) -> list[type[TestSuite]]: 535 r"""Find all :class:`TestSuite`\s in a Python module. 536 537 Args: 538 testsuite_module_path: The path to the Python module. 539 540 Returns: 541 The list of :class:`TestSuite`\s found within the Python module. 542 543 Raises: 544 ConfigurationError: The test suite module was not found. 545 """ 546 547 def is_test_suite(object: Any) -> bool: 548 try: 549 if issubclass(object, TestSuite) and object is not TestSuite: 550 return True 551 except TypeError: 552 return False 553 return False 554 555 try: 556 testcase_module = importlib.import_module(testsuite_module_path) 557 except ModuleNotFoundError as e: 558 raise ConfigurationError(f"Test suite '{testsuite_module_path}' not found.") from e 559 return [ 560 test_suite_class 561 for _, test_suite_class in inspect.getmembers(testcase_module, is_test_suite) 562 ] 563