xref: /dpdk/dts/framework/test_suite.py (revision 21a66096bb44a4468353782c36fc85913520dc6c)
1# SPDX-License-Identifier: BSD-3-Clause
2# Copyright(c) 2010-2014 Intel Corporation
3# Copyright(c) 2023 PANTHEON.tech s.r.o.
4
5"""Features common to all test suites.
6
7The module defines the :class:`TestSuite` class which doesn't contain any test cases, and as such
8must be extended by subclasses which add test cases. The :class:`TestSuite` contains the basics
9needed by subclasses:
10
11    * Testbed (SUT, TG) configuration,
12    * Packet sending and verification,
13    * Test case verification.
14"""
15
16import inspect
17from collections import Counter
18from collections.abc import Callable, Sequence
19from enum import Enum, auto
20from ipaddress import IPv4Interface, IPv6Interface, ip_interface
21from typing import ClassVar, Protocol, TypeVar, Union, cast
22
23from scapy.layers.inet import IP  # type: ignore[import-untyped]
24from scapy.layers.l2 import Ether  # type: ignore[import-untyped]
25from scapy.packet import Packet, Padding, raw  # type: ignore[import-untyped]
26
27from framework.testbed_model.capability import TestProtocol
28from framework.testbed_model.port import Port
29from framework.testbed_model.sut_node import SutNode
30from framework.testbed_model.tg_node import TGNode
31from framework.testbed_model.topology import Topology
32from framework.testbed_model.traffic_generator.capturing_traffic_generator import (
33    PacketFilteringConfig,
34)
35
36from .exception import ConfigurationError, TestCaseVerifyError
37from .logger import DTSLogger, get_dts_logger
38from .utils import get_packet_summaries
39
40
41class TestSuite(TestProtocol):
42    """The base class with building blocks needed by most test cases.
43
44        * Test suite setup/cleanup methods to override,
45        * Test case setup/cleanup methods to override,
46        * Test case verification,
47        * Testbed configuration,
48        * Traffic sending and verification.
49
    Test cases are implemented by subclasses. Test cases are the methods decorated with
    :func:`func_test` (functional test cases) or :func:`perf_test` (performance test cases);
    see :meth:`get_test_cases`.
53
54    By default, all test cases will be executed. A list of testcase names may be specified
55    in the YAML test run configuration file and in the :option:`--test-suite` command line argument
56    or in the :envvar:`DTS_TESTCASES` environment variable to filter which test cases to run.
    The union of both lists will be used. Unknown test case names are not silently ignored
    by this class: :meth:`get_test_cases` raises a ``ConfigurationError`` for any requested
    name it cannot find.
59
60    The methods named ``[set_up|tear_down]_[suite|test_case]`` should be overridden in subclasses
61    if the appropriate test suite/test case fixtures are needed.
62
63    The test suite is aware of the testbed (the SUT and TG) it's running on. From this, it can
64    properly choose the IP addresses and other configuration that must be tailored to the testbed.
65
66    Attributes:
67        sut_node: The SUT node where the test suite is running.
68        tg_node: The TG node where the test suite is running.
69    """
70
71    sut_node: SutNode
72    tg_node: TGNode
73    #: Whether the test suite is blocking. A failure of a blocking test suite
74    #: will block the execution of all subsequent test suites in the current build target.
75    is_blocking: ClassVar[bool] = False
76    _logger: DTSLogger
77    _sut_port_ingress: Port
78    _sut_port_egress: Port
79    _sut_ip_address_ingress: Union[IPv4Interface, IPv6Interface]
80    _sut_ip_address_egress: Union[IPv4Interface, IPv6Interface]
81    _tg_port_ingress: Port
82    _tg_port_egress: Port
83    _tg_ip_address_ingress: Union[IPv4Interface, IPv6Interface]
84    _tg_ip_address_egress: Union[IPv4Interface, IPv6Interface]
85
86    def __init__(
87        self,
88        sut_node: SutNode,
89        tg_node: TGNode,
90        topology: Topology,
91    ):
92        """Initialize the test suite testbed information and basic configuration.
93
94        Find links between ports and set up default IP addresses to be used when
95        configuring them.
96
97        Args:
98            sut_node: The SUT node where the test suite will run.
99            tg_node: The TG node where the test suite will run.
100            topology: The topology where the test suite will run.
101        """
102        self.sut_node = sut_node
103        self.tg_node = tg_node
104        self._logger = get_dts_logger(self.__class__.__name__)
105        self._tg_port_egress = topology.tg_port_egress
106        self._sut_port_ingress = topology.sut_port_ingress
107        self._sut_port_egress = topology.sut_port_egress
108        self._tg_port_ingress = topology.tg_port_ingress
109        self._sut_ip_address_ingress = ip_interface("192.168.100.2/24")
110        self._sut_ip_address_egress = ip_interface("192.168.101.2/24")
111        self._tg_ip_address_egress = ip_interface("192.168.100.3/24")
112        self._tg_ip_address_ingress = ip_interface("192.168.101.3/24")
113
114    @classmethod
115    def get_test_cases(
116        cls, test_case_sublist: Sequence[str] | None = None
117    ) -> tuple[set[type["TestCase"]], set[type["TestCase"]]]:
118        """Filter `test_case_subset` from this class.
119
120        Test cases are regular (or bound) methods decorated with :func:`func_test`
121        or :func:`perf_test`.
122
123        Args:
124            test_case_sublist: Test case names to filter from this class.
125                If empty or :data:`None`, return all test cases.
126
127        Returns:
128            The filtered test case functions. This method returns functions as opposed to methods,
129            as methods are bound to instances and this method only has access to the class.
130
131        Raises:
132            ConfigurationError: If a test case from `test_case_subset` is not found.
133        """
134
135        def is_test_case(function: Callable) -> bool:
136            if inspect.isfunction(function):
137                # TestCase is not used at runtime, so we can't use isinstance() with `function`.
138                # But function.test_type exists.
139                if hasattr(function, "test_type"):
140                    return isinstance(function.test_type, TestCaseType)
141            return False
142
143        if test_case_sublist is None:
144            test_case_sublist = []
145
146        # the copy is needed so that the condition "elif test_case_sublist" doesn't
147        # change mid-cycle
148        test_case_sublist_copy = list(test_case_sublist)
149        func_test_cases = set()
150        perf_test_cases = set()
151
152        for test_case_name, test_case_function in inspect.getmembers(cls, is_test_case):
153            if test_case_name in test_case_sublist_copy:
154                # if test_case_sublist_copy is non-empty, remove the found test case
155                # so that we can look at the remainder at the end
156                test_case_sublist_copy.remove(test_case_name)
157            elif test_case_sublist:
158                # the original list not being empty means we're filtering test cases
159                # since we didn't remove test_case_name in the previous branch,
160                # it doesn't match the filter and we don't want to remove it
161                continue
162
163            match test_case_function.test_type:
164                case TestCaseType.PERFORMANCE:
165                    perf_test_cases.add(test_case_function)
166                case TestCaseType.FUNCTIONAL:
167                    func_test_cases.add(test_case_function)
168
169        if test_case_sublist_copy:
170            raise ConfigurationError(
171                f"Test cases {test_case_sublist_copy} not found among functions of {cls.__name__}."
172            )
173
174        return func_test_cases, perf_test_cases
175
    def set_up_suite(self) -> None:
        """Set up test fixtures common to all test cases.

        This is done before any test case has been run. The base implementation does nothing;
        subclasses override it when they need suite-level fixtures.
        """
