xref: /dpdk/dts/framework/testbed_model/linux_session.py (revision 840b1e01af3391ecda5124322109f2d34b7d6d86)
1# SPDX-License-Identifier: BSD-3-Clause
2# Copyright(c) 2023 PANTHEON.tech s.r.o.
3# Copyright(c) 2023 University of New Hampshire
4
5import json
6from ipaddress import IPv4Interface, IPv6Interface
7from typing import TypedDict, Union
8
9from typing_extensions import NotRequired
10
11from framework.exception import RemoteCommandExecutionError
12from framework.utils import expand_range
13
14from .cpu import LogicalCore
15from .port import Port
16from .posix_session import PosixSession
17
18
class LshwConfigurationOutput(TypedDict):
    """
    A model of the "configuration" object nested in a json lshw entry.
    """

    # value of the "link" key, e.g. "yes"
    link: str
21
22
class LshwOutput(TypedDict):
    """
    A model of the relevant information from json lshw output, e.g.:
    {
    ...
    "businfo" : "pci@0000:08:00.0",
    "logicalname" : "enp8s0",
    "version" : "00",
    "serial" : "52:54:00:59:e1:ac",
    ...
    "configuration" : {
      ...
      "link" : "yes",
      ...
    },
    ...
    }

    Only the keys declared below are modeled. "logicalname" and "serial" are
    declared as not required, so an entry may lack them.
    """

    # bus address of the device; compared against "pci@<port pci address>"
    businfo: str
    # interface name, e.g. "enp8s0"; stored as the port's logical name
    logicalname: NotRequired[str]
    # e.g. "52:54:00:59:e1:ac"; stored as the port's MAC address
    serial: NotRequired[str]
    # the nested "configuration" object
    configuration: LshwConfigurationOutput
45
46
47class LinuxSession(PosixSession):
    """
    The implementation of the non-POSIX-compliant parts of Linux remote sessions.
    """
