1# SPDX-License-Identifier: BSD-3-Clause 2# Copyright(c) 2010-2014 Intel Corporation 3# Copyright(c) 2023 PANTHEON.tech s.r.o. 4 5"""Features common to all test suites. 6 7The module defines the :class:`TestSuite` class which doesn't contain any test cases, and as such 8must be extended by subclasses which add test cases. The :class:`TestSuite` contains the basics 9needed by subclasses: 10 11 * Testbed (SUT, TG) configuration, 12 * Packet sending and verification, 13 * Test case verification. 14""" 15 16import inspect 17from collections import Counter 18from collections.abc import Callable, Sequence 19from enum import Enum, auto 20from ipaddress import IPv4Interface, IPv6Interface, ip_interface 21from typing import ClassVar, Protocol, TypeVar, Union, cast 22 23from scapy.layers.inet import IP # type: ignore[import-untyped] 24from scapy.layers.l2 import Ether # type: ignore[import-untyped] 25from scapy.packet import Packet, Padding, raw # type: ignore[import-untyped] 26 27from framework.testbed_model.capability import TestProtocol 28from framework.testbed_model.port import Port 29from framework.testbed_model.sut_node import SutNode 30from framework.testbed_model.tg_node import TGNode 31from framework.testbed_model.topology import Topology 32from framework.testbed_model.traffic_generator.capturing_traffic_generator import ( 33 PacketFilteringConfig, 34) 35 36from .exception import ConfigurationError, TestCaseVerifyError 37from .logger import DTSLogger, get_dts_logger 38from .utils import get_packet_summaries 39 40 41class TestSuite(TestProtocol): 42 """The base class with building blocks needed by most test cases. 43 44 * Test suite setup/cleanup methods to override, 45 * Test case setup/cleanup methods to override, 46 * Test case verification, 47 * Testbed configuration, 48 * Traffic sending and verification. 49 50 Test cases are implemented by subclasses. Test cases are all methods starting with ``test_``, 51 further divided into performance test cases (starting with ``test_perf_``) 52 and functional test cases (all other test cases). 53 54 By default, all test cases will be executed. A list of testcase names may be specified 55 in the YAML test run configuration file and in the :option:`--test-suite` command line argument 56 or in the :envvar:`DTS_TESTCASES` environment variable to filter which test cases to run. 57 The union of both lists will be used. Any unknown test cases from the latter lists 58 will be silently ignored. 59 60 The methods named ``[set_up|tear_down]_[suite|test_case]`` should be overridden in subclasses 61 if the appropriate test suite/test case fixtures are needed. 62 63 The test suite is aware of the testbed (the SUT and TG) it's running on. From this, it can 64 properly choose the IP addresses and other configuration that must be tailored to the testbed. 65 66 Attributes: 67 sut_node: The SUT node where the test suite is running. 68 tg_node: The TG node where the test suite is running. 69 """ 70 71 sut_node: SutNode 72 tg_node: TGNode 73 #: Whether the test suite is blocking. A failure of a blocking test suite 74 #: will block the execution of all subsequent test suites in the current build target. 75 is_blocking: ClassVar[bool] = False 76 _logger: DTSLogger 77 _sut_port_ingress: Port 78 _sut_port_egress: Port 79 _sut_ip_address_ingress: Union[IPv4Interface, IPv6Interface] 80 _sut_ip_address_egress: Union[IPv4Interface, IPv6Interface] 81 _tg_port_ingress: Port 82 _tg_port_egress: Port 83 _tg_ip_address_ingress: Union[IPv4Interface, IPv6Interface] 84 _tg_ip_address_egress: Union[IPv4Interface, IPv6Interface] 85 86 def __init__( 87 self, 88 sut_node: SutNode, 89 tg_node: TGNode, 90 topology: Topology, 91 ): 92 """Initialize the test suite testbed information and basic configuration. 93 94 Find links between ports and set up default IP addresses to be used when 95 configuring them. 96 97 Args: 98 sut_node: The SUT node where the test suite will run. 99 tg_node: The TG node where the test suite will run. 100 topology: The topology where the test suite will run. 101 """ 102 self.sut_node = sut_node 103 self.tg_node = tg_node 104 self._logger = get_dts_logger(self.__class__.__name__) 105 self._tg_port_egress = topology.tg_port_egress 106 self._sut_port_ingress = topology.sut_port_ingress 107 self._sut_port_egress = topology.sut_port_egress 108 self._tg_port_ingress = topology.tg_port_ingress 109 self._sut_ip_address_ingress = ip_interface("192.168.100.2/24") 110 self._sut_ip_address_egress = ip_interface("192.168.101.2/24") 111 self._tg_ip_address_egress = ip_interface("192.168.100.3/24") 112 self._tg_ip_address_ingress = ip_interface("192.168.101.3/24") 113 114 @classmethod 115 def get_test_cases( 116 cls, test_case_sublist: Sequence[str] | None = None 117 ) -> tuple[set[type["TestCase"]], set[type["TestCase"]]]: 118 """Filter `test_case_subset` from this class. 119 120 Test cases are regular (or bound) methods decorated with :func:`func_test` 121 or :func:`perf_test`. 122 123 Args: 124 test_case_sublist: Test case names to filter from this class. 125 If empty or :data:`None`, return all test cases. 126 127 Returns: 128 The filtered test case functions. This method returns functions as opposed to methods, 129 as methods are bound to instances and this method only has access to the class. 130 131 Raises: 132 ConfigurationError: If a test case from `test_case_subset` is not found. 133 """ 134 135 def is_test_case(function: Callable) -> bool: 136 if inspect.isfunction(function): 137 # TestCase is not used at runtime, so we can't use isinstance() with `function`. 138 # But function.test_type exists. 139 if hasattr(function, "test_type"): 140 return isinstance(function.test_type, TestCaseType) 141 return False 142 143 if test_case_sublist is None: 144 test_case_sublist = [] 145 146 # the copy is needed so that the condition "elif test_case_sublist" doesn't 147 # change mid-cycle 148 test_case_sublist_copy = list(test_case_sublist) 149 func_test_cases = set() 150 perf_test_cases = set() 151 152 for test_case_name, test_case_function in inspect.getmembers(cls, is_test_case): 153 if test_case_name in test_case_sublist_copy: 154 # if test_case_sublist_copy is non-empty, remove the found test case 155 # so that we can look at the remainder at the end 156 test_case_sublist_copy.remove(test_case_name) 157 elif test_case_sublist: 158 # the original list not being empty means we're filtering test cases 159 # since we didn't remove test_case_name in the previous branch, 160 # it doesn't match the filter and we don't want to remove it 161 continue 162 163 match test_case_function.test_type: 164 case TestCaseType.PERFORMANCE: 165 perf_test_cases.add(test_case_function) 166 case TestCaseType.FUNCTIONAL: 167 func_test_cases.add(test_case_function) 168 169 if test_case_sublist_copy: 170 raise ConfigurationError( 171 f"Test cases {test_case_sublist_copy} not found among functions of {cls.__name__}." 172 ) 173 174 return func_test_cases, perf_test_cases 175 176 def set_up_suite(self) -> None: 177 """Set up test fixtures common to all test cases. 178 179 This is done before any test case has been run. 180 """ 181 182 def tear_down_suite(self) -> None: 183 """Tear down the previously created test fixtures common to all test cases. 184 185 This is done after all test have been run. 186 """ 187 188 def set_up_test_case(self) -> None: 189 """Set up test fixtures before each test case. 190 191 This is done before *each* test case. 192 """ 193 194 def tear_down_test_case(self) -> None: 195 """Tear down the previously created test fixtures after each test case. 196 197 This is done after *each* test case. 198 """ 199 200 def configure_testbed_ipv4(self, restore: bool = False) -> None: 201 """Configure IPv4 addresses on all testbed ports. 202 203 The configured ports are: 204 205 * SUT ingress port, 206 * SUT egress port, 207 * TG ingress port, 208 * TG egress port. 209 210 Args: 211 restore: If :data:`True`, will remove the configuration instead. 212 """ 213 delete = True if restore else False 214 enable = False if restore else True 215 self._configure_ipv4_forwarding(enable) 216 self.sut_node.configure_port_ip_address( 217 self._sut_ip_address_egress, self._sut_port_egress, delete 218 ) 219 self.sut_node.configure_port_state(self._sut_port_egress, enable) 220 self.sut_node.configure_port_ip_address( 221 self._sut_ip_address_ingress, self._sut_port_ingress, delete 222 ) 223 self.sut_node.configure_port_state(self._sut_port_ingress, enable) 224 self.tg_node.configure_port_ip_address( 225 self._tg_ip_address_ingress, self._tg_port_ingress, delete 226 ) 227 self.tg_node.configure_port_state(self._tg_port_ingress, enable) 228 self.tg_node.configure_port_ip_address( 229 self._tg_ip_address_egress, self._tg_port_egress, delete 230 ) 231 self.tg_node.configure_port_state(self._tg_port_egress, enable) 232 233 def _configure_ipv4_forwarding(self, enable: bool) -> None: 234 self.sut_node.configure_ipv4_forwarding(enable) 235 236 def send_packet_and_capture( 237 self, 238 packet: Packet, 239 filter_config: PacketFilteringConfig = PacketFilteringConfig(), 240 duration: float = 1, 241 ) -> list[Packet]: 242 """Send and receive `packet` using the associated TG. 243 244 Send `packet` through the appropriate interface and receive on the appropriate interface. 245 Modify the packet with l3/l2 addresses corresponding to the testbed and desired traffic. 246 247 Args: 248 packet: The packet to send. 249 filter_config: The filter to use when capturing packets. 250 duration: Capture traffic for this amount of time after sending `packet`. 251 252 Returns: 253 A list of received packets. 254 """ 255 return self.send_packets_and_capture( 256 [packet], 257 filter_config, 258 duration, 259 ) 260 261 def send_packets_and_capture( 262 self, 263 packets: list[Packet], 264 filter_config: PacketFilteringConfig = PacketFilteringConfig(), 265 duration: float = 1, 266 ) -> list[Packet]: 267 """Send and receive `packets` using the associated TG. 268 269 Send `packets` through the appropriate interface and receive on the appropriate interface. 270 Modify the packets with l3/l2 addresses corresponding to the testbed and desired traffic. 271 272 Args: 273 packets: The packets to send. 274 filter_config: The filter to use when capturing packets. 275 duration: Capture traffic for this amount of time after sending `packet`. 276 277 Returns: 278 A list of received packets. 279 """ 280 packets = self._adjust_addresses(packets) 281 return self.tg_node.send_packets_and_capture( 282 packets, 283 self._tg_port_egress, 284 self._tg_port_ingress, 285 filter_config, 286 duration, 287 ) 288 289 def send_packets( 290 self, 291 packets: list[Packet], 292 ) -> None: 293 """Send packets using the traffic generator and do not capture received traffic. 294 295 Args: 296 packets: Packets to send. 297 """ 298 packets = self._adjust_addresses(packets) 299 self.tg_node.send_packets(packets, self._tg_port_egress) 300 301 def get_expected_packet(self, packet: Packet) -> Packet: 302 """Inject the proper L2/L3 addresses into `packet`. 303 304 Args: 305 packet: The packet to modify. 306 307 Returns: 308 `packet` with injected L2/L3 addresses. 309 """ 310 return self._adjust_addresses([packet], expected=True)[0] 311 312 def _adjust_addresses(self, packets: list[Packet], expected: bool = False) -> list[Packet]: 313 """L2 and L3 address additions in both directions. 314 315 Packets in `packets` will be directly modified in this method. The returned list of packets 316 however will be copies of the modified packets. 317 318 Only missing addresses are added to packets, existing addresses will not be overridden. If 319 any packet in `packets` has multiple IP layers (using GRE, for example) only the inner-most 320 IP layer will have its addresses adjusted. 321 322 Assumptions: 323 Two links between SUT and TG, one link is TG -> SUT, the other SUT -> TG. 324 325 Args: 326 packets: The packets to modify. 327 expected: If :data:`True`, the direction is SUT -> TG, 328 otherwise the direction is TG -> SUT. 329 330 Returns: 331 A list containing copies of all packets in `packets` after modification. 332 """ 333 ret_packets = [] 334 for packet in packets: 335 # update l2 addresses 336 # If `expected` is :data:`True`, the packet enters the TG from SUT, otherwise the 337 # packet leaves the TG towards the SUT. 338 339 # The fields parameter of a packet does not include fields of the payload, so this can 340 # only be the Ether src/dst. 341 if "src" not in packet.fields: 342 packet.src = ( 343 self._sut_port_egress.mac_address 344 if expected 345 else self._tg_port_egress.mac_address 346 ) 347 if "dst" not in packet.fields: 348 packet.dst = ( 349 self._tg_port_ingress.mac_address 350 if expected 351 else self._sut_port_ingress.mac_address 352 ) 353 354 # update l3 addresses 355 # The packet is routed from TG egress to TG ingress regardless of whether it is 356 # expected or not. 357 num_ip_layers = packet.layers().count(IP) 358 if num_ip_layers > 0: 359 # Update the last IP layer if there are multiple (the framework should be modifying 360 # the packet address instead of the tunnel address if there is one). 361 l3_to_use = packet.getlayer(IP, num_ip_layers) 362 if "src" not in l3_to_use.fields: 363 l3_to_use.src = self._tg_ip_address_egress.ip.exploded 364 365 if "dst" not in l3_to_use.fields: 366 l3_to_use.dst = self._tg_ip_address_ingress.ip.exploded 367 ret_packets.append(Ether(packet.build())) 368 369 return ret_packets 370 371 def verify(self, condition: bool, failure_description: str) -> None: 372 """Verify `condition` and handle failures. 373 374 When `condition` is :data:`False`, raise an exception and log the last 10 commands 375 executed on both the SUT and TG. 376 377 Args: 378 condition: The condition to check. 379 failure_description: A short description of the failure 380 that will be stored in the raised exception. 381 382 Raises: 383 TestCaseVerifyError: `condition` is :data:`False`. 384 """ 385 if not condition: 386 self._fail_test_case_verify(failure_description) 387 388 def _fail_test_case_verify(self, failure_description: str) -> None: 389 self._logger.debug("A test case failed, showing the last 10 commands executed on SUT:") 390 for command_res in self.sut_node.main_session.remote_session.history[-10:]: 391 self._logger.debug(command_res.command) 392 self._logger.debug("A test case failed, showing the last 10 commands executed on TG:") 393 for command_res in self.tg_node.main_session.remote_session.history[-10:]: 394 self._logger.debug(command_res.command) 395 raise TestCaseVerifyError(failure_description) 396 397 def verify_packets(self, expected_packet: Packet, received_packets: list[Packet]) -> None: 398 """Verify that `expected_packet` has been received. 399 400 Go through `received_packets` and check that `expected_packet` is among them. 401 If not, raise an exception and log the last 10 commands 402 executed on both the SUT and TG. 403 404 Args: 405 expected_packet: The packet we're expecting to receive. 406 received_packets: The packets where we're looking for `expected_packet`. 407 408 Raises: 409 TestCaseVerifyError: `expected_packet` is not among `received_packets`. 410 """ 411 for received_packet in received_packets: 412 if self._compare_packets(expected_packet, received_packet): 413 break 414 else: 415 self._logger.debug( 416 f"The expected packet {get_packet_summaries(expected_packet)} " 417 f"not found among received {get_packet_summaries(received_packets)}" 418 ) 419 self._fail_test_case_verify("An expected packet not found among received packets.") 420 421 def match_all_packets( 422 self, expected_packets: list[Packet], received_packets: list[Packet] 423 ) -> None: 424 """Matches all the expected packets against the received ones. 425 426 Matching is performed by counting down the occurrences in a dictionary which keys are the 427 raw packet bytes. No deep packet comparison is performed. All the unexpected packets (noise) 428 are automatically ignored. 429 430 Args: 431 expected_packets: The packets we are expecting to receive. 432 received_packets: All the packets that were received. 433 434 Raises: 435 TestCaseVerifyError: if and not all the `expected_packets` were found in 436 `received_packets`. 437 """ 438 expected_packets_counters = Counter(map(raw, expected_packets)) 439 received_packets_counters = Counter(map(raw, received_packets)) 440 # The number of expected packets is subtracted by the number of received packets, ignoring 441 # any unexpected packets and capping at zero. 442 missing_packets_counters = expected_packets_counters - received_packets_counters 443 missing_packets_count = missing_packets_counters.total() 444 self._logger.debug( 445 f"match_all_packets: expected {len(expected_packets)}, " 446 f"received {len(received_packets)}, missing {missing_packets_count}" 447 ) 448 449 if missing_packets_count != 0: 450 self._fail_test_case_verify( 451 f"Not all packets were received, expected {len(expected_packets)} " 452 f"but {missing_packets_count} were missing." 453 ) 454 455 def _compare_packets(self, expected_packet: Packet, received_packet: Packet) -> bool: 456 self._logger.debug( 457 f"Comparing packets: \n{expected_packet.summary()}\n{received_packet.summary()}" 458 ) 459 460 l3 = IP in expected_packet.layers() 461 self._logger.debug("Found l3 layer") 462 463 received_payload = received_packet 464 expected_payload = expected_packet 465 while received_payload and expected_payload: 466 self._logger.debug("Comparing payloads:") 467 self._logger.debug(f"Received: {received_payload}") 468 self._logger.debug(f"Expected: {expected_payload}") 469 if received_payload.__class__ == expected_payload.__class__: 470 self._logger.debug("The layers are the same.") 471 if received_payload.__class__ == Ether: 472 if not self._verify_l2_frame(received_payload, l3): 473 return False 474 elif received_payload.__class__ == IP: 475 if not self._verify_l3_packet(received_payload, expected_payload): 476 return False 477 else: 478 # Different layers => different packets 479 return False 480 received_payload = received_payload.payload 481 expected_payload = expected_payload.payload 482 483 if expected_payload: 484 self._logger.debug(f"The expected packet did not contain {expected_payload}.") 485 return False 486 if received_payload and received_payload.__class__ != Padding: 487 self._logger.debug("The received payload had extra layers which were not padding.") 488 return False 489 return True 490 491 def _verify_l2_frame(self, received_packet: Ether, l3: bool) -> bool: 492 self._logger.debug("Looking at the Ether layer.") 493 self._logger.debug( 494 f"Comparing received dst mac '{received_packet.dst}' " 495 f"with expected '{self._tg_port_ingress.mac_address}'." 496 ) 497 if received_packet.dst != self._tg_port_ingress.mac_address: 498 return False 499 500 expected_src_mac = self._tg_port_egress.mac_address 501 if l3: 502 expected_src_mac = self._sut_port_egress.mac_address 503 self._logger.debug( 504 f"Comparing received src mac '{received_packet.src}' " 505 f"with expected '{expected_src_mac}'." 506 ) 507 if received_packet.src != expected_src_mac: 508 return False 509 510 return True 511 512 def _verify_l3_packet(self, received_packet: IP, expected_packet: IP) -> bool: 513 self._logger.debug("Looking at the IP layer.") 514 if received_packet.src != expected_packet.src or received_packet.dst != expected_packet.dst: 515 return False 516 return True 517 518 519#: The generic type for a method of an instance of TestSuite 520TestSuiteMethodType = TypeVar("TestSuiteMethodType", bound=Callable[[TestSuite], None]) 521 522 523class TestCaseType(Enum): 524 """The types of test cases.""" 525 526 #: 527 FUNCTIONAL = auto() 528 #: 529 PERFORMANCE = auto() 530 531 532class TestCase(TestProtocol, Protocol[TestSuiteMethodType]): 533 """Definition of the test case type for static type checking purposes. 534 535 The type is applied to test case functions through a decorator, which casts the decorated 536 test case function to :class:`TestCase` and sets common variables. 537 """ 538 539 #: 540 test_type: ClassVar[TestCaseType] 541 #: necessary for mypy so that it can treat this class as the function it's shadowing 542 __call__: TestSuiteMethodType 543 544 @classmethod 545 def make_decorator( 546 cls, test_case_type: TestCaseType 547 ) -> Callable[[TestSuiteMethodType], type["TestCase"]]: 548 """Create a decorator for test suites. 549 550 The decorator casts the decorated function as :class:`TestCase`, 551 sets it as `test_case_type` 552 and initializes common variables defined in :class:`RequiresCapabilities`. 553 554 Args: 555 test_case_type: Either a functional or performance test case. 556 557 Returns: 558 The decorator of a functional or performance test case. 559 """ 560 561 def _decorator(func: TestSuiteMethodType) -> type[TestCase]: 562 test_case = cast(type[TestCase], func) 563 test_case.skip = cls.skip 564 test_case.skip_reason = cls.skip_reason 565 test_case.required_capabilities = set() 566 test_case.topology_type = cls.topology_type 567 test_case.topology_type.add_to_required(test_case) 568 test_case.test_type = test_case_type 569 return test_case 570 571 return _decorator 572 573 574#: The decorator for functional test cases. 575func_test: Callable = TestCase.make_decorator(TestCaseType.FUNCTIONAL) 576#: The decorator for performance test cases. 577perf_test: Callable = TestCase.make_decorator(TestCaseType.PERFORMANCE) 578