181
    def tear_down_suite(self) -> None:
        """Tear down the previously created test fixtures common to all test cases.

        This is done after all test cases have been run. The base implementation does nothing;
        subclasses override it to undo what their :meth:`set_up_suite` did.
        """
187
    def set_up_test_case(self) -> None:
        """Set up test fixtures before each test case.

        This is done before *each* test case. The base implementation does nothing;
        subclasses override it when they need per-test-case fixtures.
        """
193
    def tear_down_test_case(self) -> None:
        """Tear down the previously created test fixtures after each test case.

        This is done after *each* test case. The base implementation does nothing;
        subclasses override it to undo what their :meth:`set_up_test_case` did.
        """
199
200    def configure_testbed_ipv4(self, restore: bool = False) -> None:
201        """Configure IPv4 addresses on all testbed ports.
202
203        The configured ports are:
204
205        * SUT ingress port,
206        * SUT egress port,
207        * TG ingress port,
208        * TG egress port.
209
210        Args:
211            restore: If :data:`True`, will remove the configuration instead.
212        """
213        delete = True if restore else False
214        enable = False if restore else True
215        self._configure_ipv4_forwarding(enable)
216        self.sut_node.configure_port_ip_address(
217            self._sut_ip_address_egress, self._sut_port_egress, delete
218        )
219        self.sut_node.configure_port_state(self._sut_port_egress, enable)
220        self.sut_node.configure_port_ip_address(
221            self._sut_ip_address_ingress, self._sut_port_ingress, delete
222        )
223        self.sut_node.configure_port_state(self._sut_port_ingress, enable)
224        self.tg_node.configure_port_ip_address(
225            self._tg_ip_address_ingress, self._tg_port_ingress, delete
226        )
227        self.tg_node.configure_port_state(self._tg_port_ingress, enable)
228        self.tg_node.configure_port_ip_address(
229            self._tg_ip_address_egress, self._tg_port_egress, delete
230        )
231        self.tg_node.configure_port_state(self._tg_port_egress, enable)
232
    def _configure_ipv4_forwarding(self, enable: bool) -> None:
        """Configure IPv4 forwarding on the SUT node.

        Args:
            enable: Passed unchanged to the SUT node's ``configure_ipv4_forwarding``.
        """
        self.sut_node.configure_ipv4_forwarding(enable)
