# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2023 PANTHEON.tech s.r.o.
# Copyright(c) 2023 University of New Hampshire

"""Linux OS translator.

Translate OS-unaware calls into Linux calls/utilities. Most of Linux distributions are mostly
compliant with POSIX standards, so this module only implements the parts that aren't.
This intermediate module implements the common parts of mostly POSIX compliant distributions.
"""

import json
import shlex
from typing import TypedDict

from typing_extensions import NotRequired

from framework.exception import ConfigurationError, RemoteCommandExecutionError
from framework.utils import expand_range

from .cpu import LogicalCore
from .port import Port
from .posix_session import PosixSession


class LshwConfigurationOutput(TypedDict):
    """The relevant parts of ``lshw``'s ``configuration`` section."""

    #:
    link: str


class LshwOutput(TypedDict):
    """A model of the relevant information from ``lshw``'s json output.

    Example:
        ::

            {
            ...
            "businfo" : "pci@0000:08:00.0",
            "logicalname" : "enp8s0",
            "version" : "00",
            "serial" : "52:54:00:59:e1:ac",
            ...
            "configuration" : {
              ...
              "link" : "yes",
              ...
            },
            ...
            }
    """

    #:
    businfo: str
    #:
    logicalname: NotRequired[str]
    #:
    serial: NotRequired[str]
    #:
    configuration: LshwConfigurationOutput


