# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2010-2014 Intel Corporation
# Copyright(c) 2022-2023 PANTHEON.tech s.r.o.
# Copyright(c) 2022-2023 University of New Hampshire
# Copyright(c) 2024 Arm Limited

"""Various utility classes and functions.

These are used in multiple modules across the framework. They're here because
they provide some non-specific functionality, greatly simplify imports or just don't
fit elsewhere.

Attributes:
    REGEX_FOR_PCI_ADDRESS: The regex representing a PCI address, e.g. ``0000:00:08.0``.
    REGEX_FOR_MAC_ADDRESS: The regex representing a MAC address written either as six
        colon/hyphen separated octets or as three dot separated groups of four hex digits.
"""

import fnmatch
import json
import os
import random
import tarfile
from enum import Enum, Flag
from pathlib import Path
from typing import Any, Callable

from scapy.layers.inet import IP, TCP, UDP, Ether  # type: ignore[import-untyped]
from scapy.packet import Packet  # type: ignore[import-untyped]

from .exception import InternalError

# The dot separators are escaped so that only a literal "." is accepted; an unescaped "."
# would match any character in that position.
REGEX_FOR_PCI_ADDRESS: str = r"[0-9a-fA-F]{4}:[0-9a-fA-F]{2}:[0-9a-fA-F]{2}\.[0-9]{1}"
_REGEX_FOR_COLON_OR_HYPHEN_SEP_MAC: str = r"(?:[\da-fA-F]{2}[:-]){5}[\da-fA-F]{2}"
_REGEX_FOR_DOT_SEP_MAC: str = r"(?:[\da-fA-F]{4}\.){2}[\da-fA-F]{4}"
REGEX_FOR_MAC_ADDRESS: str = rf"{_REGEX_FOR_COLON_OR_HYPHEN_SEP_MAC}|{_REGEX_FOR_DOT_SEP_MAC}"
REGEX_FOR_BASE64_ENCODING: str = "[-a-zA-Z0-9+\\/]*={0,3}"


def expand_range(range_str: str) -> list[int]:
    """Process `range_str` into a list of integers.

    There are two possible formats of `range_str`:

        * ``n`` - a single integer,
        * ``n-m`` - a range of integers.

    The returned range includes both ``n`` and ``m``. Empty string returns an empty list.

    Args:
        range_str: The range to expand.

    Returns:
        All the numbers from the range.

    Raises:
        ValueError: If a boundary in `range_str` can't be converted to an integer.
    """
    if not range_str:
        return []

    # For the single integer format the first and the last boundary are the same item.
    # int() raises when a boundary can't be converted, which serves as a type check.
    boundaries = range_str.split("-")
    return list(range(int(boundaries[0]), int(boundaries[-1]) + 1))


def get_packet_summaries(packets: list[Packet]) -> str:
    """Format a string summary from `packets`.

    A single packet is summarized directly; any other number of packets is rendered
    as an indented JSON list of the individual summaries.

    Args:
        packets: The packets to format.

    Returns:
        The summary of `packets`.
    """
    if len(packets) == 1:
        packet_summaries = packets[0].summary()
    else:
        packet_summaries = json.dumps([pkt.summary() for pkt in packets], indent=4)
    return f"Packet contents: \n{packet_summaries}"


class StrEnum(Enum):
    """Enum with members stored as strings."""

    @staticmethod
    def _generate_next_value_(name: str, start: int, count: int, last_values: object) -> str:
        return name

    def __str__(self) -> str:
        """The string representation is the name of the member."""
        return self.name