235
    def send_packet_and_capture(
        self,
        packet: Packet,
        filter_config: PacketFilteringConfig = PacketFilteringConfig(),
        duration: float = 1,
    ) -> list[Packet]:
        """Send and receive `packet` using the associated TG.

        Send `packet` through the appropriate interface and receive on the appropriate interface.
        Modify the packet with l3/l2 addresses corresponding to the testbed and desired traffic.

        This is a convenience wrapper which wraps `packet` in a one-element list and delegates
        to :meth:`send_packets_and_capture`.

        Args:
            packet: The packet to send.
            filter_config: The filter to use when capturing packets. The default instance
                is created once, when the method is defined, and is shared by all calls.
            duration: Capture traffic for this amount of time after sending `packet`.

        Returns:
            A list of received packets.
        """
        return self.send_packets_and_capture(
            [packet],
            filter_config,
            duration,
        )
260
    def send_packets_and_capture(
        self,
        packets: list[Packet],
        filter_config: PacketFilteringConfig = PacketFilteringConfig(),
        duration: float = 1,
    ) -> list[Packet]:
        """Send and receive `packets` using the associated TG.

        Send `packets` through the appropriate interface and receive on the appropriate interface.
        Modify the packets with l3/l2 addresses corresponding to the testbed and desired traffic.

        Args:
            packets: The packets to send. They are passed through :meth:`_adjust_addresses`
                before being sent.
            filter_config: The filter to use when capturing packets. The default instance
                is created once, when the method is defined, and is shared by all calls.
            duration: Capture traffic for this amount of time after sending `packets`.

        Returns:
            A list of received packets.
        """
        # Fill in the missing L2/L3 addresses for the TG -> SUT direction.
        packets = self._adjust_addresses(packets)
        # Send on the TG egress port and capture on the TG ingress port.
        return self.tg_node.send_packets_and_capture(
            packets,
            self._tg_port_egress,
            self._tg_port_ingress,
            filter_config,
            duration,
        )
