# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2010-2014 Intel Corporation
# Copyright(c) 2022-2023 PANTHEON.tech s.r.o.
# Copyright(c) 2022-2023 University of New Hampshire
# Copyright(c) 2024 Arm Limited

"""Various utility classes and functions.

These are used in multiple modules across the framework. They're here because
they provide some non-specific functionality, greatly simplify imports or just don't
fit elsewhere.

Attributes:
    REGEX_FOR_PCI_ADDRESS: The regex representing a PCI address, e.g. ``0000:00:08.0``.
"""

import atexit
import json
import os
import random
import subprocess
from enum import Enum, Flag
from pathlib import Path
from subprocess import SubprocessError

from scapy.layers.inet import IP, TCP, UDP, Ether  # type: ignore[import-untyped]
from scapy.packet import Packet  # type: ignore[import-untyped]

from .exception import ConfigurationError, InternalError

# NOTE(review): the pattern is wrapped in "/" delimiters and the "." before the function
# number is unescaped. The value is kept as-is because its consumers are not visible from
# this module - confirm before using it directly with :mod:`re`.
REGEX_FOR_PCI_ADDRESS: str = "/[0-9a-fA-F]{4}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}.[0-9]{1}/"


def expand_range(range_str: str) -> list[int]:
    """Process `range_str` into a list of integers.

    There are two possible formats of `range_str`:

        * ``n`` - a single integer,
        * ``n-m`` - a range of integers.

    The returned range includes both ``n`` and ``m``. Empty string returns an empty list.

    Args:
        range_str: The range to expand.

    Returns:
        All the numbers from the range.
    """
    if not range_str:
        return []

    boundaries = range_str.split("-")
    # int() raises ValueError when a boundary is not a number, which serves as the type check.
    # For the single integer format the first and the last item are the same one.
    first, last = int(boundaries[0]), int(boundaries[-1])
    return list(range(first, last + 1))


def get_packet_summaries(packets: list[Packet]) -> str:
    """Format a string summary from `packets`.

    A single packet is summarized on one line, any other number of packets
    is rendered as an indented JSON list of the individual summaries.

    Args:
        packets: The packets to format.

    Returns:
        The summary of `packets`.
    """
    if len(packets) == 1:
        packet_summaries = packets[0].summary()
    else:
        packet_summaries = json.dumps([packet.summary() for packet in packets], indent=4)
    return f"Packet contents: \n{packet_summaries}"


def get_commit_id(rev_id: str) -> str:
    """Given a Git revision ID, return the corresponding commit ID.

    Args:
        rev_id: The Git revision ID.

    Returns:
        The output of ``git rev-parse --verify`` for `rev_id`, stripped of whitespace.

    Raises:
        ConfigurationError: The ``git rev-parse`` command failed, suggesting
            an invalid or ambiguous revision ID was supplied.
    """
    result = subprocess.run(
        ["git", "rev-parse", "--verify", rev_id],
        text=True,
        capture_output=True,
    )
    if result.returncode != 0:
        raise ConfigurationError(
            f"{rev_id} is not a valid git reference.\n"
            f"Command: {result.args}\n"
            f"Stdout: {result.stdout}\n"
            f"Stderr: {result.stderr}"
        )
    return result.stdout.strip()


class StrEnum(Enum):
    """Enum with members stored as strings."""

    @staticmethod
    def _generate_next_value_(name: str, start: int, count: int, last_values: object) -> str:
        # Makes auto() produce the member name as the member value.
        return name

    def __str__(self) -> str:
        """The string representation is the name of the member."""
        return self.name


class MesonArgs:
    """Aggregate the arguments needed to build DPDK."""

    #: The rendered ``--default-library=...`` argument, empty if not requested.
    _default_library: str
    #: The rendered, space separated ``-D<name>=<value>`` arguments.
    _dpdk_args: str

    def __init__(self, default_library: str | None = None, **dpdk_args: str | bool):
        """Initialize the meson arguments.

        Args:
            default_library: The default library type, Meson supports ``shared``, ``static`` and
                ``both``. Defaults to :data:`None`, in which case the argument won't be used.
            dpdk_args: The arguments found in ``meson_options.txt`` in root DPDK directory.
                Do not use ``-D`` with them.

        Example:
            ::

                meson_args = MesonArgs(enable_kmods=True).
        """
        self._default_library = f"--default-library={default_library}" if default_library else ""
        self._dpdk_args = " ".join(
            f"-D{dpdk_arg_name}={dpdk_arg_value}"
            for dpdk_arg_name, dpdk_arg_value in dpdk_args.items()
        )

    def __str__(self) -> str:
        """The actual args."""
        # Splitting and re-joining drops the stray spaces left by empty parts.
        return " ".join(f"{self._default_library} {self._dpdk_args}".split())


