xref: /dpdk/dts/framework/test_suite.py (revision 7917b0d38e92e8b9ec5a870415b791420e10f11a)
1# SPDX-License-Identifier: BSD-3-Clause
2# Copyright(c) 2010-2014 Intel Corporation
3# Copyright(c) 2023 PANTHEON.tech s.r.o.
4
5"""Features common to all test suites.
6
7The module defines the :class:`TestSuite` class which doesn't contain any test cases, and as such
8must be extended by subclasses which add test cases. The :class:`TestSuite` contains the basics
9needed by subclasses:
10
11    * Testbed (SUT, TG) configuration,
12    * Packet sending and verification,
13    * Test case verification.
14"""
15
16from ipaddress import IPv4Interface, IPv6Interface, ip_interface
17from typing import ClassVar, Union
18
19from scapy.layers.inet import IP  # type: ignore[import-untyped]
20from scapy.layers.l2 import Ether  # type: ignore[import-untyped]
21from scapy.packet import Packet, Padding  # type: ignore[import-untyped]
22
23from framework.testbed_model.port import Port, PortLink
24from framework.testbed_model.sut_node import SutNode
25from framework.testbed_model.tg_node import TGNode
26from framework.testbed_model.traffic_generator.capturing_traffic_generator import (
27    PacketFilteringConfig,
28)
29
30from .exception import TestCaseVerifyError
31from .logger import DTSLogger, get_dts_logger
32from .utils import get_packet_summaries
33
34
class TestSuite:
    """The base class with building blocks needed by most test cases.

        * Test suite setup/cleanup methods to override,
        * Test case setup/cleanup methods to override,
        * Test case verification,
        * Testbed configuration,
        * Traffic sending and verification.

    Test cases are implemented by subclasses. Test cases are all methods starting with ``test_``,
    further divided into performance test cases (starting with ``test_perf_``)
    and functional test cases (all other test cases).

    By default, all test cases will be executed. A list of test case names may be specified
    in the YAML test run configuration file and in the :option:`--test-suite` command line argument
    or in the :envvar:`DTS_TESTCASES` environment variable to filter which test cases to run.
    The union of both lists will be used. Any unknown test cases from the latter lists
    will be silently ignored.

    The methods named ``[set_up|tear_down]_[suite|test_case]`` should be overridden in subclasses
    if the appropriate test suite/test case fixtures are needed.

    The test suite is aware of the testbed (the SUT and TG) it's running on. From this, it can
    properly choose the IP addresses and other configuration that must be tailored to the testbed.

    Attributes:
        sut_node: The SUT node where the test suite is running.
        tg_node: The TG node where the test suite is running.
    """

    sut_node: SutNode
    tg_node: TGNode
    #: Whether the test suite is blocking. A failure of a blocking test suite
    #: will block the execution of all subsequent test suites in the current build target.
    is_blocking: ClassVar[bool] = False
    # Logger obtained in ``__init__`` using the name of the concrete test suite class.
    _logger: DTSLogger
    # SUT <-> TG port pairs collected by ``_process_links``.
    _port_links: list[PortLink]
    # SUT side of the first link in ``_port_links``.
    _sut_port_ingress: Port
    # SUT side of the second link in ``_port_links``.
    _sut_port_egress: Port
    # Default address for the SUT ingress port (set in ``__init__``).
    _sut_ip_address_ingress: Union[IPv4Interface, IPv6Interface]
    # Default address for the SUT egress port (set in ``__init__``).
    _sut_ip_address_egress: Union[IPv4Interface, IPv6Interface]
    # TG side of the second link in ``_port_links``.
    _tg_port_ingress: Port
    # TG side of the first link in ``_port_links``.
    _tg_port_egress: Port
    # Default address for the TG ingress port (set in ``__init__``).
    _tg_ip_address_ingress: Union[IPv4Interface, IPv6Interface]
    # Default address for the TG egress port (set in ``__init__``).
    _tg_ip_address_egress: Union[IPv4Interface, IPv6Interface]
80
81    def __init__(
82        self,
83        sut_node: SutNode,
84        tg_node: TGNode,
85    ):
86        """Initialize the test suite testbed information and basic configuration.
87
88        Find links between ports and set up default IP addresses to be used when
89        configuring them.
90
91        Args:
92            sut_node: The SUT node where the test suite will run.
93            tg_node: The TG node where the test suite will run.
94        """
95        self.sut_node = sut_node
96        self.tg_node = tg_node
97        self._logger = get_dts_logger(self.__class__.__name__)
98        self._port_links = []
99        self._process_links()
100        self._sut_port_ingress, self._tg_port_egress = (
101            self._port_links[0].sut_port,
102            self._port_links[0].tg_port,
103        )
104        self._sut_port_egress, self._tg_port_ingress = (
105            self._port_links[1].sut_port,
106            self._port_links[1].tg_port,
107        )
108        self._sut_ip_address_ingress = ip_interface("192.168.100.2/24")
109        self._sut_ip_address_egress = ip_interface("192.168.101.2/24")
110        self._tg_ip_address_egress = ip_interface("192.168.100.3/24")
111        self._tg_ip_address_ingress = ip_interface("192.168.101.3/24")
112
113    def _process_links(self) -> None:
114        """Construct links between SUT and TG ports."""
115        for sut_port in self.sut_node.ports:
116            for tg_port in self.tg_node.ports:
117                if (sut_port.identifier, sut_port.peer) == (
118                    tg_port.peer,
119                    tg_port.identifier,
120                ):
121                    self._port_links.append(PortLink(sut_port=sut_port, tg_port=tg_port))
122
    def set_up_suite(self) -> None:
        """Set up test fixtures common to all test cases.

        This is done before any test case has been run. The base implementation does nothing;
        subclasses override it when suite-level fixtures are needed.
        """
