1# SPDX-License-Identifier: BSD-3-Clause 2# Copyright(c) 2023 PANTHEON.tech s.r.o. 3# Copyright(c) 2023 University of New Hampshire 4 5"""Linux OS translator. 6 7Translate OS-unaware calls into Linux calls/utilities. Most of Linux distributions are mostly 8compliant with POSIX standards, so this module only implements the parts that aren't. 9This intermediate module implements the common parts of mostly POSIX compliant distributions. 10""" 11 12import json 13from ipaddress import IPv4Interface, IPv6Interface 14from typing import TypedDict, Union 15 16from typing_extensions import NotRequired 17 18from framework.exception import ConfigurationError, RemoteCommandExecutionError 19from framework.utils import expand_range 20 21from .cpu import LogicalCore 22from .port import Port 23from .posix_session import PosixSession 24 25 26class LshwConfigurationOutput(TypedDict): 27 """The relevant parts of ``lshw``'s ``configuration`` section.""" 28 29 #: 30 link: str 31 32 33class LshwOutput(TypedDict): 34 """A model of the relevant information from ``lshw``'s json output. 35 36 Example: 37 :: 38 39 { 40 ... 41 "businfo" : "pci@0000:08:00.0", 42 "logicalname" : "enp8s0", 43 "version" : "00", 44 "serial" : "52:54:00:59:e1:ac", 45 ... 46 "configuration" : { 47 ... 48 "link" : "yes", 49 ... 50 }, 51 ... 52 """ 53 54 #: 55 businfo: str 56 #: 57 logicalname: NotRequired[str] 58 #: 59 serial: NotRequired[str] 60 #: 61 configuration: LshwConfigurationOutput 62 63 64class LinuxSession(PosixSession): 65 """The implementation of non-Posix compliant parts of Linux.""" 66 67 @staticmethod 68 def _get_privileged_command(command: str) -> str: 69 return f"sudo -- sh -c '{command}'" 70 71 def get_remote_cpus(self, use_first_core: bool) -> list[LogicalCore]: 72 """Overrides :meth:`~.os_session.OSSession.get_remote_cpus`.""" 73 cpu_info = self.send_command("lscpu -p=CPU,CORE,SOCKET,NODE|grep -v \\#").stdout 74 lcores = [] 75 for cpu_line in cpu_info.splitlines(): 76 lcore, core, socket, node = map(int, cpu_line.split(",")) 77 if core == 0 and socket == 0 and not use_first_core: 78 self._logger.info("Not using the first physical core.") 79 continue 80 lcores.append(LogicalCore(lcore, core, socket, node)) 81 return lcores 82 83 def get_dpdk_file_prefix(self, dpdk_prefix: str) -> str: 84 """Overrides :meth:`~.os_session.OSSession.get_dpdk_file_prefix`.""" 85 return dpdk_prefix 86 87 def setup_hugepages(self, number_of: int, hugepage_size: int, force_first_numa: bool) -> None: 88 """Overrides :meth:`~.os_session.OSSession.setup_hugepages`.""" 89 self._logger.info("Getting Hugepage information.") 90 hugepages_total = self._get_hugepages_total(hugepage_size) 91 if ( 92 f"hugepages-{hugepage_size}kB" 93 not in self.send_command("ls /sys/kernel/mm/hugepages").stdout 94 ): 95 raise ConfigurationError("hugepage size not supported by operating system") 96 self._numa_nodes = self._get_numa_nodes() 97 98 if force_first_numa or hugepages_total < number_of: 99 # when forcing numa, we need to clear existing hugepages regardless 100 # of size, so they can be moved to the first numa node 101 self._configure_huge_pages(number_of, hugepage_size, force_first_numa) 102 else: 103 self._logger.info("Hugepages already configured.") 104 self._mount_huge_pages() 105 106 def _get_hugepages_total(self, hugepage_size: int) -> int: 107 hugepages_total = self.send_command( 108 f"cat /sys/kernel/mm/hugepages/hugepages-{hugepage_size}kB/nr_hugepages" 109 ).stdout 110 return int(hugepages_total) 111 112 def _get_numa_nodes(self) -> list[int]: 113 try: 114 numa_count = self.send_command( 115 "cat /sys/devices/system/node/online", verify=True 116 ).stdout 117 numa_range = expand_range(numa_count) 118 except RemoteCommandExecutionError: 119 # the file doesn't exist, meaning the node doesn't support numa 120 numa_range = [] 121 return numa_range 122 123 def _mount_huge_pages(self) -> None: 124 self._logger.info("Re-mounting Hugepages.") 125 hugapge_fs_cmd = "awk '/hugetlbfs/ { print $2 }' /proc/mounts" 126 self.send_command(f"umount $({hugapge_fs_cmd})", privileged=True) 127 result = self.send_command(hugapge_fs_cmd) 128 if result.stdout == "": 129 remote_mount_path = "/mnt/huge" 130 self.send_command(f"mkdir -p {remote_mount_path}", privileged=True) 131 self.send_command(f"mount -t hugetlbfs nodev {remote_mount_path}", privileged=True) 132 133 def _supports_numa(self) -> bool: 134 # the system supports numa if self._numa_nodes is non-empty and there are more 135 # than one numa node (in the latter case it may actually support numa, but 136 # there's no reason to do any numa specific configuration) 137 return len(self._numa_nodes) > 1 138 139 def _configure_huge_pages(self, number_of: int, size: int, force_first_numa: bool) -> None: 140 self._logger.info("Configuring Hugepages.") 141 hugepage_config_path = f"/sys/kernel/mm/hugepages/hugepages-{size}kB/nr_hugepages" 142 if force_first_numa and self._supports_numa(): 143 # clear non-numa hugepages 144 self.send_command(f"echo 0 | tee {hugepage_config_path}", privileged=True) 145 hugepage_config_path = ( 146 f"/sys/devices/system/node/node{self._numa_nodes[0]}/hugepages" 147 f"/hugepages-{size}kB/nr_hugepages" 148 ) 149 150 self.send_command(f"echo {number_of} | tee {hugepage_config_path}", privileged=True) 151 152 def update_ports(self, ports: list[Port]) -> None: 153 """Overrides :meth:`~.os_session.OSSession.update_ports`.""" 154 self._logger.debug("Gathering port info.") 155 for port in ports: 156 assert port.node == self.name, "Attempted to gather port info on the wrong node" 157 158 port_info_list = self._get_lshw_info() 159 for port in ports: 160 for port_info in port_info_list: 161 if f"pci@{port.pci}" == port_info.get("businfo"): 162 self._update_port_attr(port, port_info.get("logicalname"), "logical_name") 163 self._update_port_attr(port, port_info.get("serial"), "mac_address") 164 port_info_list.remove(port_info) 165 break 166 else: 167 self._logger.warning(f"No port at pci address {port.pci} found.") 168 169 def _get_lshw_info(self) -> list[LshwOutput]: 170 output = self.send_command("lshw -quiet -json -C network", verify=True) 171 return json.loads(output.stdout) 172 173 def _update_port_attr(self, port: Port, attr_value: str | None, attr_name: str) -> None: 174 if attr_value: 175 setattr(port, attr_name, attr_value) 176 self._logger.debug(f"Found '{attr_name}' of port {port.pci}: '{attr_value}'.") 177 else: 178 self._logger.warning( 179 f"Attempted to get '{attr_name}' of port {port.pci}, but it doesn't exist." 180 ) 181 182 def configure_port_state(self, port: Port, enable: bool) -> None: 183 """Overrides :meth:`~.os_session.OSSession.configure_port_state`.""" 184 state = "up" if enable else "down" 185 self.send_command(f"ip link set dev {port.logical_name} {state}", privileged=True) 186 187 def configure_port_ip_address( 188 self, 189 address: Union[IPv4Interface, IPv6Interface], 190 port: Port, 191 delete: bool, 192 ) -> None: 193 """Overrides :meth:`~.os_session.OSSession.configure_port_ip_address`.""" 194 command = "del" if delete else "add" 195 self.send_command( 196 f"ip address {command} {address} dev {port.logical_name}", 197 privileged=True, 198 verify=True, 199 ) 200 201 def configure_port_mtu(self, mtu: int, port: Port) -> None: 202 """Overrides :meth:`~.os_session.OSSession.configure_port_mtu`.""" 203 self.send_command( 204 f"ip link set dev {port.logical_name} mtu {mtu}", 205 privileged=True, 206 verify=True, 207 ) 208 209 def configure_ipv4_forwarding(self, enable: bool) -> None: 210 """Overrides :meth:`~.os_session.OSSession.configure_ipv4_forwarding`.""" 211 state = 1 if enable else 0 212 self.send_command(f"sysctl -w net.ipv4.ip_forward={state}", privileged=True) 213