class _TarCompressionFormat(StrEnum):
    """Compression formats that tar can use.

    Enum names are the shell compression commands
    and Enum values are the associated file extensions.
    """

    gzip = "gz"
    compress = "Z"
    bzip2 = "bz2"
    lzip = "lz"
    lzma = "lzma"
    lzop = "lzo"
    xz = "xz"
    zstd = "zst"


class DPDKGitTarball:
    """Compressed tarball of DPDK from the repository.

    The class supports the :class:`os.PathLike` protocol,
    which is used to get the Path of the tarball::

        from pathlib import Path
        tarball = DPDKGitTarball("HEAD", "output")
        tarball_path = Path(tarball)
    """

    _git_ref: str
    _tar_compression_format: _TarCompressionFormat
    _tarball_dir: Path
    _tarball_name: str
    _tarball_path: Path | None

    def __init__(
        self,
        git_ref: str,
        output_dir: str,
        tar_compression_format: _TarCompressionFormat = _TarCompressionFormat.xz,
    ):
        """Create the tarball during initialization.

        The DPDK version is specified with `git_ref`. The tarball will be compressed with
        `tar_compression_format`, which must be supported by the DTS execution environment.
        The resulting tarball will be put into `output_dir`.

        Args:
            git_ref: A git commit ID, tag ID or tree ID.
            output_dir: The directory where to put the resulting tarball.
            tar_compression_format: The compression format to use.
        """
        self._git_ref = git_ref
        self._tar_compression_format = tar_compression_format

        self._tarball_dir = Path(output_dir, "tarball")

        self._create_tarball_dir()

        self._tarball_name = (
            f"dpdk-tarball-{self._git_ref}.tar.{self._tar_compression_format.value}"
        )
        # Reuse a tarball left by a previous run, create it only when it is missing.
        self._tarball_path = self._check_tarball_path()
        if not self._tarball_path:
            self._create_tarball()

    def _create_tarball_dir(self) -> None:
        """Create the tarball directory, it is not an error if it already exists."""
        os.makedirs(self._tarball_dir, exist_ok=True)

    def _check_tarball_path(self) -> Path | None:
        """Return the path of the tarball if it is already in the tarball directory."""
        if self._tarball_name in os.listdir(self._tarball_dir):
            return Path(self._tarball_dir, self._tarball_name)
        return None

    def _create_tarball(self) -> None:
        """Create the compressed tarball of the configured git ref.

        Raises:
            SubprocessError: If any of the git or compression commands failed or couldn't be
                started, or if the tarball file couldn't be written.
        """
        self._tarball_path = Path(self._tarball_dir, self._tarball_name)

        # If the process is interrupted mid-way, don't leave a truncated tarball behind,
        # as it would be picked up as a valid one by the next run.
        atexit.register(self._delete_tarball)

        try:
            self._archive_and_compress(self._tarball_path)
        except (OSError, SubprocessError) as e:
            # Remove the broken tarball right away so it can't be reused by this process.
            self._delete_tarball()
            atexit.unregister(self._delete_tarball)
            if isinstance(e, SubprocessError):
                raise
            # A missing executable or an unwritable path; keep the documented exception type.
            raise SubprocessError(f"Git archive creation failed: {e}") from e

        atexit.unregister(self._delete_tarball)

    def _archive_and_compress(self, tarball_path: Path) -> None:
        """Pipe ``git archive`` of the git ref through the compressor into `tarball_path`.

        The commands are started from argument lists without a shell, so the git ref and
        the paths are passed verbatim, and the exit codes of both ends of the pipeline
        are checked.

        Args:
            tarball_path: The file to write the compressed tarball to.

        Raises:
            SubprocessError: If any of the commands exited with a non-zero exit code.
            OSError: If a command couldn't be started or `tarball_path` couldn't be opened.
        """
        toplevel = subprocess.run(
            ["git", "rev-parse", "--show-toplevel"],
            text=True,
            capture_output=True,
        )
        if toplevel.returncode != 0:
            raise SubprocessError(
                f"Git archive creation failed with exit code {toplevel.returncode}.\n"
                f"Command: {toplevel.args}\n"
                f"Stdout: {toplevel.stdout}\n"
                f"Stderr: {toplevel.stderr}"
            )

        archive_cmd = [
            "git",
            "-C",
            toplevel.stdout.strip(),
            "archive",
            f"--prefix=dpdk-tarball-{self._git_ref}{os.sep}",
            self._git_ref,
        ]
        # The names of the compression format members are the compression commands,
        # which compress stdin to stdout when invoked without arguments.
        compress_cmd = [str(self._tar_compression_format)]

        with (
            open(tarball_path, "wb") as tarball_file,
            subprocess.Popen(
                archive_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
            ) as archive,
        ):
            with subprocess.Popen(
                compress_cmd, stdin=archive.stdout, stdout=tarball_file, stderr=subprocess.PIPE
            ) as compress:
                # The compressor now holds the only read end of the pipe, so git is notified
                # (SIGPIPE) if the compressor exits early instead of blocking forever.
                if archive.stdout is not None:
                    archive.stdout.close()
                compress_stderr = compress.communicate()[1]
            # git only prints short diagnostics on stderr, reading them after the fact is safe.
            archive_stderr = archive.stderr.read() if archive.stderr is not None else b""

        # Leaving the with blocks waited for both processes, the exit codes are available.
        if archive.returncode != 0 or compress.returncode != 0:
            stderr = (archive_stderr + compress_stderr).decode(errors="replace")
            raise SubprocessError(
                "Git archive creation failed with exit code "
                f"{archive.returncode or compress.returncode}.\n"
                f"Command: {' '.join(archive_cmd)} | {' '.join(compress_cmd)}\n"
                f"Stderr: {stderr}"
            )

    def _delete_tarball(self) -> None:
        """Remove the tarball file if it exists."""
        if self._tarball_path and os.path.exists(self._tarball_path):
            os.remove(self._tarball_path)

    def __fspath__(self) -> str:
        """The os.PathLike protocol implementation."""
        return str(self._tarball_path)