128
    def tear_down_suite(self) -> None:
        """Tear down the previously created test fixtures common to all test cases.

        This is done after all test cases have been run. The base implementation does nothing;
        subclasses override it when suite-level fixtures need cleaning up.
        """
134
    def set_up_test_case(self) -> None:
        """Set up test fixtures before each test case.

        This is done before *each* test case. The base implementation does nothing;
        subclasses override it when per-test-case fixtures are needed.
        """
140
    def tear_down_test_case(self) -> None:
        """Tear down the previously created test fixtures after each test case.

        This is done after *each* test case. The base implementation does nothing;
        subclasses override it when per-test-case fixtures need cleaning up.
        """
146
147    def configure_testbed_ipv4(self, restore: bool = False) -> None:
148        """Configure IPv4 addresses on all testbed ports.
149
150        The configured ports are:
151
152        * SUT ingress port,
153        * SUT egress port,
154        * TG ingress port,
155        * TG egress port.
156
157        Args:
158            restore: If :data:`True`, will remove the configuration instead.
159        """
160        delete = True if restore else False
161        enable = False if restore else True
162        self._configure_ipv4_forwarding(enable)
163        self.sut_node.configure_port_ip_address(
164            self._sut_ip_address_egress, self._sut_port_egress, delete
165        )
166        self.sut_node.configure_port_state(self._sut_port_egress, enable)
167        self.sut_node.configure_port_ip_address(
168            self._sut_ip_address_ingress, self._sut_port_ingress, delete
169        )
170        self.sut_node.configure_port_state(self._sut_port_ingress, enable)
171        self.tg_node.configure_port_ip_address(
172            self._tg_ip_address_ingress, self._tg_port_ingress, delete
173        )
174        self.tg_node.configure_port_state(self._tg_port_ingress, enable)
175        self.tg_node.configure_port_ip_address(
176            self._tg_ip_address_egress, self._tg_port_egress, delete
177        )
178        self.tg_node.configure_port_state(self._tg_port_egress, enable)
179
    def _configure_ipv4_forwarding(self, enable: bool) -> None:
        """Forward the IPv4 forwarding setting to the SUT node.

        Args:
            enable: Passed unchanged to the SUT node's ``configure_ipv4_forwarding``.
        """
        self.sut_node.configure_ipv4_forwarding(enable)
