1# SPDX-License-Identifier: BSD-3-Clause 2# Copyright(c) 2022 University of New Hampshire 3# Copyright(c) 2023 PANTHEON.tech s.r.o. 4 5"""The Scapy traffic generator. 6 7A traffic generator used for functional testing, implemented with 8`the Scapy library <https://scapy.readthedocs.io/en/latest/>`_. 9The traffic generator uses an XML-RPC server to run Scapy on the remote TG node. 10 11The traffic generator uses the :mod:`xmlrpc.server` module to run an XML-RPC server 12in an interactive remote Python SSH session. The communication with the server is facilitated 13with a local server proxy from the :mod:`xmlrpc.client` module. 14""" 15 16import inspect 17import marshal 18import time 19import types 20import xmlrpc.client 21from xmlrpc.server import SimpleXMLRPCServer 22 23import scapy.all # type: ignore[import] 24from scapy.layers.l2 import Ether # type: ignore[import] 25from scapy.packet import Packet # type: ignore[import] 26 27from framework.config import OS, ScapyTrafficGeneratorConfig 28from framework.remote_session import PythonShell 29from framework.settings import SETTINGS 30from framework.testbed_model.node import Node 31from framework.testbed_model.port import Port 32 33from .capturing_traffic_generator import ( 34 CapturingTrafficGenerator, 35 _get_default_capture_name, 36) 37 38""" 39========= BEGIN RPC FUNCTIONS ========= 40 41All of the functions in this section are intended to be exported to a python 42shell which runs a scapy RPC server. These functions are made available via that 43RPC server to the packet generator. To add a new function to the RPC server, 44first write the function in this section. Then, if you need any imports, make sure to 45add them to SCAPY_RPC_SERVER_IMPORTS as well. After that, add the function to the list 46in EXPORTED_FUNCTIONS. Note that kwargs (keyword arguments) do not work via xmlrpc, 47so you may need to construct wrapper functions around many scapy types. 48""" 49 50""" 51Add the line needed to import something in a normal python environment 52as an entry to this array. It will be imported before any functions are 53sent to the server. 54""" 55SCAPY_RPC_SERVER_IMPORTS = [ 56 "from scapy.all import *", 57 "import xmlrpc", 58 "import sys", 59 "from xmlrpc.server import SimpleXMLRPCServer", 60 "import marshal", 61 "import pickle", 62 "import types", 63 "import time", 64] 65 66 67def scapy_send_packets_and_capture( 68 xmlrpc_packets: list[xmlrpc.client.Binary], 69 send_iface: str, 70 recv_iface: str, 71 duration: float, 72) -> list[bytes]: 73 """The RPC function to send and capture packets. 74 75 This function is meant to be executed on the remote TG node via the server proxy. 76 77 Args: 78 xmlrpc_packets: The packets to send. These need to be converted to 79 :class:`~xmlrpc.client.Binary` objects before sending to the remote server. 80 send_iface: The logical name of the egress interface. 81 recv_iface: The logical name of the ingress interface. 82 duration: Capture for this amount of time, in seconds. 83 84 Returns: 85 A list of bytes. Each item in the list represents one packet, which needs 86 to be converted back upon transfer from the remote node. 87 """ 88 scapy_packets = [scapy.all.Packet(packet.data) for packet in xmlrpc_packets] 89 sniffer = scapy.all.AsyncSniffer( 90 iface=recv_iface, 91 store=True, 92 started_callback=lambda *args: scapy.all.sendp(scapy_packets, iface=send_iface), 93 ) 94 sniffer.start() 95 time.sleep(duration) 96 return [scapy_packet.build() for scapy_packet in sniffer.stop(join=True)] 97 98 99def scapy_send_packets(xmlrpc_packets: list[xmlrpc.client.Binary], send_iface: str) -> None: 100 """The RPC function to send packets. 101 102 This function is meant to be executed on the remote TG node via the server proxy. 103 It only sends `xmlrpc_packets`, without capturing them. 104 105 Args: 106 xmlrpc_packets: The packets to send. These need to be converted to 107 :class:`~xmlrpc.client.Binary` objects before sending to the remote server. 108 send_iface: The logical name of the egress interface. 109 """ 110 scapy_packets = [scapy.all.Packet(packet.data) for packet in xmlrpc_packets] 111 scapy.all.sendp(scapy_packets, iface=send_iface, realtime=True, verbose=True) 112 113 114""" 115Functions to be exposed by the scapy RPC server. 116""" 117RPC_FUNCTIONS = [ 118 scapy_send_packets, 119 scapy_send_packets_and_capture, 120] 121 122""" 123========= END RPC FUNCTIONS ========= 124""" 125 126 127class QuittableXMLRPCServer(SimpleXMLRPCServer): 128 """Basic XML-RPC server. 129 130 The server may be augmented by functions serializable by the :mod:`marshal` module. 131 """ 132 133 def __init__(self, *args, **kwargs): 134 """Extend the XML-RPC server initialization. 135 136 Args: 137 args: The positional arguments that will be passed to the superclass's constructor. 138 kwargs: The keyword arguments that will be passed to the superclass's constructor. 139 The `allow_none` argument will be set to :data:`True`. 140 """ 141 kwargs["allow_none"] = True 142 super().__init__(*args, **kwargs) 143 self.register_introspection_functions() 144 self.register_function(self.quit) 145 self.register_function(self.add_rpc_function) 146 147 def quit(self) -> None: 148 """Quit the server.""" 149 self._BaseServer__shutdown_request = True 150 return None 151 152 def add_rpc_function(self, name: str, function_bytes: xmlrpc.client.Binary) -> None: 153 """Add a function to the server from the local server proxy. 154 155 Args: 156 name: The name of the function. 157 function_bytes: The code of the function. 158 """ 159 function_code = marshal.loads(function_bytes.data) 160 function = types.FunctionType(function_code, globals(), name) 161 self.register_function(function) 162 163 def serve_forever(self, poll_interval: float = 0.5) -> None: 164 """Extend the superclass method with an additional print. 165 166 Once executed in the local server proxy, the print gives us a clear string to expect 167 when starting the server. The print means this function was executed on the XML-RPC server. 168 """ 169 print("XMLRPC OK") 170 super().serve_forever(poll_interval) 171 172 173class ScapyTrafficGenerator(CapturingTrafficGenerator): 174 """Provides access to scapy functions via an RPC interface. 175 176 This class extends the base with remote execution of scapy functions. 177 178 Any packets sent to the remote server are first converted to bytes. They are received as 179 :class:`~xmlrpc.client.Binary` objects on the server side. When the server sends the packets 180 back, they are also received as :class:`~xmlrpc.client.Binary` objects on the client side, are 181 converted back to :class:`~scapy.packet.Packet` objects and only then returned from the methods. 182 183 Attributes: 184 session: The exclusive interactive remote session created by the Scapy 185 traffic generator where the XML-RPC server runs. 186 rpc_server_proxy: The object used by clients to execute functions 187 on the XML-RPC server. 188 """ 189 190 session: PythonShell 191 rpc_server_proxy: xmlrpc.client.ServerProxy 192 _config: ScapyTrafficGeneratorConfig 193 194 def __init__(self, tg_node: Node, config: ScapyTrafficGeneratorConfig): 195 """Extend the constructor with Scapy TG specifics. 196 197 The traffic generator first starts an XML-RPC on the remote `tg_node`. 198 Then it populates the server with functions which use the Scapy library 199 to send/receive traffic: 200 201 * :func:`scapy_send_packets_and_capture` 202 * :func:`scapy_send_packets` 203 204 To enable verbose logging from the xmlrpc client, use the :option:`--verbose` 205 command line argument or the :envvar:`DTS_VERBOSE` environment variable. 206 207 Args: 208 tg_node: The node where the traffic generator resides. 209 config: The traffic generator's test run configuration. 210 """ 211 super().__init__(tg_node, config) 212 213 assert ( 214 self._tg_node.config.os == OS.linux 215 ), "Linux is the only supported OS for scapy traffic generation" 216 217 self.session = self._tg_node.create_interactive_shell( 218 PythonShell, timeout=5, privileged=True 219 ) 220 221 # import libs in remote python console 222 for import_statement in SCAPY_RPC_SERVER_IMPORTS: 223 self.session.send_command(import_statement) 224 225 # start the server 226 xmlrpc_server_listen_port = 8000 227 self._start_xmlrpc_server_in_remote_python(xmlrpc_server_listen_port) 228 229 # connect to the server 230 server_url = f"http://{self._tg_node.config.hostname}:{xmlrpc_server_listen_port}" 231 self.rpc_server_proxy = xmlrpc.client.ServerProxy( 232 server_url, allow_none=True, verbose=SETTINGS.verbose 233 ) 234 235 # add functions to the server 236 for function in RPC_FUNCTIONS: 237 # A slightly hacky way to move a function to the remote server. 238 # It is constructed from the name and code on the other side. 239 # Pickle cannot handle functions, nor can any of the other serialization 240 # frameworks aside from the libraries used to generate pyc files, which 241 # are even more messy to work with. 242 function_bytes = marshal.dumps(function.__code__) 243 self.rpc_server_proxy.add_rpc_function(function.__name__, function_bytes) 244 245 def _start_xmlrpc_server_in_remote_python(self, listen_port: int) -> None: 246 # load the source of the function 247 src = inspect.getsource(QuittableXMLRPCServer) 248 # Lines with only whitespace break the repl if in the middle of a function 249 # or class, so strip all lines containing only whitespace 250 src = "\n".join([line for line in src.splitlines() if not line.isspace() and line != ""]) 251 252 # execute it in the python terminal 253 self.session.send_command(src + "\n") 254 self.session.send_command( 255 f"server = QuittableXMLRPCServer(('0.0.0.0', {listen_port}));server.serve_forever()", 256 "XMLRPC OK", 257 ) 258 259 def _send_packets(self, packets: list[Packet], port: Port) -> None: 260 packets = [packet.build() for packet in packets] 261 self.rpc_server_proxy.scapy_send_packets(packets, port.logical_name) 262 263 def _send_packets_and_capture( 264 self, 265 packets: list[Packet], 266 send_port: Port, 267 receive_port: Port, 268 duration: float, 269 capture_name: str = _get_default_capture_name(), 270 ) -> list[Packet]: 271 binary_packets = [packet.build() for packet in packets] 272 273 xmlrpc_packets: list[ 274 xmlrpc.client.Binary 275 ] = self.rpc_server_proxy.scapy_send_packets_and_capture( 276 binary_packets, 277 send_port.logical_name, 278 receive_port.logical_name, 279 duration, 280 ) # type: ignore[assignment] 281 282 scapy_packets = [Ether(packet.data) for packet in xmlrpc_packets] 283 return scapy_packets 284 285 def close(self) -> None: 286 """Close the traffic generator.""" 287 try: 288 self.rpc_server_proxy.quit() 289 except ConnectionRefusedError: 290 # Because the python instance closes, we get no RPC response. 291 # Thus, this error is expected 292 pass 293 self.session.close() 294