class PacketProtocols(Flag):
    """Flag specifying which protocols to use for packet generation."""

    #:
    IP = 1
    #:
    TCP = 2 | IP
    #:
    UDP = 4 | IP
    #:
    ALL = TCP | UDP


def generate_random_packets(
    number_of: int,
    payload_size: int = 1500,
    protocols: PacketProtocols = PacketProtocols.ALL,
    ports_range: range = range(1024, 49152),
    mtu: int = 1500,
) -> list[Packet]:
    """Generate a number of random packets.

    The payload of the packets will consist of random bytes. If `payload_size` is too big, then the
    maximum payload size allowed for the specific packet type is used. The size is calculated based
    on the specified `mtu`, therefore it is essential that `mtu` is set correctly to match the MTU
    of the port that will send out the generated packets.

    If `protocols` has any L4 protocol enabled then all the packets are generated with any of
    the specified L4 protocols chosen at random. If only :attr:`~PacketProtocols.IP` is set, then
    only L3 packets are generated.

    If L4 packets will be generated, then the TCP/UDP ports to be used will be chosen at random from
    `ports_range`.

    Args:
        number_of: The number of packets to generate.
        payload_size: The packet payload size to generate, capped based on `mtu`.
        protocols: The protocols to use for the generated packets.
        ports_range: The range of L4 port numbers to use. Used only if `protocols` has L4 protocols.
        mtu: The MTU of the NIC port that will send out the generated packets.

    Raises:
        InternalError: If the `payload_size` is invalid.

    Returns:
        A list containing the randomly generated packets.
    """
    if payload_size < 0:
        raise InternalError(f"An invalid payload_size of {payload_size} was given.")

    # TCP and UDP both include the IP bit, so a bitwise AND against them is truthy even when
    # only IP is requested. Containment requires all the bits of the tested member to be set.
    l4_factories = []
    if PacketProtocols.TCP in protocols:
        l4_factories.append(TCP)
    if PacketProtocols.UDP in protocols:
        l4_factories.append(UDP)

    def _make_packet() -> Packet:
        packet = Ether()

        if PacketProtocols.IP in protocols:
            packet /= IP()

        if l4_factories:
            src_port, dst_port = random.choices(ports_range, k=2)
            packet /= random.choice(l4_factories)(sport=src_port, dport=dst_port)

        # Whatever is left of the MTU after the headers built so far caps the payload.
        max_payload_size = mtu - len(packet)
        usable_payload_size = min(payload_size, max_payload_size)
        return packet / random.randbytes(usable_payload_size)

    return [_make_packet() for _ in range(number_of)]