xref: /dpdk/dts/framework/testbed_model/cpu.py (revision 3e967643bc51c87928453e4b718a9e09d731cb21)
1# SPDX-License-Identifier: BSD-3-Clause
2# Copyright(c) 2023 PANTHEON.tech s.r.o.
3
4"""CPU core representation and filtering.
5
6This module provides a unified representation of logical CPU cores along
7with filtering capabilities.
8
9When symmetric multiprocessing (SMP or multithreading) is enabled on a server,
10the physical CPU cores are split into logical CPU cores with different IDs.
11
12:class:`LogicalCoreCountFilter` filters by the number of logical cores. It's possible to specify
13the socket from which to filter the number of logical cores. It's also possible to not use all
14logical CPU cores from each physical core (e.g. only the first logical core of each physical core).
15
16:class:`LogicalCoreListFilter` filters by logical core IDs. This mostly checks that
17the logical cores are actually present on the server.
18"""
19
20import dataclasses
21from abc import ABC, abstractmethod
22from collections.abc import Iterable, ValuesView
23from dataclasses import dataclass
24
25from framework.utils import expand_range
26
27
28@dataclass(slots=True, frozen=True)
29class LogicalCore:
30    """Representation of a logical CPU core.
31
32    A physical core is represented in OS by multiple logical cores (lcores)
33    if CPU multithreading is enabled. When multithreading is disabled, their IDs are the same.
34
35    Attributes:
36        lcore: The logical core ID of a CPU core. It's the same as `core` with
37            disabled multithreading.
38        core: The physical core ID of a CPU core.
39        socket: The physical socket ID where the CPU resides.
40        node: The NUMA node ID where the CPU resides.
41    """
42
43    lcore: int
44    core: int
45    socket: int
46    node: int
47
48    def __int__(self) -> int:
49        """The CPU is best represented by the logical core, as that's what we configure in EAL."""
50        return self.lcore
51
52
53class LogicalCoreList:
54    r"""A unified way to store :class:`LogicalCore`\s.
55
56    Create a unified format used across the framework and allow the user to use
57    either a :class:`str` representation (using ``str(instance)`` or directly in f-strings)
58    or a :class:`list` representation (by accessing the `lcore_list` property,
59    which stores logical core IDs).
60    """
61
62    _lcore_list: list[int]
63    _lcore_str: str
64
65    def __init__(self, lcore_list: list[int] | list[str] | list[LogicalCore] | str):
66        """Process `lcore_list`, then sort.
67
68        There are four supported logical core list formats::
69
70            lcore_list=[LogicalCore1, LogicalCore2]  # a list of LogicalCores
71            lcore_list=[0,1,2,3]        # a list of int indices
72            lcore_list=['0','1','2-3']  # a list of str indices; ranges are supported
73            lcore_list='0,1,2-3'        # a comma delimited str of indices; ranges are supported
74
75        Args:
76            lcore_list: Various ways to represent multiple logical cores.
77                Empty `lcore_list` is allowed.
78        """
79        self._lcore_list = []
80        if isinstance(lcore_list, str):
81            lcore_list = lcore_list.split(",")
82        for lcore in lcore_list:
83            if isinstance(lcore, str):
84                self._lcore_list.extend(expand_range(lcore))
85            else:
86                self._lcore_list.append(int(lcore))
87
88        # the input lcores may not be sorted
89        self._lcore_list.sort()
90        self._lcore_str = f'{",".join(self._get_consecutive_lcores_range(self._lcore_list))}'
91
92    @property
93    def lcore_list(self) -> list[int]:
94        """The logical core IDs."""
95        return self._lcore_list
96
97    def _get_consecutive_lcores_range(self, lcore_ids_list: list[int]) -> list[str]:
98        formatted_core_list = []
99        segment = lcore_ids_list[:1]
100        for lcore_id in lcore_ids_list[1:]:
101            if lcore_id - segment[-1] == 1:
102                segment.append(lcore_id)
103            else:
104                formatted_core_list.append(
105                    f"{segment[0]}-{segment[-1]}" if len(segment) > 1 else f"{segment[0]}"
106                )
107                current_core_index = lcore_ids_list.index(lcore_id)
108                formatted_core_list.extend(
109                    self._get_consecutive_lcores_range(lcore_ids_list[current_core_index:])
110                )
111                segment.clear()
112                break
113        if len(segment) > 0:
114            formatted_core_list.append(
115                f"{segment[0]}-{segment[-1]}" if len(segment) > 1 else f"{segment[0]}"
116            )
117        return formatted_core_list
118
119    def __str__(self) -> str:
120        """The consecutive ranges of logical core IDs."""
121        return self._lcore_str
122
123
124@dataclasses.dataclass(slots=True, frozen=True)
125class LogicalCoreCount(object):
126    """Define the number of logical cores per physical cores per sockets."""
127
128    #: Use this many logical cores per each physical core.
129    lcores_per_core: int = 1
130    #: Use this many physical cores per each socket.
131    cores_per_socket: int = 2
132    #: Use this many sockets.
133    socket_count: int = 1
134    #: Use exactly these sockets. This takes precedence over `socket_count`,
135    #: so when `sockets` is not :data:`None`, `socket_count` is ignored.
136    sockets: list[int] | None = None
137
138
139class LogicalCoreFilter(ABC):
140    """Common filtering class.
141
142    Each filter needs to be implemented in a subclass. This base class sorts the list of cores
143    and defines the filtering method, which must be implemented by subclasses.
144    """
145
146    _filter_specifier: LogicalCoreCount | LogicalCoreList
147    _lcores_to_filter: list[LogicalCore]
148
149    def __init__(
150        self,
151        lcore_list: list[LogicalCore],
152        filter_specifier: LogicalCoreCount | LogicalCoreList,
153        ascending: bool = True,
154    ):
155        """Filter according to the input filter specifier.
156
157        The input `lcore_list` is copied and sorted by physical core before filtering.
158        The list is copied so that the original is left intact.
159
160        Args:
161            lcore_list: The logical CPU cores to filter.
162            filter_specifier: Filter cores from `lcore_list` according to this filter.
163            ascending: Sort cores in ascending order (lowest to highest IDs). If data:`False`,
164                sort in descending order.
165        """
166        self._filter_specifier = filter_specifier
167
168        # sorting by core is needed in case hyperthreading is enabled
169        self._lcores_to_filter = sorted(lcore_list, key=lambda x: x.core, reverse=not ascending)
170        self.filter()
171
172    @abstractmethod
173    def filter(self) -> list[LogicalCore]:
174        r"""Filter the cores.
175
176        Use `self._filter_specifier` to filter `self._lcores_to_filter` and return
177        the filtered :class:`LogicalCore`\s.
178        `self._lcores_to_filter` is a sorted copy of the original list, so it may be modified.
179
180        Returns:
181            The filtered cores.
182        """
183
184
185class LogicalCoreCountFilter(LogicalCoreFilter):
186    """Filter cores by specified counts.
187
188    Filter the input list of LogicalCores according to specified rules:
189
190        * The input `filter_specifier` is :class:`LogicalCoreCount`,
191        * Use cores from the specified number of sockets or from the specified socket ids,
192        * If `sockets` is specified, it takes precedence over `socket_count`,
193        * From each of those sockets, use only `cores_per_socket` of cores,
194        * And for each core, use `lcores_per_core` of logical cores. Hypertheading
195          must be enabled for this to take effect.
196    """
197
198    _filter_specifier: LogicalCoreCount
199
200    def filter(self) -> list[LogicalCore]:
201        """Filter the cores according to :class:`LogicalCoreCount`.
202
203        Start by filtering the allowed sockets. The cores matching the allowed sockets are returned.
204        The cores of each socket are stored in separate lists.
205
206        Then filter the allowed physical cores from those lists of cores per socket. When filtering
207        physical cores, store the desired number of logical cores per physical core which then
208        together constitute the final filtered list.
209
210        Returns:
211            The filtered cores.
212        """
213        sockets_to_filter = self._filter_sockets(self._lcores_to_filter)
214        filtered_lcores = []
215        for socket_to_filter in sockets_to_filter:
216            filtered_lcores.extend(self._filter_cores_from_socket(socket_to_filter))
217        return filtered_lcores
218
219    def _filter_sockets(
220        self, lcores_to_filter: Iterable[LogicalCore]
221    ) -> ValuesView[list[LogicalCore]]:
222        """Filter a list of cores per each allowed socket.
223
224        The sockets may be specified in two ways, either a number or a specific list of sockets.
225        In case of a specific list, we just need to return the cores from those sockets.
226        If filtering a number of cores, we need to go through all cores and note which sockets
227        appear and only filter from the first n that appear.
228
229        Args:
230            lcores_to_filter: The cores to filter. These must be sorted by the physical core.
231
232        Returns:
233            A list of lists of logical CPU cores. Each list contains cores from one socket.
234        """
235        allowed_sockets: set[int] = set()
236        socket_count = self._filter_specifier.socket_count
237        if self._filter_specifier.sockets:
238            # when sockets in filter is specified, the sockets are already set
239            socket_count = len(self._filter_specifier.sockets)
240            allowed_sockets = set(self._filter_specifier.sockets)
241
242        # filter socket_count sockets from all sockets by checking the socket of each CPU
243        filtered_lcores: dict[int, list[LogicalCore]] = {}
244        for lcore in lcores_to_filter:
245            if not self._filter_specifier.sockets:
246                # this is when sockets is not set, so we do the actual filtering
247                # when it is set, allowed_sockets is already defined and can't be changed
248                if len(allowed_sockets) < socket_count:
249                    # allowed_sockets is a set, so adding an existing socket won't re-add it
250                    allowed_sockets.add(lcore.socket)
251            if lcore.socket in allowed_sockets:
252                # separate lcores into sockets; this makes it easier in further processing
253                if lcore.socket in filtered_lcores:
254                    filtered_lcores[lcore.socket].append(lcore)
255                else:
256                    filtered_lcores[lcore.socket] = [lcore]
257
258        if len(allowed_sockets) < socket_count:
259            raise ValueError(
260                f"The actual number of sockets from which to use cores "
261                f"({len(allowed_sockets)}) is lower than required ({socket_count})."
262            )
263
264        return filtered_lcores.values()
265
266    def _filter_cores_from_socket(
267        self, lcores_to_filter: Iterable[LogicalCore]
268    ) -> list[LogicalCore]:
269        """Filter a list of cores from the given socket.
270
271        Go through the cores and note how many logical cores per physical core have been filtered.
272
273        Returns:
274            The filtered logical CPU cores.
275        """
276        # no need to use ordered dict, from Python3.7 the dict
277        # insertion order is preserved (LIFO).
278        lcore_count_per_core_map: dict[int, int] = {}
279        filtered_lcores = []
280        for lcore in lcores_to_filter:
281            if lcore.core in lcore_count_per_core_map:
282                current_core_lcore_count = lcore_count_per_core_map[lcore.core]
283                if self._filter_specifier.lcores_per_core > current_core_lcore_count:
284                    # only add lcores of the given core
285                    lcore_count_per_core_map[lcore.core] += 1
286                    filtered_lcores.append(lcore)
287                else:
288                    # we have enough lcores per this core
289                    continue
290            elif self._filter_specifier.cores_per_socket > len(lcore_count_per_core_map):
291                # only add cores if we need more
292                lcore_count_per_core_map[lcore.core] = 1
293                filtered_lcores.append(lcore)
294            else:
295                # we have enough cores
296                break
297
298        cores_per_socket = len(lcore_count_per_core_map)
299        if cores_per_socket < self._filter_specifier.cores_per_socket:
300            raise ValueError(
301                f"The actual number of cores per socket ({cores_per_socket}) "
302                f"is lower than required ({self._filter_specifier.cores_per_socket})."
303            )
304
305        lcores_per_core = lcore_count_per_core_map[filtered_lcores[-1].core]
306        if lcores_per_core < self._filter_specifier.lcores_per_core:
307            raise ValueError(
308                f"The actual number of logical cores per core ({lcores_per_core}) "
309                f"is lower than required ({self._filter_specifier.lcores_per_core})."
310            )
311
312        return filtered_lcores
313
314
315class LogicalCoreListFilter(LogicalCoreFilter):
316    """Filter the logical CPU cores by logical CPU core IDs.
317
318    This is a simple filter that looks at logical CPU IDs and only filter those that match.
319
320    The input filter is :class:`LogicalCoreList`. An empty LogicalCoreList won't filter anything.
321    """
322
323    _filter_specifier: LogicalCoreList
324
325    def filter(self) -> list[LogicalCore]:
326        """Filter based on logical CPU core ID.
327
328        Return:
329            The filtered logical CPU cores.
330        """
331        if not len(self._filter_specifier.lcore_list):
332            return self._lcores_to_filter
333
334        filtered_lcores = []
335        for core in self._lcores_to_filter:
336            if core.lcore in self._filter_specifier.lcore_list:
337                filtered_lcores.append(core)
338
339        if len(filtered_lcores) != len(self._filter_specifier.lcore_list):
340            raise ValueError(
341                f"Not all logical cores from {self._filter_specifier.lcore_list} "
342                f"were found among {self._lcores_to_filter}"
343            )
344
345        return filtered_lcores
346
347
348def lcore_filter(
349    core_list: list[LogicalCore],
350    filter_specifier: LogicalCoreCount | LogicalCoreList,
351    ascending: bool,
352) -> LogicalCoreFilter:
353    """Factory for providing the filter that corresponds to `filter_specifier`.
354
355    Args:
356        core_list: The logical CPU cores to filter.
357        filter_specifier: The filter to use.
358        ascending: Sort cores in ascending order (lowest to highest IDs). If :data:`False`,
359            sort in descending order.
360
361    Returns:
362        The filter that corresponds to `filter_specifier`.
363    """
364    if isinstance(filter_specifier, LogicalCoreList):
365        return LogicalCoreListFilter(core_list, filter_specifier, ascending)
366    elif isinstance(filter_specifier, LogicalCoreCount):
367        return LogicalCoreCountFilter(core_list, filter_specifier, ascending)
368    else:
369        raise ValueError(f"Unsupported filter r{filter_specifier}")
370