xref: /dpdk/dts/framework/testbed_model/traffic_generator/scapy.py (revision e9fd1ebf981f361844aea9ec94e17f4bda5e1479)
1# SPDX-License-Identifier: BSD-3-Clause
2# Copyright(c) 2022 University of New Hampshire
3# Copyright(c) 2023 PANTHEON.tech s.r.o.
4
5"""The Scapy traffic generator.
6
7A traffic generator used for functional testing, implemented with
8`the Scapy library <https://scapy.readthedocs.io/en/latest/>`_.
9The traffic generator uses an XML-RPC server to run Scapy on the remote TG node.
10
11The traffic generator uses the :mod:`xmlrpc.server` module to run an XML-RPC server
12in an interactive remote Python SSH session. The communication with the server is facilitated
13with a local server proxy from the :mod:`xmlrpc.client` module.
14"""
15
16import inspect
17import marshal
18import time
19import types
20import xmlrpc.client
21from xmlrpc.server import SimpleXMLRPCServer
22
23import scapy.all  # type: ignore[import]
24from scapy.layers.l2 import Ether  # type: ignore[import]
25from scapy.packet import Packet  # type: ignore[import]
26
27from framework.config import OS, ScapyTrafficGeneratorConfig
28from framework.remote_session import PythonShell
29from framework.settings import SETTINGS
30from framework.testbed_model.node import Node
31from framework.testbed_model.port import Port
32
33from .capturing_traffic_generator import (
34    CapturingTrafficGenerator,
35    _get_default_capture_name,
36)
37
38"""
39========= BEGIN RPC FUNCTIONS =========
40
41All of the functions in this section are intended to be exported to a python
42shell which runs a scapy RPC server. These functions are made available via that
43RPC server to the packet generator. To add a new function to the RPC server,
44first write the function in this section. Then, if you need any imports, make sure to
45add them to SCAPY_RPC_SERVER_IMPORTS as well. After that, add the function to the list
46in EXPORTED_FUNCTIONS. Note that kwargs (keyword arguments) do not work via xmlrpc,
47so you may need to construct wrapper functions around many scapy types.
48"""
49
50"""
51Add the line needed to import something in a normal python environment
52as an entry to this array. It will be imported before any functions are
53sent to the server.
54"""
55SCAPY_RPC_SERVER_IMPORTS = [
56    "from scapy.all import *",
57    "import xmlrpc",
58    "import sys",
59    "from xmlrpc.server import SimpleXMLRPCServer",
60    "import marshal",
61    "import pickle",
62    "import types",
63    "import time",
64]
65
66
67def scapy_send_packets_and_capture(
68    xmlrpc_packets: list[xmlrpc.client.Binary],
69    send_iface: str,
70    recv_iface: str,
71    duration: float,
72) -> list[bytes]:
73    """The RPC function to send and capture packets.
74
75    This function is meant to be executed on the remote TG node via the server proxy.
76
77    Args:
78        xmlrpc_packets: The packets to send. These need to be converted to
79            :class:`~xmlrpc.client.Binary` objects before sending to the remote server.
80        send_iface: The logical name of the egress interface.
81        recv_iface: The logical name of the ingress interface.
82        duration: Capture for this amount of time, in seconds.
83
84    Returns:
85        A list of bytes. Each item in the list represents one packet, which needs
86        to be converted back upon transfer from the remote node.
87    """
88    scapy_packets = [scapy.all.Packet(packet.data) for packet in xmlrpc_packets]
89    sniffer = scapy.all.AsyncSniffer(
90        iface=recv_iface,
91        store=True,
92        started_callback=lambda *args: scapy.all.sendp(scapy_packets, iface=send_iface),
93    )
94    sniffer.start()
95    time.sleep(duration)
96    return [scapy_packet.build() for scapy_packet in sniffer.stop(join=True)]
97
98
99def scapy_send_packets(xmlrpc_packets: list[xmlrpc.client.Binary], send_iface: str) -> None:
100    """The RPC function to send packets.
101
102    This function is meant to be executed on the remote TG node via the server proxy.
103    It only sends `xmlrpc_packets`, without capturing them.
104
105    Args:
106        xmlrpc_packets: The packets to send. These need to be converted to
107            :class:`~xmlrpc.client.Binary` objects before sending to the remote server.
108        send_iface: The logical name of the egress interface.
109    """
110    scapy_packets = [scapy.all.Packet(packet.data) for packet in xmlrpc_packets]
111    scapy.all.sendp(scapy_packets, iface=send_iface, realtime=True, verbose=True)
112
113
114"""
115Functions to be exposed by the scapy RPC server.
116"""
117RPC_FUNCTIONS = [
118    scapy_send_packets,
119    scapy_send_packets_and_capture,
120]
121
122"""
123========= END RPC FUNCTIONS =========
124"""
125
126
127class QuittableXMLRPCServer(SimpleXMLRPCServer):
128    """Basic XML-RPC server.
129
130    The server may be augmented by functions serializable by the :mod:`marshal` module.
131    """
132
133    def __init__(self, *args, **kwargs):
134        """Extend the XML-RPC server initialization.
135
136        Args:
137            args: The positional arguments that will be passed to the superclass's constructor.
138            kwargs: The keyword arguments that will be passed to the superclass's constructor.
139                The `allow_none` argument will be set to :data:`True`.
140        """
141        kwargs["allow_none"] = True
142        super().__init__(*args, **kwargs)
143        self.register_introspection_functions()
144        self.register_function(self.quit)
145        self.register_function(self.add_rpc_function)
146
147    def quit(self) -> None:
148        """Quit the server."""
149        self._BaseServer__shutdown_request = True
150        return None
151
152    def add_rpc_function(self, name: str, function_bytes: xmlrpc.client.Binary) -> None:
153        """Add a function to the server from the local server proxy.
154
155        Args:
156              name: The name of the function.
157              function_bytes: The code of the function.
158        """
159        function_code = marshal.loads(function_bytes.data)
160        function = types.FunctionType(function_code, globals(), name)
161        self.register_function(function)
162
163    def serve_forever(self, poll_interval: float = 0.5) -> None:
164        """Extend the superclass method with an additional print.
165
166        Once executed in the local server proxy, the print gives us a clear string to expect
167        when starting the server. The print means this function was executed on the XML-RPC server.
168        """
169        print("XMLRPC OK")
170        super().serve_forever(poll_interval)
171
172
173class ScapyTrafficGenerator(CapturingTrafficGenerator):
174    """Provides access to scapy functions via an RPC interface.
175
176    This class extends the base with remote execution of scapy functions.
177
178    Any packets sent to the remote server are first converted to bytes. They are received as
179    :class:`~xmlrpc.client.Binary` objects on the server side. When the server sends the packets
180    back, they are also received as :class:`~xmlrpc.client.Binary` objects on the client side, are
181    converted back to :class:`~scapy.packet.Packet` objects and only then returned from the methods.
182
183    Attributes:
184        session: The exclusive interactive remote session created by the Scapy
185            traffic generator where the XML-RPC server runs.
186        rpc_server_proxy: The object used by clients to execute functions
187            on the XML-RPC server.
188    """
189
190    session: PythonShell
191    rpc_server_proxy: xmlrpc.client.ServerProxy
192    _config: ScapyTrafficGeneratorConfig
193
194    def __init__(self, tg_node: Node, config: ScapyTrafficGeneratorConfig):
195        """Extend the constructor with Scapy TG specifics.
196
197        The traffic generator first starts an XML-RPC on the remote `tg_node`.
198        Then it populates the server with functions which use the Scapy library
199        to send/receive traffic:
200
201            * :func:`scapy_send_packets_and_capture`
202            * :func:`scapy_send_packets`
203
204        To enable verbose logging from the xmlrpc client, use the :option:`--verbose`
205        command line argument or the :envvar:`DTS_VERBOSE` environment variable.
206
207        Args:
208            tg_node: The node where the traffic generator resides.
209            config: The traffic generator's test run configuration.
210        """
211        super().__init__(tg_node, config)
212
213        assert (
214            self._tg_node.config.os == OS.linux
215        ), "Linux is the only supported OS for scapy traffic generation"
216
217        self.session = self._tg_node.create_interactive_shell(
218            PythonShell, timeout=5, privileged=True
219        )
220
221        # import libs in remote python console
222        for import_statement in SCAPY_RPC_SERVER_IMPORTS:
223            self.session.send_command(import_statement)
224
225        # start the server
226        xmlrpc_server_listen_port = 8000
227        self._start_xmlrpc_server_in_remote_python(xmlrpc_server_listen_port)
228
229        # connect to the server
230        server_url = f"http://{self._tg_node.config.hostname}:{xmlrpc_server_listen_port}"
231        self.rpc_server_proxy = xmlrpc.client.ServerProxy(
232            server_url, allow_none=True, verbose=SETTINGS.verbose
233        )
234
235        # add functions to the server
236        for function in RPC_FUNCTIONS:
237            # A slightly hacky way to move a function to the remote server.
238            # It is constructed from the name and code on the other side.
239            # Pickle cannot handle functions, nor can any of the other serialization
240            # frameworks aside from the libraries used to generate pyc files, which
241            # are even more messy to work with.
242            function_bytes = marshal.dumps(function.__code__)
243            self.rpc_server_proxy.add_rpc_function(function.__name__, function_bytes)
244
245    def _start_xmlrpc_server_in_remote_python(self, listen_port: int) -> None:
246        # load the source of the function
247        src = inspect.getsource(QuittableXMLRPCServer)
248        # Lines with only whitespace break the repl if in the middle of a function
249        # or class, so strip all lines containing only whitespace
250        src = "\n".join([line for line in src.splitlines() if not line.isspace() and line != ""])
251
252        # execute it in the python terminal
253        self.session.send_command(src + "\n")
254        self.session.send_command(
255            f"server = QuittableXMLRPCServer(('0.0.0.0', {listen_port}));server.serve_forever()",
256            "XMLRPC OK",
257        )
258
259    def _send_packets(self, packets: list[Packet], port: Port) -> None:
260        packets = [packet.build() for packet in packets]
261        self.rpc_server_proxy.scapy_send_packets(packets, port.logical_name)
262
263    def _send_packets_and_capture(
264        self,
265        packets: list[Packet],
266        send_port: Port,
267        receive_port: Port,
268        duration: float,
269        capture_name: str = _get_default_capture_name(),
270    ) -> list[Packet]:
271        binary_packets = [packet.build() for packet in packets]
272
273        xmlrpc_packets: list[
274            xmlrpc.client.Binary
275        ] = self.rpc_server_proxy.scapy_send_packets_and_capture(
276            binary_packets,
277            send_port.logical_name,
278            receive_port.logical_name,
279            duration,
280        )  # type: ignore[assignment]
281
282        scapy_packets = [Ether(packet.data) for packet in xmlrpc_packets]
283        return scapy_packets
284
285    def close(self) -> None:
286        """Close the traffic generator."""
287        try:
288            self.rpc_server_proxy.quit()
289        except ConnectionRefusedError:
290            # Because the python instance closes, we get no RPC response.
291            # Thus, this error is expected
292            pass
293        self.session.close()
294