class MesonArgs:
    """Aggregate the arguments needed to build DPDK."""

    _default_library: str
    _dpdk_args: str

    def __init__(self, default_library: str | None = None, **dpdk_args: str | bool):
        """Initialize the meson arguments.

        Args:
            default_library: The default library type, Meson supports ``shared``, ``static`` and
                ``both``. Defaults to :data:`None`, in which case the argument won't be used.
            dpdk_args: The arguments found in ``meson_options.txt`` in root DPDK directory.
                Do not use ``-D`` with them.

        Example:
            ::

                meson_args = MesonArgs(enable_kmods=True)
        """
        self._default_library = f"--default-library={default_library}" if default_library else ""
        self._dpdk_args = " ".join(
            f"-D{dpdk_arg_name}={dpdk_arg_value}"
            for dpdk_arg_name, dpdk_arg_value in dpdk_args.items()
        )

    def __str__(self) -> str:
        """The actual args."""
        # split() + join() collapses the extra whitespace left by an empty component.
        return " ".join(f"{self._default_library} {self._dpdk_args}".split())


class TarCompressionFormat(StrEnum):
    """Compression formats that tar can use.

    Enum names are the shell compression commands
    and Enum values are the associated file extensions.

    The 'none' member represents no compression, only archiving with tar.
    Its value is set to 'tar' to indicate that the file is an uncompressed tar archive.
    """

    none = "tar"
    gzip = "gz"
    compress = "Z"
    bzip2 = "bz2"
    lzip = "lz"
    lzma = "lzma"
    lzop = "lzo"
    xz = "xz"
    zstd = "zst"

    @property
    def extension(self) -> str:
        """Return the extension associated with the compression format.

        If the compression format is 'none', the extension will be in the format 'tar'.
        For other compression formats, the extension will be in the format
        'tar.{compression format}'.
        """
        # Look the member up on the class instead of through another member.
        uncompressed = type(self).none
        if self is uncompressed:
            return f"{self.value}"
        return f"{uncompressed.value}.{self.value}"


def convert_to_list_of_string(value: Any | list[Any]) -> list[str]:
    """Convert the input to the list of strings.

    Args:
        value: Either a list, whose items are each converted with :class:`str`, or any other
            object, which is converted with :class:`str` and wrapped in a one-item list.

    Returns:
        The list of strings.
    """
    if isinstance(value, list):
        return [str(item) for item in value]
    # Wrap the scalar in a list. Passing the string itself to list() would split it into
    # single characters, e.g. "*.pyc" would become the patterns "*", ".", "p", "y", "c".
    return [str(value)]


def create_tarball(
    dir_path: Path,
    compress_format: TarCompressionFormat = TarCompressionFormat.none,
    exclude: Any | list[Any] | None = None,
) -> Path:
    """Create a tarball from the contents of the specified directory.

    This method creates a tarball containing all files and directories within `dir_path`.
    The tarball will be saved in the directory of `dir_path` and will be named based on `dir_path`.

    Args:
        dir_path: The directory path.
        compress_format: The compression format to use. Defaults to no compression.
        exclude: Patterns for files or directories to exclude from the tarball.
            These patterns are used with `fnmatch.fnmatch` to filter out files.

    Returns:
        The path to the created tarball.
    """

    def create_filter_function(exclude_patterns: str | list[str] | None) -> Callable | None:
        """Create a filter function based on the provided exclude patterns.

        Args:
            exclude_patterns: Patterns for files or directories to exclude from the tarball.
                These patterns are used with `fnmatch.fnmatch` to filter out files.

        Returns:
            The filter function that excludes files based on the patterns, or :data:`None`
            when there are no patterns.
        """
        if not exclude_patterns:
            return None

        patterns = convert_to_list_of_string(exclude_patterns)

        def filter_func(tarinfo: tarfile.TarInfo) -> tarfile.TarInfo | None:
            # Only the last path component is matched against the patterns.
            file_name = os.path.basename(tarinfo.name)
            if any(fnmatch.fnmatch(file_name, pattern) for pattern in patterns):
                return None
            return tarinfo

        return filter_func

    # NOTE(review): with_suffix() replaces an existing suffix, so a directory named
    # ``dpdk-1.2`` results in ``dpdk-1.tar`` - confirm callers expect this naming.
    target_tarball_path = dir_path.with_suffix(f".{compress_format.extension}")
    # NOTE(review): the mode is built from the member value; presumably only the values that
    # tarfile itself understands can be opened here - confirm before using other formats.
    with tarfile.open(target_tarball_path, f"w:{compress_format.value}") as tar:
        tar.add(dir_path, arcname=dir_path.name, filter=create_filter_function(exclude))

    return target_tarball_path