288
    def send_packets(
        self,
        packets: list[Packet],
    ) -> None:
        """Send packets using the traffic generator and do not capture received traffic.

        Args:
            packets: Packets to send. They are passed through :meth:`_adjust_addresses`
                before being sent on the TG egress port.
        """
        # Fill in the missing L2/L3 addresses for the TG -> SUT direction.
        packets = self._adjust_addresses(packets)
        self.tg_node.send_packets(packets, self._tg_port_egress)
300
    def get_expected_packet(self, packet: Packet) -> Packet:
        """Inject the proper L2/L3 addresses into `packet`.

        The addresses are those of the SUT -> TG direction, as :meth:`_adjust_addresses`
        is called with ``expected=True``.

        Args:
            packet: The packet to modify.

        Returns:
            `packet` with injected L2/L3 addresses.
        """
        # _adjust_addresses works on lists; wrap the packet and unwrap the single result.
        return self._adjust_addresses([packet], expected=True)[0]
311
312    def _adjust_addresses(self, packets: list[Packet], expected: bool = False) -> list[Packet]:
313        """L2 and L3 address additions in both directions.
314
315        Packets in `packets` will be directly modified in this method. The returned list of packets
316        however will be copies of the modified packets.
317
318        Only missing addresses are added to packets, existing addresses will not be overridden. If
319        any packet in `packets` has multiple IP layers (using GRE, for example) only the inner-most
320        IP layer will have its addresses adjusted.
321
322        Assumptions:
323            Two links between SUT and TG, one link is TG -> SUT, the other SUT -> TG.
324
325        Args:
326            packets: The packets to modify.
327            expected: If :data:`True`, the direction is SUT -> TG,
328                otherwise the direction is TG -> SUT.
329
330        Returns:
331            A list containing copies of all packets in `packets` after modification.
332        """
333        ret_packets = []
334        for packet in packets:
335            # update l2 addresses
336            # If `expected` is :data:`True`, the packet enters the TG from SUT, otherwise the
337            # packet leaves the TG towards the SUT.
338
339            # The fields parameter of a packet does not include fields of the payload, so this can
340            # only be the Ether src/dst.
341            if "src" not in packet.fields:
342                packet.src = (
343                    self._sut_port_egress.mac_address
344                    if expected
345                    else self._tg_port_egress.mac_address
346                )
347            if "dst" not in packet.fields:
348                packet.dst = (
349                    self._tg_port_ingress.mac_address
350                    if expected
351                    else self._sut_port_ingress.mac_address
352                )
353
354            # update l3 addresses
355            # The packet is routed from TG egress to TG ingress regardless of whether it is
356            # expected or not.
357            num_ip_layers = packet.layers().count(IP)
358            if num_ip_layers > 0:
359                # Update the last IP layer if there are multiple (the framework should be modifying
360                # the packet address instead of the tunnel address if there is one).
361                l3_to_use = packet.getlayer(IP, num_ip_layers)
362                if "src" not in l3_to_use.fields:
363                    l3_to_use.src = self._tg_ip_address_egress.ip.exploded
364
365                if "dst" not in l3_to_use.fields:
366                    l3_to_use.dst = self._tg_ip_address_ingress.ip.exploded
367            ret_packets.append(Ether(packet.build()))
368
369        return ret_packets
370
371    def verify(self, condition: bool, failure_description: str) -> None:
372        """Verify `condition` and handle failures.
373
374        When `condition` is :data:`False`, raise an exception and log the last 10 commands
375        executed on both the SUT and TG.
376
377        Args:
378            condition: The condition to check.
379            failure_description: A short description of the failure
380                that will be stored in the raised exception.
381
382        Raises:
383            TestCaseVerifyError: `condition` is :data:`False`.
384        """
385        if not condition:
386            self._fail_test_case_verify(failure_description)
387
388    def _fail_test_case_verify(self, failure_description: str) -> None:
389        self._logger.debug("A test case failed, showing the last 10 commands executed on SUT:")
390        for command_res in self.sut_node.main_session.remote_session.history[-10:]:
391            self._logger.debug(command_res.command)
392        self._logger.debug("A test case failed, showing the last 10 commands executed on TG:")
393        for command_res in self.tg_node.main_session.remote_session.history[-10:]:
394            self._logger.debug(command_res.command)
395        raise TestCaseVerifyError(failure_description)
396
397    def verify_packets(self, expected_packet: Packet, received_packets: list[Packet]) -> None:
398        """Verify that `expected_packet` has been received.
399
400        Go through `received_packets` and check that `expected_packet` is among them.
401        If not, raise an exception and log the last 10 commands
402        executed on both the SUT and TG.
403
404        Args:
405            expected_packet: The packet we're expecting to receive.
406            received_packets: The packets where we're looking for `expected_packet`.
407
408        Raises:
409            TestCaseVerifyError: `expected_packet` is not among `received_packets`.
410        """
411        for received_packet in received_packets:
412            if self._compare_packets(expected_packet, received_packet):
413                break
414        else:
415            self._logger.debug(
416                f"The expected packet {get_packet_summaries(expected_packet)} "
417                f"not found among received {get_packet_summaries(received_packets)}"
418            )
419            self._fail_test_case_verify("An expected packet not found among received packets.")
420
421    def match_all_packets(
422        self, expected_packets: list[Packet], received_packets: list[Packet]
423    ) -> None:
424        """Matches all the expected packets against the received ones.
425
426        Matching is performed by counting down the occurrences in a dictionary which keys are the
427        raw packet bytes. No deep packet comparison is performed. All the unexpected packets (noise)
428        are automatically ignored.
429
430        Args:
431            expected_packets: The packets we are expecting to receive.
432            received_packets: All the packets that were received.
433
434        Raises:
435            TestCaseVerifyError: if and not all the `expected_packets` were found in
436                `received_packets`.
437        """
438        expected_packets_counters = Counter(map(raw, expected_packets))
439        received_packets_counters = Counter(map(raw, received_packets))
440        # The number of expected packets is subtracted by the number of received packets, ignoring
441        # any unexpected packets and capping at zero.
442        missing_packets_counters = expected_packets_counters - received_packets_counters
443        missing_packets_count = missing_packets_counters.total()
444        self._logger.debug(
445            f"match_all_packets: expected {len(expected_packets)}, "
446            f"received {len(received_packets)}, missing {missing_packets_count}"
447        )
448
449        if missing_packets_count != 0:
450            self._fail_test_case_verify(
451                f"Not all packets were received, expected {len(expected_packets)} "
452                f"but {missing_packets_count} were missing."
453            )
454
455    def _compare_packets(self, expected_packet: Packet, received_packet: Packet) -> bool:
456        self._logger.debug(
457            f"Comparing packets: \n{expected_packet.summary()}\n{received_packet.summary()}"
458        )
459
460        l3 = IP in expected_packet.layers()
461        self._logger.debug("Found l3 layer")
462
463        received_payload = received_packet
464        expected_payload = expected_packet
465        while received_payload and expected_payload:
466            self._logger.debug("Comparing payloads:")
467            self._logger.debug(f"Received: {received_payload}")
468            self._logger.debug(f"Expected: {expected_payload}")
469            if received_payload.__class__ == expected_payload.__class__:
470                self._logger.debug("The layers are the same.")
471                if received_payload.__class__ == Ether:
472                    if not self._verify_l2_frame(received_payload, l3):
473                        return False
474                elif received_payload.__class__ == IP:
475                    if not self._verify_l3_packet(received_payload, expected_payload):
476                        return False
477            else:
478                # Different layers => different packets
479                return False
480            received_payload = received_payload.payload
481            expected_payload = expected_payload.payload
482
483        if expected_payload:
484            self._logger.debug(f"The expected packet did not contain {expected_payload}.")
485            return False
486        if received_payload and received_payload.__class__ != Padding:
487            self._logger.debug("The received payload had extra layers which were not padding.")
488            return False
489        return True
490
491    def _verify_l2_frame(self, received_packet: Ether, l3: bool) -> bool:
492        self._logger.debug("Looking at the Ether layer.")
493        self._logger.debug(
494            f"Comparing received dst mac '{received_packet.dst}' "
495            f"with expected '{self._tg_port_ingress.mac_address}'."
496        )
497        if received_packet.dst != self._tg_port_ingress.mac_address:
498            return False
499
500        expected_src_mac = self._tg_port_egress.mac_address
501        if l3:
502            expected_src_mac = self._sut_port_egress.mac_address
503        self._logger.debug(
504            f"Comparing received src mac '{received_packet.src}' "
505            f"with expected '{expected_src_mac}'."
506        )
507        if received_packet.src != expected_src_mac:
508            return False
509
510        return True
511
512    def _verify_l3_packet(self, received_packet: IP, expected_packet: IP) -> bool:
513        self._logger.debug("Looking at the IP layer.")
514        if received_packet.src != expected_packet.src or received_packet.dst != expected_packet.dst:
515            return False
516        return True
517
518
#: The generic type for a method of an instance of :class:`TestSuite`. It is bound to callables
#: which take the test suite instance as their only argument and return :data:`None`.
TestSuiteMethodType = TypeVar("TestSuiteMethodType", bound=Callable[[TestSuite], None])
521
522
class TestCaseType(Enum):
    """The types of test cases."""

    #: A functional test case, marked with the :func:`func_test` decorator.
    FUNCTIONAL = auto()
    #: A performance test case, marked with the :func:`perf_test` decorator.
    PERFORMANCE = auto()
