xref: /dpdk/dts/framework/test_suite.py (revision e9fd1ebf981f361844aea9ec94e17f4bda5e1479)
1# SPDX-License-Identifier: BSD-3-Clause
2# Copyright(c) 2010-2014 Intel Corporation
3# Copyright(c) 2023 PANTHEON.tech s.r.o.
4
5"""Features common to all test suites.
6
7The module defines the :class:`TestSuite` class which doesn't contain any test cases, and as such
8must be extended by subclasses which add test cases. The :class:`TestSuite` contains the basics
9needed by subclasses:
10
11    * Test suite and test case execution flow,
12    * Testbed (SUT, TG) configuration,
13    * Packet sending and verification,
14    * Test case verification.
15
16The module also defines a function, :func:`get_test_suites`,
17for gathering test suites from a Python module.
18"""
19
20import importlib
21import inspect
22import re
23from ipaddress import IPv4Interface, IPv6Interface, ip_interface
24from types import MethodType
25from typing import Any, ClassVar, Union
26
27from scapy.layers.inet import IP  # type: ignore[import]
28from scapy.layers.l2 import Ether  # type: ignore[import]
29from scapy.packet import Packet, Padding  # type: ignore[import]
30
31from .exception import (
32    BlockingTestSuiteError,
33    ConfigurationError,
34    SSHTimeoutError,
35    TestCaseVerifyError,
36)
37from .logger import DTSLOG, getLogger
38from .settings import SETTINGS
39from .test_result import BuildTargetResult, Result, TestCaseResult, TestSuiteResult
40from .testbed_model import Port, PortLink, SutNode, TGNode
41from .utils import get_packet_summaries
42
43
class TestSuite(object):
    """The base class with methods for handling the basic flow of a test suite.

        * Test case filtering and collection,
        * Test suite setup/cleanup,
        * Test setup/cleanup,
        * Test case execution,
        * Error handling and results storage.

    Test cases are implemented by subclasses. Test cases are all methods starting with ``test_``,
    further divided into performance test cases (starting with ``test_perf_``)
    and functional test cases (all other test cases).

    By default, all test cases will be executed. A list of testcase names may be specified
    in the YAML test run configuration file and in the :option:`--test-cases` command line argument
    or in the :envvar:`DTS_TESTCASES` environment variable to filter which test cases to run.
    The union of both lists will be used. Any unknown test cases from the latter lists
    will be silently ignored.

    If the :option:`--re-run` command line argument or the :envvar:`DTS_RERUN` environment variable
    is set, in case of a test case failure, the test case will be executed again until it passes
    or it fails that many times in addition to the first failure.

    The methods named ``[set_up|tear_down]_[suite|test_case]`` should be overridden in subclasses
    if the appropriate test suite/test case fixtures are needed.

    The test suite is aware of the testbed (the SUT and TG) it's running on. From this, it can
    properly choose the IP addresses and other configuration that must be tailored to the testbed.

    Attributes:
        sut_node: The SUT node where the test suite is running.
        tg_node: The TG node where the test suite is running.
    """

    sut_node: SutNode
    tg_node: TGNode
    #: Whether the test suite is blocking. A failure of a blocking test suite
    #: will block the execution of all subsequent test suites in the current build target.
    is_blocking: ClassVar[bool] = False
    _logger: DTSLOG
    _test_cases_to_run: list[str]
    _func: bool
    _result: TestSuiteResult
    _port_links: list[PortLink]
    _sut_port_ingress: Port
    _sut_port_egress: Port
    _sut_ip_address_ingress: Union[IPv4Interface, IPv6Interface]
    _sut_ip_address_egress: Union[IPv4Interface, IPv6Interface]
    _tg_port_ingress: Port
    _tg_port_egress: Port
    _tg_ip_address_ingress: Union[IPv4Interface, IPv6Interface]
    _tg_ip_address_egress: Union[IPv4Interface, IPv6Interface]

    def __init__(
        self,
        sut_node: SutNode,
        tg_node: TGNode,
        test_cases: list[str],
        func: bool,
        build_target_result: BuildTargetResult,
    ):
        """Initialize the test suite testbed information and basic configuration.

        Process what test cases to run, create the associated
        :class:`~.test_result.TestSuiteResult`, find links between ports
        and set up default IP addresses to be used when configuring them.

        The first discovered SUT<->TG link is used as the TG egress -> SUT ingress link,
        the second one as the SUT egress -> TG ingress link.

        Args:
            sut_node: The SUT node where the test suite will run.
            tg_node: The TG node where the test suite will run.
            test_cases: The list of test cases to execute.
                If empty, all test cases will be executed. The list is not modified.
            func: Whether to run functional tests.
            build_target_result: The build target result this test suite is run in.

        Raises:
            IndexError: Fewer than two links between the SUT and the TG were found.
        """
        self.sut_node = sut_node
        self.tg_node = tg_node
        self._logger = getLogger(self.__class__.__name__)
        # Build a fresh list: extending `test_cases` in place would leak the names from SETTINGS
        # into the caller's list every time a test suite is instantiated.
        self._test_cases_to_run = [*test_cases, *SETTINGS.test_cases]
        self._func = func
        self._result = build_target_result.add_test_suite(self.__class__.__name__)
        self._port_links = []
        self._process_links()
        self._sut_port_ingress, self._tg_port_egress = (
            self._port_links[0].sut_port,
            self._port_links[0].tg_port,
        )
        self._sut_port_egress, self._tg_port_ingress = (
            self._port_links[1].sut_port,
            self._port_links[1].tg_port,
        )
        self._sut_ip_address_ingress = ip_interface("192.168.100.2/24")
        self._sut_ip_address_egress = ip_interface("192.168.101.2/24")
        self._tg_ip_address_egress = ip_interface("192.168.100.3/24")
        self._tg_ip_address_ingress = ip_interface("192.168.101.3/24")

    def _process_links(self) -> None:
        """Construct links between SUT and TG ports.

        A SUT port and a TG port form a link when each one names the other as its peer.
        The found links are appended to ``self._port_links``.
        """
        self._port_links.extend(
            PortLink(sut_port=sut_port, tg_port=tg_port)
            for sut_port in self.sut_node.ports
            for tg_port in self.tg_node.ports
            if (sut_port.identifier, sut_port.peer) == (tg_port.peer, tg_port.identifier)
        )

    def set_up_suite(self) -> None:
        """Set up test fixtures common to all test cases.

        This is done before any test case has been run.
        """

    def tear_down_suite(self) -> None:
        """Tear down the previously created test fixtures common to all test cases.

        This is done after all test have been run.
        """

    def set_up_test_case(self) -> None:
        """Set up test fixtures before each test case.

        This is done before *each* test case.
        """

    def tear_down_test_case(self) -> None:
        """Tear down the previously created test fixtures after each test case.

        This is done after *each* test case.
        """

    def configure_testbed_ipv4(self, restore: bool = False) -> None:
        """Configure IPv4 addresses on all testbed ports.

        The configured ports are:

        * SUT ingress port,
        * SUT egress port,
        * TG ingress port,
        * TG egress port.

        IPv4 forwarding on the SUT is switched together with the port configuration.

        Args:
            restore: If :data:`True`, will remove the configuration instead.
        """
        # Restoring means deleting the addresses and bringing ports/forwarding down.
        delete, enable = restore, not restore
        self._configure_ipv4_forwarding(enable)
        port_configs = (
            (self.sut_node, self._sut_ip_address_egress, self._sut_port_egress),
            (self.sut_node, self._sut_ip_address_ingress, self._sut_port_ingress),
            (self.tg_node, self._tg_ip_address_ingress, self._tg_port_ingress),
            (self.tg_node, self._tg_ip_address_egress, self._tg_port_egress),
        )
        for node, ip_address, port in port_configs:
            node.configure_port_ip_address(ip_address, port, delete)
            node.configure_port_state(port, enable)

    def _configure_ipv4_forwarding(self, enable: bool) -> None:
        """Enable or disable IPv4 forwarding on the SUT node.

        Args:
            enable: Passed through to the SUT node's ``configure_ipv4_forwarding``.
        """
        self.sut_node.configure_ipv4_forwarding(enable)

    def send_packet_and_capture(self, packet: Packet, duration: float = 1) -> list[Packet]:
        """Send and receive `packet` using the associated TG.

        Send `packet` through the appropriate interface and receive on the appropriate interface.
        Modify the packet with l3/l2 addresses corresponding to the testbed and desired traffic.

        Args:
            packet: The packet to send.
            duration: Capture traffic for this amount of time after sending `packet`.

        Returns:
            A list of received packets.
        """
        packet = self._adjust_addresses(packet)
        return self.tg_node.send_packet_and_capture(
            packet, self._tg_port_egress, self._tg_port_ingress, duration
        )

    def get_expected_packet(self, packet: Packet) -> Packet:
        """Inject the proper L2/L3 addresses into `packet`.

        Args:
            packet: The packet to modify.

        Returns:
            `packet` with injected L2/L3 addresses.
        """
        return self._adjust_addresses(packet, expected=True)

    def _adjust_addresses(self, packet: Packet, expected: bool = False) -> Packet:
        """L2 and L3 address additions in both directions.

        `packet` is modified in place and then rebuilt into a new Ether packet.

        Assumptions:
            Two links between SUT and TG, one link is TG -> SUT, the other SUT -> TG.

        Args:
            packet: The packet to modify.
            expected: If :data:`True`, the direction is SUT -> TG,
                otherwise the direction is TG -> SUT.

        Returns:
            A new Ether packet built from the modified `packet`.
        """
        # update l2 addresses
        if expected:
            # The packet enters the TG from SUT
            packet.src = self._sut_port_egress.mac_address
            packet.dst = self._tg_port_ingress.mac_address
        else:
            # The packet leaves TG towards SUT
            packet.src = self._tg_port_egress.mac_address
            packet.dst = self._sut_port_ingress.mac_address

        # update l3 addresses
        # The packet is routed from TG egress to TG ingress, so the L3 addresses
        # are the same in both directions.
        packet.payload.src = self._tg_ip_address_egress.ip.exploded
        packet.payload.dst = self._tg_ip_address_ingress.ip.exploded

        return Ether(packet.build())

    def verify(self, condition: bool, failure_description: str) -> None:
        """Verify `condition` and handle failures.

        When `condition` is :data:`False`, raise an exception and log the last 10 commands
        executed on both the SUT and TG.

        Args:
            condition: The condition to check.
            failure_description: A short description of the failure
                that will be stored in the raised exception.

        Raises:
            TestCaseVerifyError: `condition` is :data:`False`.
        """
        if not condition:
            self._fail_test_case_verify(failure_description)

    def _fail_test_case_verify(self, failure_description: str) -> None:
        """Log the last 10 commands executed on the SUT and on the TG and fail the test case.

        Args:
            failure_description: The description stored in the raised exception.

        Raises:
            TestCaseVerifyError: Always.
        """
        self._logger.debug("A test case failed, showing the last 10 commands executed on SUT:")
        for command_res in self.sut_node.main_session.remote_session.history[-10:]:
            self._logger.debug(command_res.command)
        self._logger.debug("A test case failed, showing the last 10 commands executed on TG:")
        for command_res in self.tg_node.main_session.remote_session.history[-10:]:
            self._logger.debug(command_res.command)
        raise TestCaseVerifyError(failure_description)

    def verify_packets(self, expected_packet: Packet, received_packets: list[Packet]) -> None:
        """Verify that `expected_packet` has been received.

        Go through `received_packets` and check that `expected_packet` is among them.
        If not, raise an exception and log the last 10 commands
        executed on both the SUT and TG.

        Args:
            expected_packet: The packet we're expecting to receive.
            received_packets: The packets where we're looking for `expected_packet`.

        Raises:
            TestCaseVerifyError: `expected_packet` is not among `received_packets`.
        """
        # any() stops at the first matching packet, just like an explicit loop with a break.
        if any(self._compare_packets(expected_packet, packet) for packet in received_packets):
            return

        self._logger.debug(
            f"The expected packet {get_packet_summaries(expected_packet)} "
            f"not found among received {get_packet_summaries(received_packets)}"
        )
        self._fail_test_case_verify("An expected packet not found among received packets.")

    def _compare_packets(self, expected_packet: Packet, received_packet: Packet) -> bool:
        """Compare `received_packet` with `expected_packet` layer by layer.

        The packets match when every layer of `expected_packet` is present in `received_packet`
        in the same order, the Ether and IP layers pass their address checks and the received
        packet has no extra layers apart from padding.

        Args:
            expected_packet: The packet we're expecting to receive.
            received_packet: The packet that was actually received.

        Returns:
            :data:`True` if the packets match, :data:`False` otherwise.
        """
        self._logger.debug(
            f"Comparing packets: \n{expected_packet.summary()}\n{received_packet.summary()}"
        )

        l3 = IP in expected_packet.layers()
        if l3:
            self._logger.debug("Found l3 layer")

        received_payload = received_packet
        expected_payload = expected_packet
        # Walk both packets in lockstep until either one runs out of layers.
        while received_payload and expected_payload:
            self._logger.debug("Comparing payloads:")
            self._logger.debug(f"Received: {received_payload}")
            self._logger.debug(f"Expected: {expected_payload}")
            layer_class = received_payload.__class__
            if layer_class != expected_payload.__class__:
                # Different layers => different packets
                return False
            self._logger.debug("The layers are the same.")
            if layer_class == Ether:
                if not self._verify_l2_frame(received_payload, l3):
                    return False
            elif layer_class == IP:
                if not self._verify_l3_packet(received_payload, expected_payload):
                    return False
            received_payload = received_payload.payload
            expected_payload = expected_payload.payload

        if expected_payload:
            # The received packet ran out of layers before the expected one did.
            self._logger.debug(f"The received packet did not contain {expected_payload}.")
            return False
        if received_payload and received_payload.__class__ != Padding:
            self._logger.debug("The received payload had extra layers which were not padding.")
            return False
        return True

    def _verify_l2_frame(self, received_packet: Ether, l3: bool) -> bool:
        """Check the MAC addresses of a received Ether frame.

        The destination must be the TG ingress port. The source must be the SUT egress port
        when the expected packet has an IP layer, otherwise the TG egress port.

        Args:
            received_packet: The received Ether layer.
            l3: Whether the expected packet contains an IP layer.

        Returns:
            :data:`True` if both MAC addresses match, :data:`False` otherwise.
        """
        self._logger.debug("Looking at the Ether layer.")
        self._logger.debug(
            f"Comparing received dst mac '{received_packet.dst}' "
            f"with expected '{self._tg_port_ingress.mac_address}'."
        )
        if received_packet.dst != self._tg_port_ingress.mac_address:
            return False

        # NOTE(review): presumably the SUT rewrites the source MAC when it routes L3 traffic,
        # while pure L2 traffic keeps the TG's source MAC - confirm.
        if l3:
            expected_src_mac = self._sut_port_egress.mac_address
        else:
            expected_src_mac = self._tg_port_egress.mac_address
        self._logger.debug(
            f"Comparing received src mac '{received_packet.src}' "
            f"with expected '{expected_src_mac}'."
        )
        if received_packet.src != expected_src_mac:
            return False

        return True

    def _verify_l3_packet(self, received_packet: IP, expected_packet: IP) -> bool:
        """Check that the source and destination IP addresses of the two IP layers match.

        Args:
            received_packet: The received IP layer.
            expected_packet: The expected IP layer.

        Returns:
            :data:`True` if both addresses match, :data:`False` otherwise.
        """
        self._logger.debug("Looking at the IP layer.")
        if received_packet.src != expected_packet.src or received_packet.dst != expected_packet.dst:
            return False
        return True

    def run(self) -> None:
        """Set up, execute and tear down the whole suite.

        Test suite execution consists of running all test cases scheduled to be executed.
        A test case run consists of setup, execution and teardown of said test case.

        Record the setup and the teardown and handle failures.

        The list of scheduled test cases is constructed when creating the :class:`TestSuite` object.

        Raises:
            BlockingTestSuiteError: The test suite is blocking and errors were recorded.
        """
        test_suite_name = self.__class__.__name__

        try:
            self._logger.info(f"Starting test suite setup: {test_suite_name}")
            self.set_up_suite()
            self._result.update_setup(Result.PASS)
            self._logger.info(f"Test suite setup successful: {test_suite_name}")
        except Exception as e:
            self._logger.exception(f"Test suite setup ERROR: {test_suite_name}")
            self._result.update_setup(Result.ERROR, e)

        else:
            # Test cases are only executed when the suite setup succeeded.
            self._execute_test_suite()

        finally:
            # The teardown is attempted even if the setup failed.
            try:
                self.tear_down_suite()
                self.sut_node.kill_cleanup_dpdk_apps()
                self._result.update_teardown(Result.PASS)
            except Exception as e:
                self._logger.exception(f"Test suite teardown ERROR: {test_suite_name}")
                self._logger.warning(
                    f"Test suite '{test_suite_name}' teardown failed, "
                    f"the next test suite may be affected."
                )
                # Record the failure against the teardown; recording it against the setup
                # would overwrite the real setup result and leave the teardown result unset.
                self._result.update_teardown(Result.ERROR, e)
            if len(self._result.get_errors()) > 0 and self.is_blocking:
                raise BlockingTestSuiteError(test_suite_name)

    def _execute_test_suite(self) -> None:
        """Execute all test cases scheduled to be executed in this suite.

        Only functional test cases are executed, and only if functional testing is enabled.
        A failed test case is re-run up to ``SETTINGS.re_run`` additional times.
        """
        if not self._func:
            return

        all_attempts = SETTINGS.re_run + 1
        for test_case_method in self._get_functional_test_cases():
            test_case_name = test_case_method.__name__
            test_case_result = self._result.add_test_case(test_case_name)
            attempt_nr = 1
            self._run_test_case(test_case_method, test_case_result)
            # The truthiness of the result object tells whether the last attempt succeeded.
            while not test_case_result and attempt_nr < all_attempts:
                attempt_nr += 1
                self._logger.info(
                    f"Re-running FAILED test case '{test_case_name}'. "
                    f"Attempt number {attempt_nr} out of {all_attempts}."
                )
                self._run_test_case(test_case_method, test_case_result)

    def _get_functional_test_cases(self) -> list[MethodType]:
        """Get all functional test cases defined in this TestSuite.

        Returns:
            The list of functional test cases of this TestSuite.
        """
        return self._get_test_cases(r"test_(?!perf_)")

    def _get_test_cases(self, test_case_regex: str) -> list[MethodType]:
        """Return a list of test cases matching test_case_regex.

        Args:
            test_case_regex: The regular expression the test case names are matched against.

        Returns:
            The list of test cases matching test_case_regex of this TestSuite.
        """
        self._logger.debug(f"Searching for test cases in {self.__class__.__name__}.")
        filtered_test_cases = [
            test_case
            for test_case_name, test_case in inspect.getmembers(self, inspect.ismethod)
            if self._should_be_executed(test_case_name, test_case_regex)
        ]
        cases_str = ", ".join(x.__name__ for x in filtered_test_cases)
        self._logger.debug(f"Found test cases '{cases_str}' in {self.__class__.__name__}.")
        return filtered_test_cases

    def _should_be_executed(self, test_case_name: str, test_case_regex: str) -> bool:
        """Check whether the test case should be scheduled to be executed.

        Args:
            test_case_name: The name of the method to check.
            test_case_regex: The regular expression the name must match from its start.

        Returns:
            :data:`True` if the name matches `test_case_regex` and, when a test case filter
            is configured, is also listed in it.
        """
        match = bool(re.match(test_case_regex, test_case_name))
        if self._test_cases_to_run:
            return match and test_case_name in self._test_cases_to_run

        return match

    def _run_test_case(
        self, test_case_method: MethodType, test_case_result: TestCaseResult
    ) -> None:
        """Setup, execute and teardown a test case in this suite.

        Record the result of the setup and the teardown and handle failures.

        Args:
            test_case_method: The test case to run.
            test_case_result: The result object the outcome is recorded in.
        """
        test_case_name = test_case_method.__name__

        try:
            # run set_up function for each case
            self.set_up_test_case()
            test_case_result.update_setup(Result.PASS)
        except SSHTimeoutError as e:
            self._logger.exception(f"Test case setup FAILED: {test_case_name}")
            test_case_result.update_setup(Result.FAIL, e)
        except Exception as e:
            self._logger.exception(f"Test case setup ERROR: {test_case_name}")
            test_case_result.update_setup(Result.ERROR, e)

        else:
            # run test case if setup was successful
            self._execute_test_case(test_case_method, test_case_result)

        finally:
            # The teardown is attempted even if the setup failed.
            try:
                self.tear_down_test_case()
                test_case_result.update_teardown(Result.PASS)
            except Exception as e:
                self._logger.exception(f"Test case teardown ERROR: {test_case_name}")
                self._logger.warning(
                    f"Test case '{test_case_name}' teardown failed, "
                    f"the next test case may be affected."
                )
                test_case_result.update_teardown(Result.ERROR, e)
                # A failed teardown also turns the result of the test case itself into an error.
                test_case_result.update(Result.ERROR)

    def _execute_test_case(
        self, test_case_method: MethodType, test_case_result: TestCaseResult
    ) -> None:
        """Execute one test case, record the result and handle failures.

        Args:
            test_case_method: The test case to execute.
            test_case_result: The result object the outcome is recorded in.

        Raises:
            KeyboardInterrupt: The execution was interrupted by the user.
        """
        test_case_name = test_case_method.__name__
        try:
            self._logger.info(f"Starting test case execution: {test_case_name}")
            test_case_method()
            test_case_result.update(Result.PASS)
            self._logger.info(f"Test case execution PASSED: {test_case_name}")

        except TestCaseVerifyError as e:
            # A failed verification is a test FAIL, any other exception is an ERROR.
            self._logger.exception(f"Test case execution FAILED: {test_case_name}")
            test_case_result.update(Result.FAIL, e)
        except Exception as e:
            self._logger.exception(f"Test case execution ERROR: {test_case_name}")
            test_case_result.update(Result.ERROR, e)
        except KeyboardInterrupt:
            # KeyboardInterrupt is not an Exception subclass, so it is handled separately.
            self._logger.error(f"Test case execution INTERRUPTED by user: {test_case_name}")
            test_case_result.update(Result.SKIP)
            raise KeyboardInterrupt("Stop DTS")
532
533
def get_test_suites(testsuite_module_path: str) -> list[type[TestSuite]]:
    r"""Find all :class:`TestSuite`\s in a Python module.

    The module is imported and every member that is a strict subclass
    of :class:`TestSuite` is collected.

    Args:
        testsuite_module_path: The path to the Python module.

    Returns:
        The list of :class:`TestSuite`\s found within the Python module.

    Raises:
        ConfigurationError: The test suite module was not found.
    """

    def is_test_suite(candidate: Any) -> bool:
        # issubclass() raises TypeError for module members that are not classes.
        try:
            is_subclass = issubclass(candidate, TestSuite)
        except TypeError:
            return False
        # The base class itself contains no test cases, so it is left out.
        return is_subclass and candidate is not TestSuite

    try:
        suite_module = importlib.import_module(testsuite_module_path)
    except ModuleNotFoundError as e:
        raise ConfigurationError(f"Test suite '{testsuite_module_path}' not found.") from e

    found_suites = inspect.getmembers(suite_module, is_test_suite)
    return [suite_class for _, suite_class in found_suites]
563