xref: /dpdk/dts/framework/testbed_model/traffic_generator/scapy.py (revision 1f01c07d3dab801c70b034c5b22da6212b3466be)
1# SPDX-License-Identifier: BSD-3-Clause
2# Copyright(c) 2022 University of New Hampshire
3# Copyright(c) 2023 PANTHEON.tech s.r.o.
4
5"""The Scapy traffic generator.
6
7A traffic generator used for functional testing, implemented with
8`the Scapy library <https://scapy.readthedocs.io/en/latest/>`_.
9The traffic generator uses an interactive shell to run Scapy on the remote TG node.
10
11The traffic generator extends :class:`framework.remote_session.python_shell.PythonShell` to
12implement the methods for handling packets by sending commands into the interactive shell.
13"""
14
15
16import re
17import time
18from typing import ClassVar
19
20from scapy.compat import base64_bytes  # type: ignore[import-untyped]
21from scapy.layers.l2 import Ether  # type: ignore[import-untyped]
22from scapy.packet import Packet  # type: ignore[import-untyped]
23
24from framework.config import OS, ScapyTrafficGeneratorConfig
25from framework.remote_session.python_shell import PythonShell
26from framework.testbed_model.node import Node
27from framework.testbed_model.port import Port
28from framework.testbed_model.traffic_generator.capturing_traffic_generator import (
29    PacketFilteringConfig,
30)
31from framework.utils import REGEX_FOR_BASE64_ENCODING
32
33from .capturing_traffic_generator import CapturingTrafficGenerator
34
35
36class ScapyTrafficGenerator(PythonShell, CapturingTrafficGenerator):
37    """Provides access to scapy functions on a traffic generator node.
38
39    This class extends the base with remote execution of scapy functions. All methods for
40    processing packets are implemented using an underlying
41    :class:`framework.remote_session.python_shell.PythonShell` which imports the Scapy library. This
42    class also extends :class:`.capturing_traffic_generator.CapturingTrafficGenerator` to expose
43    methods that utilize said packet processing functionality to test suites.
44
45    Because of the double inheritance, this class has both methods that wrap scapy commands
46    sent into the shell (running on the TG node) and methods that run locally to fulfill
47    traffic generation needs.
48    To help make a clear distinction between the two, the names of the methods
49    that wrap the logic of the underlying shell should be prepended with "shell".
50
51    Note that the order of inheritance is important for this class. In order to instantiate this
52    class, the abstract methods of :class:`~.capturing_traffic_generator.CapturingTrafficGenerator`
53    must be implemented. Since some of these methods are implemented in the underlying interactive
54    shell, according to Python's Method Resolution Order (MRO), the interactive shell must come
55    first.
56    """
57
58    _config: ScapyTrafficGeneratorConfig
59
60    #: Name of sniffer to ensure the same is used in all places
61    _sniffer_name: ClassVar[str] = "sniffer"
62    #: Name of variable that points to the list of packets inside the scapy shell.
63    _send_packet_list_name: ClassVar[str] = "packets"
64    #: Padding to add to the start of a line for python syntax compliance.
65    _python_indentation: ClassVar[str] = " " * 4
66
67    def __init__(self, tg_node: Node, config: ScapyTrafficGeneratorConfig, **kwargs):
68        """Extend the constructor with Scapy TG specifics.
69
70        Initializes both the traffic generator and the interactive shell used to handle Scapy
71        functions. The interactive shell will be started on `tg_node`. The additional keyword
72        arguments in `kwargs` are used to pass into the constructor for the interactive shell.
73
74        Args:
75            tg_node: The node where the traffic generator resides.
76            config: The traffic generator's test run configuration.
77            kwargs: Additional keyword arguments. Supported arguments correspond to the parameters
78                of :meth:`PythonShell.__init__` in this case.
79        """
80        assert (
81            tg_node.config.os == OS.linux
82        ), "Linux is the only supported OS for scapy traffic generation"
83
84        super().__init__(tg_node, config=config, **kwargs)
85        self.start_application()
86
87    def start_application(self) -> None:
88        """Extends :meth:`framework.remote_session.interactive_shell.start_application`.
89
90        Adds a command that imports everything from the scapy library immediately after starting
91        the shell for usage in later calls to the methods of this class.
92        """
93        super().start_application()
94        self.send_command("from scapy.all import *")
95
96    def _send_packets(self, packets: list[Packet], port: Port) -> None:
97        """Implementation for sending packets without capturing any received traffic.
98
99        Provides a "fire and forget" method of sending packets.
100        """
101        self._shell_set_packet_list(packets)
102        send_command = [
103            "sendp(",
104            f"{self._send_packet_list_name},",
105            f"iface='{port.logical_name}',",
106            "realtime=True,",
107            "verbose=True",
108            ")",
109        ]
110        self.send_command(f"\n{self._python_indentation}".join(send_command))
111
112    def _send_packets_and_capture(
113        self,
114        packets: list[Packet],
115        send_port: Port,
116        recv_port: Port,
117        filter_config: PacketFilteringConfig,
118        duration: float,
119    ) -> list[Packet]:
120        """Implementation for sending packets and capturing any received traffic.
121
122        This method first creates an asynchronous sniffer that holds the packets to send, then
123        starts and stops said sniffer, collecting any packets that it had received while it was
124        running.
125
126        Returns:
127            A list of packets received after sending `packets`.
128        """
129        self._shell_create_sniffer(
130            packets, send_port, recv_port, self._create_packet_filter(filter_config)
131        )
132        return self._shell_start_and_stop_sniffing(duration)
133
134    def _shell_set_packet_list(self, packets: list[Packet]) -> None:
135        """Build a list of packets to send later.
136
137        Sends the string that represents the Python command that was used to create each packet in
138        `packets` into the underlying Python session. The purpose behind doing this is to create a
139        list that is identical to `packets` inside the shell. This method should only be called by
140        methods for sending packets immediately prior to sending. The list of packets will continue
141        to exist in the scope of the shell until subsequent calls to this method, so failure to
142        rebuild the list prior to sending packets could lead to undesired "stale" packets to be
143        sent.
144
145        Args:
146            packets: The list of packets to recreate in the shell.
147        """
148        self._logger.info("Building a list of packets to send.")
149        self.send_command(
150            f"{self._send_packet_list_name} = [{', '.join(map(Packet.command, packets))}]"
151        )
152
153    def _create_packet_filter(self, filter_config: PacketFilteringConfig) -> str:
154        """Combine filter settings from `filter_config` into a BPF that scapy can use.
155
156        Scapy allows for the use of Berkeley Packet Filters (BPFs) to filter what packets are
157        collected based on various attributes of the packet.
158
159        Args:
160            filter_config: Config class that specifies which filters should be applied.
161
162        Returns:
163            A string representing the combination of BPF filters to be passed to scapy. For
164            example:
165
166            "ether[12:2] != 0x88cc && ether[12:2] != 0x0806"
167        """
168        bpf_filter = []
169        if filter_config.no_arp:
170            bpf_filter.append("ether[12:2] != 0x0806")
171        if filter_config.no_lldp:
172            bpf_filter.append("ether[12:2] != 0x88cc")
173        return " && ".join(bpf_filter)
174
175    def _shell_create_sniffer(
176        self, packets_to_send: list[Packet], send_port: Port, recv_port: Port, filter_config: str
177    ) -> None:
178        """Create an asynchronous sniffer in the shell.
179
180        A list of packets is passed to the sniffer's callback function so that they are immediately
181        sent at the time sniffing is started.
182
183        Args:
184            packets_to_send: A list of packets to send when sniffing is started.
185            send_port: The port to send the packets on when sniffing is started.
186            recv_port: The port to collect the traffic from.
187            filter_config: An optional BPF format filter to use when sniffing for packets. Omitted
188                when set to an empty string.
189        """
190        self._shell_set_packet_list(packets_to_send)
191
192        self.send_command("import time")
193        sniffer_commands = [
194            f"{self._sniffer_name} = AsyncSniffer(",
195            f"iface='{recv_port.logical_name}',",
196            "store=True,",
197            # *args is used in the arguments of the lambda since Scapy sends parameters to the
198            # callback function which we do not need for our purposes.
199            "started_callback=lambda *args: (time.sleep(1), sendp(",
200            (
201                # Additional indentation is added to this line only for readability of the logs.
202                f"{self._python_indentation}{self._send_packet_list_name},"
203                f" iface='{send_port.logical_name}')),"
204            ),
205            ")",
206        ]
207        if filter_config:
208            sniffer_commands.insert(-1, f"filter='{filter_config}'")
209
210        self.send_command(f"\n{self._python_indentation}".join(sniffer_commands))
211
212    def _shell_start_and_stop_sniffing(self, duration: float) -> list[Packet]:
213        """Start asynchronous sniffer, run for a set `duration`, then collect received packets.
214
215        This method expects that you have first created an asynchronous sniffer inside the shell
216        and will fail if you haven't. Received packets are collected by printing the base64
217        encoding of each packet in the shell and then harvesting these encodings using regex to
218        convert back into packet objects.
219
220        Args:
221            duration: The amount of time in seconds to sniff for received packets.
222
223        Returns:
224            A list of all packets that were received while the sniffer was running.
225        """
226        sniffed_packets_name = "gathered_packets"
227        self.send_command(f"{self._sniffer_name}.start()")
228        # Insert a one second delay to prevent timeout errors from occurring
229        time.sleep(duration + 1)
230        self.send_command(f"{sniffed_packets_name} = {self._sniffer_name}.stop(join=True)")
231        # An extra newline is required here due to the nature of interactive Python shells
232        packet_strs = self.send_command(
233            f"for pakt in {sniffed_packets_name}: print(bytes_base64(pakt.build()))\n"
234        )
235        # In the string of bytes "b'XXXX'", we only want the contents ("XXXX")
236        list_of_packets_base64 = re.findall(
237            f"^b'({REGEX_FOR_BASE64_ENCODING})'", packet_strs, re.MULTILINE
238        )
239        return [Ether(base64_bytes(pakt)) for pakt in list_of_packets_base64]
240