class LinuxSession(PosixSession):
    """The implementation of non-Posix compliant parts of Linux."""

    @staticmethod
    def _get_privileged_command(command: str) -> str:
        """Wrap `command` so that it is executed with elevated privileges.

        The command is quoted with :func:`shlex.quote` so that commands which themselves
        contain single quotes (such as ``awk '...'`` scripts) reach the privileged shell intact
        instead of terminating the quoting early.

        Args:
            command: The shell command to run as a privileged user.

        Returns:
            The command wrapped in ``sudo -- sh -c``.
        """
        return f"sudo -- sh -c {shlex.quote(command)}"

    def get_remote_cpus(self, use_first_core: bool) -> list[LogicalCore]:
        """Overrides :meth:`~.os_session.OSSession.get_remote_cpus`."""
        cpu_info = self.send_command("lscpu -p=CPU,CORE,SOCKET,NODE|grep -v \\#").stdout
        lcores = []
        for cpu_line in cpu_info.splitlines():
            if not cpu_line.strip():
                # tolerate blank lines in the output instead of failing the unpacking below
                continue
            lcore, core, socket, node = map(int, cpu_line.split(","))
            if core == 0 and socket == 0 and not use_first_core:
                self._logger.info("Not using the first physical core.")
                continue
            lcores.append(LogicalCore(lcore, core, socket, node))
        return lcores

    def get_dpdk_file_prefix(self, dpdk_prefix: str) -> str:
        """Overrides :meth:`~.os_session.OSSession.get_dpdk_file_prefix`."""
        return dpdk_prefix

    def setup_hugepages(self, number_of: int, hugepage_size: int, force_first_numa: bool) -> None:
        """Overrides :meth:`~.os_session.OSSession.setup_hugepages`.

        Raises:
            ConfigurationError: If the requested hugepage size is not listed in
                ``/sys/kernel/mm/hugepages``.
        """
        self._logger.info("Getting Hugepage information.")
        # Verify the size is supported before reading nr_hugepages; for an unsupported size
        # the read yields no output and parsing it would fail with an unrelated ValueError.
        supported_sizes = self.send_command("ls /sys/kernel/mm/hugepages").stdout
        if f"hugepages-{hugepage_size}kB" not in supported_sizes:
            raise ConfigurationError("hugepage size not supported by operating system")
        hugepages_total = self._get_hugepages_total(hugepage_size)
        self._numa_nodes = self._get_numa_nodes()

        if force_first_numa or hugepages_total < number_of:
            # when forcing numa, we need to clear existing hugepages regardless
            # of size, so they can be moved to the first numa node
            self._configure_huge_pages(number_of, hugepage_size, force_first_numa)
        else:
            self._logger.info("Hugepages already configured.")
        self._mount_huge_pages()

    def _get_hugepages_total(self, hugepage_size: int) -> int:
        """Read the number of currently configured hugepages of the given size.

        Args:
            hugepage_size: The hugepage size in kB, as used in the sysfs directory name.

        Returns:
            The value of the corresponding ``nr_hugepages`` sysfs file.
        """
        hugepages_total = self.send_command(
            f"cat /sys/kernel/mm/hugepages/hugepages-{hugepage_size}kB/nr_hugepages"
        ).stdout
        return int(hugepages_total)

    def _get_numa_nodes(self) -> list[int]:
        """Get the online NUMA nodes from ``/sys/devices/system/node/online``.

        Returns:
            The expanded range of online nodes, or an empty list if the command fails.
        """
        try:
            numa_count = self.send_command(
                "cat /sys/devices/system/node/online", verify=True
            ).stdout
            numa_range = expand_range(numa_count)
        except RemoteCommandExecutionError:
            # the file doesn't exist, meaning the node doesn't support numa
            numa_range = []
        return numa_range

    def _mount_huge_pages(self) -> None:
        """Unmount the existing hugetlbfs mount points and mount one if none remain."""
        self._logger.info("Re-mounting Hugepages.")
        hugepage_fs_cmd = "awk '/hugetlbfs/ { print $2 }' /proc/mounts"
        # the umount result is not verified; when nothing is mounted there is nothing to do
        self.send_command(f"umount $({hugepage_fs_cmd})", privileged=True)
        result = self.send_command(hugepage_fs_cmd)
        if result.stdout == "":
            remote_mount_path = "/mnt/huge"
            self.send_command(f"mkdir -p {remote_mount_path}", privileged=True)
            self.send_command(f"mount -t hugetlbfs nodev {remote_mount_path}", privileged=True)

    def _supports_numa(self) -> bool:
        """Tell whether NUMA-specific configuration is worthwhile on this system.

        Returns:
            :data:`True` if more than one NUMA node was found, :data:`False` otherwise.
        """
        # A system with a single node may technically support numa, but there's
        # no reason to do any numa specific configuration on it, so only systems
        # with more than one node count.
        return len(self._numa_nodes) > 1

    def _configure_huge_pages(self, number_of: int, size: int, force_first_numa: bool) -> None:
        """Write the requested number of hugepages to the appropriate sysfs file.

        Args:
            number_of: The number of hugepages to configure.
            size: The hugepage size in kB, as used in the sysfs directory name.
            force_first_numa: If :data:`True` and the system has multiple NUMA nodes, clear the
                system-wide setting and configure the hugepages on the first NUMA node only.
        """
        self._logger.info("Configuring Hugepages.")
        hugepage_config_path = f"/sys/kernel/mm/hugepages/hugepages-{size}kB/nr_hugepages"
        if force_first_numa and self._supports_numa():
            # clear non-numa hugepages
            self.send_command(f"echo 0 | tee {hugepage_config_path}", privileged=True)
            hugepage_config_path = (
                f"/sys/devices/system/node/node{self._numa_nodes[0]}/hugepages"
                f"/hugepages-{size}kB/nr_hugepages"
            )

        self.send_command(f"echo {number_of} | tee {hugepage_config_path}", privileged=True)

    def update_ports(self, ports: list[Port]) -> None:
        """Overrides :meth:`~.os_session.OSSession.update_ports`."""
        self._logger.debug("Gathering port info.")
        for port in ports:
            assert port.node == self.name, "Attempted to gather port info on the wrong node"

        port_info_list = self._get_lshw_info()
        for port in ports:
            for port_info in port_info_list:
                if f"pci@{port.pci}" == port_info.get("businfo"):
                    self._update_port_attr(port, port_info.get("logicalname"), "logical_name")
                    self._update_port_attr(port, port_info.get("serial"), "mac_address")
                    # each lshw entry matches at most one port; removing it is safe
                    # because the inner loop is left immediately afterwards
                    port_info_list.remove(port_info)
                    break
            else:
                self._logger.warning(f"No port at pci address {port.pci} found.")

    def _get_lshw_info(self) -> list[LshwOutput]:
        """Run ``lshw`` for the network class and parse its json output.

        Returns:
            The parsed json output of ``lshw -quiet -json -C network``.
        """
        output = self.send_command("lshw -quiet -json -C network", verify=True)
        return json.loads(output.stdout)

    def _update_port_attr(self, port: Port, attr_value: str | None, attr_name: str) -> None:
        """Set `attr_name` on `port` if a value was found, otherwise log a warning.

        Args:
            port: The port to update.
            attr_value: The value to set; a falsy value means it was not found.
            attr_name: The name of the port attribute to set.
        """
        if attr_value:
            setattr(port, attr_name, attr_value)
            self._logger.debug(f"Found '{attr_name}' of port {port.pci}: '{attr_value}'.")
        else:
            self._logger.warning(
                f"Attempted to get '{attr_name}' of port {port.pci}, but it doesn't exist."
            )

    def configure_port_mtu(self, mtu: int, port: Port) -> None:
        """Overrides :meth:`~.os_session.OSSession.configure_port_mtu`."""
        self.send_command(
            f"ip link set dev {port.logical_name} mtu {mtu}",
            privileged=True,
            verify=True,
        )

    def configure_ipv4_forwarding(self, enable: bool) -> None:
        """Overrides :meth:`~.os_session.OSSession.configure_ipv4_forwarding`."""
        state = 1 if enable else 0
        self.send_command(f"sysctl -w net.ipv4.ip_forward={state}", privileged=True)