def extract_tarball(tar_path: str | Path) -> None:
    """Extract the contents of a tarball.

    The tarball will be extracted in the same path as `tar_path` parent path.

    Args:
        tar_path: The path to the tarball file to extract.
    """
    # NOTE(review): extractall() is called without an explicit extraction filter. If tarballs
    # may come from untrusted sources, consider passing ``filter="data"`` where supported.
    with tarfile.open(tar_path, "r") as tar:
        tar.extractall(path=Path(tar_path).parent)


class PacketProtocols(Flag):
    """Flag specifying which protocols to use for packet generation."""

    #:
    IP = 1
    #:
    TCP = 2 | IP
    #:
    UDP = 4 | IP
    #:
    ALL = TCP | UDP


def generate_random_packets(
    number_of: int,
    payload_size: int = 1500,
    protocols: PacketProtocols = PacketProtocols.ALL,
    ports_range: range = range(1024, 49152),
    mtu: int = 1500,
) -> list[Packet]:
    """Generate a number of random packets.

    The payload of the packets will consist of random bytes. If `payload_size` is too big, then the
    maximum payload size allowed for the specific packet type is used. The size is calculated based
    on the specified `mtu`, therefore it is essential that `mtu` is set correctly to match the MTU
    of the port that will send out the generated packets.

    If `protocols` has any L4 protocol enabled then all the packets are generated with any of
    the specified L4 protocols chosen at random. If only :attr:`~PacketProtocols.IP` is set, then
    only L3 packets are generated.

    If L4 packets will be generated, then the TCP/UDP ports to be used will be chosen at random from
    `ports_range`.

    Args:
        number_of: The number of packets to generate.
        payload_size: The packet payload size to generate, capped based on `mtu`.
        protocols: The protocols to use for the generated packets.
        ports_range: The range of L4 port numbers to use. Used only if `protocols` has L4 protocols.
        mtu: The MTU of the NIC port that will send out the generated packets.

    Raises:
        InternalError: If the `payload_size` is invalid.

    Returns:
        A list containing the randomly generated packets.
    """
    if payload_size < 0:
        raise InternalError(f"An invalid payload_size of {payload_size} was given.")

    l4_factories = []
    if protocols & PacketProtocols.TCP:
        l4_factories.append(TCP)
    if protocols & PacketProtocols.UDP:
        l4_factories.append(UDP)

    def _make_packet() -> Packet:
        packet = Ether()

        if protocols & PacketProtocols.IP:
            packet /= IP()

        if l4_factories:
            src_port, dst_port = random.choices(ports_range, k=2)
            packet /= random.choice(l4_factories)(sport=src_port, dport=dst_port)

        # len(packet) covers every header stacked so far, so what is left is the payload room.
        max_payload_size = mtu - len(packet)
        usable_payload_size = min(payload_size, max_payload_size)
        return packet / random.randbytes(usable_payload_size)

    return [_make_packet() for _ in range(number_of)]


class MultiInheritanceBaseClass:
    """A base class for classes utilizing multiple inheritance.

    This class enables its subclasses to support both single and multiple inheritance by acting as
    a stopping point in the tree of calls to the constructors of superclasses. This class is able
    to exist at the end of the Method Resolution Order (MRO) so that subclasses can call
    :meth:`super.__init__` without repercussion.
    """

    def __init__(self, *args, **kwargs) -> None:
        """Call the init method of :class:`object`."""
        # The received arguments are intentionally not forwarded.
        super().__init__()


def to_pascal_case(text: str) -> str:
    """Convert `text` from snake_case to PascalCase."""
    return "".join(seg.capitalize() for seg in text.split("_"))