1# SPDX-License-Identifier: BSD-3-Clause 2# Copyright(c) 2023 PANTHEON.tech s.r.o. 3 4"""CPU core representation and filtering. 5 6This module provides a unified representation of logical CPU cores along 7with filtering capabilities. 8 9When symmetric multiprocessing (SMP or multithreading) is enabled on a server, 10the physical CPU cores are split into logical CPU cores with different IDs. 11 12:class:`LogicalCoreCountFilter` filters by the number of logical cores. It's possible to specify 13the socket from which to filter the number of logical cores. It's also possible to not use all 14logical CPU cores from each physical core (e.g. only the first logical core of each physical core). 15 16:class:`LogicalCoreListFilter` filters by logical core IDs. This mostly checks that 17the logical cores are actually present on the server. 18""" 19 20import dataclasses 21from abc import ABC, abstractmethod 22from collections.abc import Iterable, ValuesView 23from dataclasses import dataclass 24 25from framework.utils import expand_range 26 27 28@dataclass(slots=True, frozen=True) 29class LogicalCore: 30 """Representation of a logical CPU core. 31 32 A physical core is represented in OS by multiple logical cores (lcores) 33 if CPU multithreading is enabled. When multithreading is disabled, their IDs are the same. 34 35 Attributes: 36 lcore: The logical core ID of a CPU core. It's the same as `core` with 37 disabled multithreading. 38 core: The physical core ID of a CPU core. 39 socket: The physical socket ID where the CPU resides. 40 node: The NUMA node ID where the CPU resides. 41 """ 42 43 lcore: int 44 core: int 45 socket: int 46 node: int 47 48 def __int__(self) -> int: 49 """The CPU is best represented by the logical core, as that's what we configure in EAL.""" 50 return self.lcore 51 52 53class LogicalCoreList: 54 r"""A unified way to store :class:`LogicalCore`\s. 55 56 Create a unified format used across the framework and allow the user to use 57 either a :class:`str` representation (using ``str(instance)`` or directly in f-strings) 58 or a :class:`list` representation (by accessing the `lcore_list` property, 59 which stores logical core IDs). 60 """ 61 62 _lcore_list: list[int] 63 _lcore_str: str 64 65 def __init__(self, lcore_list: list[int] | list[str] | list[LogicalCore] | str): 66 """Process `lcore_list`, then sort. 67 68 There are four supported logical core list formats:: 69 70 lcore_list=[LogicalCore1, LogicalCore2] # a list of LogicalCores 71 lcore_list=[0,1,2,3] # a list of int indices 72 lcore_list=['0','1','2-3'] # a list of str indices; ranges are supported 73 lcore_list='0,1,2-3' # a comma delimited str of indices; ranges are supported 74 75 Args: 76 lcore_list: Various ways to represent multiple logical cores. 77 Empty `lcore_list` is allowed. 78 """ 79 self._lcore_list = [] 80 if isinstance(lcore_list, str): 81 lcore_list = lcore_list.split(",") 82 for lcore in lcore_list: 83 if isinstance(lcore, str): 84 self._lcore_list.extend(expand_range(lcore)) 85 else: 86 self._lcore_list.append(int(lcore)) 87 88 # the input lcores may not be sorted 89 self._lcore_list.sort() 90 self._lcore_str = f'{",".join(self._get_consecutive_lcores_range(self._lcore_list))}' 91 92 @property 93 def lcore_list(self) -> list[int]: 94 """The logical core IDs.""" 95 return self._lcore_list 96 97 def _get_consecutive_lcores_range(self, lcore_ids_list: list[int]) -> list[str]: 98 formatted_core_list = [] 99 segment = lcore_ids_list[:1] 100 for lcore_id in lcore_ids_list[1:]: 101 if lcore_id - segment[-1] == 1: 102 segment.append(lcore_id) 103 else: 104 formatted_core_list.append( 105 f"{segment[0]}-{segment[-1]}" if len(segment) > 1 else f"{segment[0]}" 106 ) 107 current_core_index = lcore_ids_list.index(lcore_id) 108 formatted_core_list.extend( 109 self._get_consecutive_lcores_range(lcore_ids_list[current_core_index:]) 110 ) 111 segment.clear() 112 break 113 if len(segment) > 0: 114 formatted_core_list.append( 115 f"{segment[0]}-{segment[-1]}" if len(segment) > 1 else f"{segment[0]}" 116 ) 117 return formatted_core_list 118 119 def __str__(self) -> str: 120 """The consecutive ranges of logical core IDs.""" 121 return self._lcore_str 122 123 124@dataclasses.dataclass(slots=True, frozen=True) 125class LogicalCoreCount(object): 126 """Define the number of logical cores per physical cores per sockets.""" 127 128 #: Use this many logical cores per each physical core. 129 lcores_per_core: int = 1 130 #: Use this many physical cores per each socket. 131 cores_per_socket: int = 2 132 #: Use this many sockets. 133 socket_count: int = 1 134 #: Use exactly these sockets. This takes precedence over `socket_count`, 135 #: so when `sockets` is not :data:`None`, `socket_count` is ignored. 136 sockets: list[int] | None = None 137 138 139class LogicalCoreFilter(ABC): 140 """Common filtering class. 141 142 Each filter needs to be implemented in a subclass. This base class sorts the list of cores 143 and defines the filtering method, which must be implemented by subclasses. 144 """ 145 146 _filter_specifier: LogicalCoreCount | LogicalCoreList 147 _lcores_to_filter: list[LogicalCore] 148 149 def __init__( 150 self, 151 lcore_list: list[LogicalCore], 152 filter_specifier: LogicalCoreCount | LogicalCoreList, 153 ascending: bool = True, 154 ): 155 """Filter according to the input filter specifier. 156 157 The input `lcore_list` is copied and sorted by physical core before filtering. 158 The list is copied so that the original is left intact. 159 160 Args: 161 lcore_list: The logical CPU cores to filter. 162 filter_specifier: Filter cores from `lcore_list` according to this filter. 163 ascending: Sort cores in ascending order (lowest to highest IDs). If data:`False`, 164 sort in descending order. 165 """ 166 self._filter_specifier = filter_specifier 167 168 # sorting by core is needed in case hyperthreading is enabled 169 self._lcores_to_filter = sorted(lcore_list, key=lambda x: x.core, reverse=not ascending) 170 self.filter() 171 172 @abstractmethod 173 def filter(self) -> list[LogicalCore]: 174 r"""Filter the cores. 175 176 Use `self._filter_specifier` to filter `self._lcores_to_filter` and return 177 the filtered :class:`LogicalCore`\s. 178 `self._lcores_to_filter` is a sorted copy of the original list, so it may be modified. 179 180 Returns: 181 The filtered cores. 182 """ 183 184 185class LogicalCoreCountFilter(LogicalCoreFilter): 186 """Filter cores by specified counts. 187 188 Filter the input list of LogicalCores according to specified rules: 189 190 * The input `filter_specifier` is :class:`LogicalCoreCount`, 191 * Use cores from the specified number of sockets or from the specified socket ids, 192 * If `sockets` is specified, it takes precedence over `socket_count`, 193 * From each of those sockets, use only `cores_per_socket` of cores, 194 * And for each core, use `lcores_per_core` of logical cores. Hypertheading 195 must be enabled for this to take effect. 196 """ 197 198 _filter_specifier: LogicalCoreCount 199 200 def filter(self) -> list[LogicalCore]: 201 """Filter the cores according to :class:`LogicalCoreCount`. 202 203 Start by filtering the allowed sockets. The cores matching the allowed sockets are returned. 204 The cores of each socket are stored in separate lists. 205 206 Then filter the allowed physical cores from those lists of cores per socket. When filtering 207 physical cores, store the desired number of logical cores per physical core which then 208 together constitute the final filtered list. 209 210 Returns: 211 The filtered cores. 212 """ 213 sockets_to_filter = self._filter_sockets(self._lcores_to_filter) 214 filtered_lcores = [] 215 for socket_to_filter in sockets_to_filter: 216 filtered_lcores.extend(self._filter_cores_from_socket(socket_to_filter)) 217 return filtered_lcores 218 219 def _filter_sockets( 220 self, lcores_to_filter: Iterable[LogicalCore] 221 ) -> ValuesView[list[LogicalCore]]: 222 """Filter a list of cores per each allowed socket. 223 224 The sockets may be specified in two ways, either a number or a specific list of sockets. 225 In case of a specific list, we just need to return the cores from those sockets. 226 If filtering a number of cores, we need to go through all cores and note which sockets 227 appear and only filter from the first n that appear. 228 229 Args: 230 lcores_to_filter: The cores to filter. These must be sorted by the physical core. 231 232 Returns: 233 A list of lists of logical CPU cores. Each list contains cores from one socket. 234 """ 235 allowed_sockets: set[int] = set() 236 socket_count = self._filter_specifier.socket_count 237 if self._filter_specifier.sockets: 238 # when sockets in filter is specified, the sockets are already set 239 socket_count = len(self._filter_specifier.sockets) 240 allowed_sockets = set(self._filter_specifier.sockets) 241 242 # filter socket_count sockets from all sockets by checking the socket of each CPU 243 filtered_lcores: dict[int, list[LogicalCore]] = {} 244 for lcore in lcores_to_filter: 245 if not self._filter_specifier.sockets: 246 # this is when sockets is not set, so we do the actual filtering 247 # when it is set, allowed_sockets is already defined and can't be changed 248 if len(allowed_sockets) < socket_count: 249 # allowed_sockets is a set, so adding an existing socket won't re-add it 250 allowed_sockets.add(lcore.socket) 251 if lcore.socket in allowed_sockets: 252 # separate lcores into sockets; this makes it easier in further processing 253 if lcore.socket in filtered_lcores: 254 filtered_lcores[lcore.socket].append(lcore) 255 else: 256 filtered_lcores[lcore.socket] = [lcore] 257 258 if len(allowed_sockets) < socket_count: 259 raise ValueError( 260 f"The actual number of sockets from which to use cores " 261 f"({len(allowed_sockets)}) is lower than required ({socket_count})." 262 ) 263 264 return filtered_lcores.values() 265 266 def _filter_cores_from_socket( 267 self, lcores_to_filter: Iterable[LogicalCore] 268 ) -> list[LogicalCore]: 269 """Filter a list of cores from the given socket. 270 271 Go through the cores and note how many logical cores per physical core have been filtered. 272 273 Returns: 274 The filtered logical CPU cores. 275 """ 276 # no need to use ordered dict, from Python3.7 the dict 277 # insertion order is preserved (LIFO). 278 lcore_count_per_core_map: dict[int, int] = {} 279 filtered_lcores = [] 280 for lcore in lcores_to_filter: 281 if lcore.core in lcore_count_per_core_map: 282 current_core_lcore_count = lcore_count_per_core_map[lcore.core] 283 if self._filter_specifier.lcores_per_core > current_core_lcore_count: 284 # only add lcores of the given core 285 lcore_count_per_core_map[lcore.core] += 1 286 filtered_lcores.append(lcore) 287 else: 288 # we have enough lcores per this core 289 continue 290 elif self._filter_specifier.cores_per_socket > len(lcore_count_per_core_map): 291 # only add cores if we need more 292 lcore_count_per_core_map[lcore.core] = 1 293 filtered_lcores.append(lcore) 294 else: 295 # we have enough cores 296 break 297 298 cores_per_socket = len(lcore_count_per_core_map) 299 if cores_per_socket < self._filter_specifier.cores_per_socket: 300 raise ValueError( 301 f"The actual number of cores per socket ({cores_per_socket}) " 302 f"is lower than required ({self._filter_specifier.cores_per_socket})." 303 ) 304 305 lcores_per_core = lcore_count_per_core_map[filtered_lcores[-1].core] 306 if lcores_per_core < self._filter_specifier.lcores_per_core: 307 raise ValueError( 308 f"The actual number of logical cores per core ({lcores_per_core}) " 309 f"is lower than required ({self._filter_specifier.lcores_per_core})." 310 ) 311 312 return filtered_lcores 313 314 315class LogicalCoreListFilter(LogicalCoreFilter): 316 """Filter the logical CPU cores by logical CPU core IDs. 317 318 This is a simple filter that looks at logical CPU IDs and only filter those that match. 319 320 The input filter is :class:`LogicalCoreList`. An empty LogicalCoreList won't filter anything. 321 """ 322 323 _filter_specifier: LogicalCoreList 324 325 def filter(self) -> list[LogicalCore]: 326 """Filter based on logical CPU core ID. 327 328 Return: 329 The filtered logical CPU cores. 330 """ 331 if not len(self._filter_specifier.lcore_list): 332 return self._lcores_to_filter 333 334 filtered_lcores = [] 335 for core in self._lcores_to_filter: 336 if core.lcore in self._filter_specifier.lcore_list: 337 filtered_lcores.append(core) 338 339 if len(filtered_lcores) != len(self._filter_specifier.lcore_list): 340 raise ValueError( 341 f"Not all logical cores from {self._filter_specifier.lcore_list} " 342 f"were found among {self._lcores_to_filter}" 343 ) 344 345 return filtered_lcores 346 347 348def lcore_filter( 349 core_list: list[LogicalCore], 350 filter_specifier: LogicalCoreCount | LogicalCoreList, 351 ascending: bool, 352) -> LogicalCoreFilter: 353 """Factory for providing the filter that corresponds to `filter_specifier`. 354 355 Args: 356 core_list: The logical CPU cores to filter. 357 filter_specifier: The filter to use. 358 ascending: Sort cores in ascending order (lowest to highest IDs). If :data:`False`, 359 sort in descending order. 360 361 Returns: 362 The filter that corresponds to `filter_specifier`. 363 """ 364 if isinstance(filter_specifier, LogicalCoreList): 365 return LogicalCoreListFilter(core_list, filter_specifier, ascending) 366 elif isinstance(filter_specifier, LogicalCoreCount): 367 return LogicalCoreCountFilter(core_list, filter_specifier, ascending) 368 else: 369 raise ValueError(f"Unsupported filter r{filter_specifier}") 370