182
183    def send_packet_and_capture(
184        self,
185        packet: Packet,
186        filter_config: PacketFilteringConfig = PacketFilteringConfig(),
187        duration: float = 1,
188    ) -> list[Packet]:
189        """Send and receive `packet` using the associated TG.
190
191        Send `packet` through the appropriate interface and receive on the appropriate interface.
192        Modify the packet with l3/l2 addresses corresponding to the testbed and desired traffic.
193
194        Args:
195            packet: The packet to send.
196            filter_config: The filter to use when capturing packets.
197            duration: Capture traffic for this amount of time after sending `packet`.
198
199        Returns:
200            A list of received packets.
201        """
202        packet = self._adjust_addresses(packet)
203        return self.tg_node.send_packet_and_capture(
204            packet,
205            self._tg_port_egress,
206            self._tg_port_ingress,
207            filter_config,
208            duration,
209        )
210
211    def get_expected_packet(self, packet: Packet) -> Packet:
212        """Inject the proper L2/L3 addresses into `packet`.
213
214        Args:
215            packet: The packet to modify.
216
217        Returns:
218            `packet` with injected L2/L3 addresses.
219        """
220        return self._adjust_addresses(packet, expected=True)
221
222    def _adjust_addresses(self, packet: Packet, expected: bool = False) -> Packet:
223        """L2 and L3 address additions in both directions.
224
225        Assumptions:
226            Two links between SUT and TG, one link is TG -> SUT, the other SUT -> TG.
227
228        Args:
229            packet: The packet to modify.
230            expected: If :data:`True`, the direction is SUT -> TG,
231                otherwise the direction is TG -> SUT.
232        """
233        if expected:
234            # The packet enters the TG from SUT
235            # update l2 addresses
236            packet.src = self._sut_port_egress.mac_address
237            packet.dst = self._tg_port_ingress.mac_address
238
239            # The packet is routed from TG egress to TG ingress
240            # update l3 addresses
241            packet.payload.src = self._tg_ip_address_egress.ip.exploded
242            packet.payload.dst = self._tg_ip_address_ingress.ip.exploded
243        else:
244            # The packet leaves TG towards SUT
245            # update l2 addresses
246            packet.src = self._tg_port_egress.mac_address
247            packet.dst = self._sut_port_ingress.mac_address
248
249            # The packet is routed from TG egress to TG ingress
250            # update l3 addresses
251            packet.payload.src = self._tg_ip_address_egress.ip.exploded
252            packet.payload.dst = self._tg_ip_address_ingress.ip.exploded
253
254        return Ether(packet.build())
255
256    def verify(self, condition: bool, failure_description: str) -> None:
257        """Verify `condition` and handle failures.
258
259        When `condition` is :data:`False`, raise an exception and log the last 10 commands
260        executed on both the SUT and TG.
261
262        Args:
263            condition: The condition to check.
264            failure_description: A short description of the failure
265                that will be stored in the raised exception.
266
267        Raises:
268            TestCaseVerifyError: `condition` is :data:`False`.
269        """
270        if not condition:
271            self._fail_test_case_verify(failure_description)
272
273    def _fail_test_case_verify(self, failure_description: str) -> None:
274        self._logger.debug("A test case failed, showing the last 10 commands executed on SUT:")
275        for command_res in self.sut_node.main_session.remote_session.history[-10:]:
276            self._logger.debug(command_res.command)
277        self._logger.debug("A test case failed, showing the last 10 commands executed on TG:")
278        for command_res in self.tg_node.main_session.remote_session.history[-10:]:
279            self._logger.debug(command_res.command)
280        raise TestCaseVerifyError(failure_description)
281
282    def verify_packets(self, expected_packet: Packet, received_packets: list[Packet]) -> None:
283        """Verify that `expected_packet` has been received.
284
285        Go through `received_packets` and check that `expected_packet` is among them.
286        If not, raise an exception and log the last 10 commands
287        executed on both the SUT and TG.
288
289        Args:
290            expected_packet: The packet we're expecting to receive.
291            received_packets: The packets where we're looking for `expected_packet`.
292
293        Raises:
294            TestCaseVerifyError: `expected_packet` is not among `received_packets`.
295        """
296        for received_packet in received_packets:
297            if self._compare_packets(expected_packet, received_packet):
298                break
299        else:
300            self._logger.debug(
301                f"The expected packet {get_packet_summaries(expected_packet)} "
302                f"not found among received {get_packet_summaries(received_packets)}"
303            )
304            self._fail_test_case_verify("An expected packet not found among received packets.")
305
306    def _compare_packets(self, expected_packet: Packet, received_packet: Packet) -> bool:
307        self._logger.debug(
308            f"Comparing packets: \n{expected_packet.summary()}\n{received_packet.summary()}"
309        )
310
311        l3 = IP in expected_packet.layers()
312        self._logger.debug("Found l3 layer")
313
314        received_payload = received_packet
315        expected_payload = expected_packet
316        while received_payload and expected_payload:
317            self._logger.debug("Comparing payloads:")
318            self._logger.debug(f"Received: {received_payload}")
319            self._logger.debug(f"Expected: {expected_payload}")
320            if received_payload.__class__ == expected_payload.__class__:
321                self._logger.debug("The layers are the same.")
322                if received_payload.__class__ == Ether:
323                    if not self._verify_l2_frame(received_payload, l3):
324                        return False
325                elif received_payload.__class__ == IP:
326                    if not self._verify_l3_packet(received_payload, expected_payload):
327                        return False
328            else:
329                # Different layers => different packets
330                return False
331            received_payload = received_payload.payload
332            expected_payload = expected_payload.payload
333
334        if expected_payload:
335            self._logger.debug(f"The expected packet did not contain {expected_payload}.")
336            return False
337        if received_payload and received_payload.__class__ != Padding:
338            self._logger.debug("The received payload had extra layers which were not padding.")
339            return False
340        return True
341
342    def _verify_l2_frame(self, received_packet: Ether, l3: bool) -> bool:
343        self._logger.debug("Looking at the Ether layer.")
344        self._logger.debug(
345            f"Comparing received dst mac '{received_packet.dst}' "
346            f"with expected '{self._tg_port_ingress.mac_address}'."
347        )
348        if received_packet.dst != self._tg_port_ingress.mac_address:
349            return False
350
351        expected_src_mac = self._tg_port_egress.mac_address
352        if l3:
353            expected_src_mac = self._sut_port_egress.mac_address
354        self._logger.debug(
355            f"Comparing received src mac '{received_packet.src}' "
356            f"with expected '{expected_src_mac}'."
357        )
358        if received_packet.src != expected_src_mac:
359            return False
360
361        return True
362
363    def _verify_l3_packet(self, received_packet: IP, expected_packet: IP) -> bool:
364        self._logger.debug("Looking at the IP layer.")
365        if received_packet.src != expected_packet.src or received_packet.dst != expected_packet.dst:
366            return False
367        return True
368