530
531
class TestCase(TestProtocol, Protocol[TestSuiteMethodType]):
    """Definition of the test case type for static type checking purposes.

    The type is applied to test case functions through a decorator, which casts the decorated
    test case function to :class:`TestCase` and sets common variables.
    """

    #: The type of the test case, set by the decorator created in :meth:`make_decorator`.
    test_type: ClassVar[TestCaseType]
    #: necessary for mypy so that it can treat this class as the function it's shadowing
    __call__: TestSuiteMethodType

    @classmethod
    def make_decorator(
        cls, test_case_type: TestCaseType
    ) -> Callable[[TestSuiteMethodType], type["TestCase"]]:
        """Create a decorator for test cases.

        The decorator casts the decorated function as :class:`TestCase`,
        sets it as `test_case_type`
        and initializes common variables defined in :class:`RequiresCapabilities`.

        Args:
            test_case_type: Either a functional or performance test case.

        Returns:
            The decorator of a functional or performance test case.
        """

        def _decorator(func: TestSuiteMethodType) -> type[TestCase]:
            # The cast only informs the type checker; at runtime test_case is still the decorated
            # function and the attributes below are set directly on it.
            test_case = cast(type[TestCase], func)
            # Start from the defaults defined on this class.
            test_case.skip = cls.skip
            test_case.skip_reason = cls.skip_reason
            # Each test case gets its own fresh set.
            test_case.required_capabilities = set()
            test_case.topology_type = cls.topology_type
            # NOTE(review): presumably this records the topology requirement in
            # required_capabilities, which is why the set is created first - confirm.
            test_case.topology_type.add_to_required(test_case)
            # This attribute is what TestSuite.get_test_cases() looks for.
            test_case.test_type = test_case_type
            return test_case

        return _decorator
572
573
#: The decorator for functional test cases.
#: The decorated method gets its ``test_type`` set to :attr:`TestCaseType.FUNCTIONAL`.
func_test: Callable = TestCase.make_decorator(TestCaseType.FUNCTIONAL)
#: The decorator for performance test cases.
#: The decorated method gets its ``test_type`` set to :attr:`TestCaseType.PERFORMANCE`.
perf_test: Callable = TestCase.make_decorator(TestCaseType.PERFORMANCE)
578