51
52    @staticmethod
53    def _get_privileged_command(command: str) -> str:
54        return f"sudo -- sh -c '{command}'"
55
56    def get_remote_cpus(self, use_first_core: bool) -> list[LogicalCore]:
57        cpu_info = self.send_command("lscpu -p=CPU,CORE,SOCKET,NODE|grep -v \\#").stdout
58        lcores = []
59        for cpu_line in cpu_info.splitlines():
60            lcore, core, socket, node = map(int, cpu_line.split(","))
61            if core == 0 and socket == 0 and not use_first_core:
62                self._logger.info("Not using the first physical core.")
63                continue
64            lcores.append(LogicalCore(lcore, core, socket, node))
65        return lcores
66
67    def get_dpdk_file_prefix(self, dpdk_prefix: str) -> str:
68        return dpdk_prefix
69
70    def setup_hugepages(self, hugepage_amount: int, force_first_numa: bool) -> None:
71        self._logger.info("Getting Hugepage information.")
72        hugepage_size = self._get_hugepage_size()
73        hugepages_total = self._get_hugepages_total()
74        self._numa_nodes = self._get_numa_nodes()
75
76        if force_first_numa or hugepages_total != hugepage_amount:
77            # when forcing numa, we need to clear existing hugepages regardless
78            # of size, so they can be moved to the first numa node
79            self._configure_huge_pages(hugepage_amount, hugepage_size, force_first_numa)
80        else:
81            self._logger.info("Hugepages already configured.")
82        self._mount_huge_pages()
83
84    def _get_hugepage_size(self) -> int:
85        hugepage_size = self.send_command("awk '/Hugepagesize/ {print $2}' /proc/meminfo").stdout
86        return int(hugepage_size)
87
88    def _get_hugepages_total(self) -> int:
89        hugepages_total = self.send_command(
90            "awk '/HugePages_Total/ { print $2 }' /proc/meminfo"
91        ).stdout
92        return int(hugepages_total)
93
94    def _get_numa_nodes(self) -> list[int]:
95        try:
96            numa_count = self.send_command(
97                "cat /sys/devices/system/node/online", verify=True
98            ).stdout
99            numa_range = expand_range(numa_count)
100        except RemoteCommandExecutionError:
101            # the file doesn't exist, meaning the node doesn't support numa
102            numa_range = []
103        return numa_range
104
105    def _mount_huge_pages(self) -> None:
106        self._logger.info("Re-mounting Hugepages.")
107        hugapge_fs_cmd = "awk '/hugetlbfs/ { print $2 }' /proc/mounts"
108        self.send_command(f"umount $({hugapge_fs_cmd})")
109        result = self.send_command(hugapge_fs_cmd)
110        if result.stdout == "":
111            remote_mount_path = "/mnt/huge"
112            self.send_command(f"mkdir -p {remote_mount_path}")
113            self.send_command(f"mount -t hugetlbfs nodev {remote_mount_path}")
114
115    def _supports_numa(self) -> bool:
116        # the system supports numa if self._numa_nodes is non-empty and there are more
117        # than one numa node (in the latter case it may actually support numa, but
118        # there's no reason to do any numa specific configuration)
119        return len(self._numa_nodes) > 1
120
121    def _configure_huge_pages(self, amount: int, size: int, force_first_numa: bool) -> None:
122        self._logger.info("Configuring Hugepages.")
123        hugepage_config_path = f"/sys/kernel/mm/hugepages/hugepages-{size}kB/nr_hugepages"
124        if force_first_numa and self._supports_numa():
125            # clear non-numa hugepages
126            self.send_command(f"echo 0 | tee {hugepage_config_path}", privileged=True)
127            hugepage_config_path = (
128                f"/sys/devices/system/node/node{self._numa_nodes[0]}/hugepages"
129                f"/hugepages-{size}kB/nr_hugepages"
130            )
131
132        self.send_command(f"echo {amount} | tee {hugepage_config_path}", privileged=True)
133
134    def update_ports(self, ports: list[Port]) -> None:
135        self._logger.debug("Gathering port info.")
136        for port in ports:
137            assert port.node == self.name, "Attempted to gather port info on the wrong node"
138
139        port_info_list = self._get_lshw_info()
140        for port in ports:
141            for port_info in port_info_list:
142                if f"pci@{port.pci}" == port_info.get("businfo"):
143                    self._update_port_attr(port, port_info.get("logicalname"), "logical_name")
144                    self._update_port_attr(port, port_info.get("serial"), "mac_address")
145                    port_info_list.remove(port_info)
146                    break
147            else:
148                self._logger.warning(f"No port at pci address {port.pci} found.")
149
150    def _get_lshw_info(self) -> list[LshwOutput]:
151        output = self.send_command("lshw -quiet -json -C network", verify=True)
152        return json.loads(output.stdout)
153
154    def _update_port_attr(self, port: Port, attr_value: str | None, attr_name: str) -> None:
155        if attr_value:
156            setattr(port, attr_name, attr_value)
157            self._logger.debug(f"Found '{attr_name}' of port {port.pci}: '{attr_value}'.")
158        else:
159            self._logger.warning(
160                f"Attempted to get '{attr_name}' of port {port.pci}, but it doesn't exist."
161            )
162
163    def configure_port_state(self, port: Port, enable: bool) -> None:
164        state = "up" if enable else "down"
165        self.send_command(f"ip link set dev {port.logical_name} {state}", privileged=True)
166
167    def configure_port_ip_address(
168        self,
169        address: Union[IPv4Interface, IPv6Interface],
170        port: Port,
171        delete: bool,
172    ) -> None:
173        command = "del" if delete else "add"
174        self.send_command(
175            f"ip address {command} {address} dev {port.logical_name}",
176            privileged=True,
177            verify=True,
178        )
179
180    def configure_ipv4_forwarding(self, enable: bool) -> None:
181        state = 1 if enable else 0
182        self.send_command(f"sysctl -w net.ipv4.ip_forward={state}", privileged=True)
183