# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2023 PANTHEON.tech s.r.o.
# Copyright(c) 2023 University of New Hampshire

import json
import shlex
from ipaddress import IPv4Interface, IPv6Interface
from typing import TypedDict, Union

from typing_extensions import NotRequired

from framework.exception import RemoteCommandExecutionError
from framework.utils import expand_range

from .cpu import LogicalCore
from .port import Port
from .posix_session import PosixSession


class LshwConfigurationOutput(TypedDict):
    """
    A model of the "configuration" object nested in each json lshw entry
    (see the example in LshwOutput).
    """

    # value of the "link" key, e.g. "yes"
    link: str


class LshwOutput(TypedDict):
    """
    A model of the relevant information from json lshw output, e.g.:
    {
    ...
    "businfo" : "pci@0000:08:00.0",
    "logicalname" : "enp8s0",
    "version" : "00",
    "serial" : "52:54:00:59:e1:ac",
    ...
    "configuration" : {
      ...
      "link" : "yes",
      ...
    },
    ...
    """

    businfo: str
    logicalname: NotRequired[str]
    serial: NotRequired[str]
    configuration: LshwConfigurationOutput


class LinuxSession(PosixSession):
    """
    The implementation of non-Posix compliant parts of Linux remote sessions.
    """

    @staticmethod
    def _get_privileged_command(command: str) -> str:
        """
        Wrap command so that it is run through sudo in a separate shell.

        The command is passed to "sh -c" as a single, shell-quoted argument, so
        commands that themselves contain single quotes (or any other shell
        metacharacters) reach the inner shell unmodified.
        """
        return f"sudo -- sh -c {shlex.quote(command)}"

    def get_remote_cpus(self, use_first_core: bool) -> list[LogicalCore]:
        """
        List the logical cores reported by lscpu on the remote node.

        Each non-comment line of the parsable lscpu output holds four
        comma-separated integers: CPU, CORE, SOCKET and NODE. When
        use_first_core is False, every logical core that belongs to core 0 of
        socket 0 is left out.
        """
        cpu_info = self.send_command("lscpu -p=CPU,CORE,SOCKET,NODE|grep -v \\#").stdout
        lcores = []
        for cpu_line in cpu_info.splitlines():
            lcore, core, socket, node = map(int, cpu_line.split(","))
            if core == 0 and socket == 0 and not use_first_core:
                self._logger.info("Not using the first physical core.")
                continue
            lcores.append(LogicalCore(lcore, core, socket, node))
        return lcores

    def get_dpdk_file_prefix(self, dpdk_prefix: str) -> str:
        """
        Return the DPDK file prefix to use; on Linux it is used unchanged.
        """
        return dpdk_prefix

    def setup_hugepages(self, hugepage_amount: int, force_first_numa: bool) -> None:
        """
        Make sure hugepage_amount hugepages are configured, then (re)mount hugetlbfs.

        The hugepages are reconfigured when the current total differs from
        hugepage_amount or when force_first_numa is set. The detected numa nodes
        are stored in self._numa_nodes as a side effect.
        """
        self._logger.info("Getting Hugepage information.")
        hugepage_size = self._get_hugepage_size()
        hugepages_total = self._get_hugepages_total()
        self._numa_nodes = self._get_numa_nodes()

        if force_first_numa or hugepages_total != hugepage_amount:
            # when forcing numa, we need to clear existing hugepages regardless
            # of size, so they can be moved to the first numa node
            self._configure_huge_pages(hugepage_amount, hugepage_size, force_first_numa)
        else:
            self._logger.info("Hugepages already configured.")
        self._mount_huge_pages()

    def _get_hugepage_size(self) -> int:
        """
        Return the second field of the Hugepagesize line of /proc/meminfo as an int.
        """
        hugepage_size = self.send_command("awk '/Hugepagesize/ {print $2}' /proc/meminfo").stdout
        return int(hugepage_size)

    def _get_hugepages_total(self) -> int:
        """
        Return the second field of the HugePages_Total line of /proc/meminfo as an int.
        """
        hugepages_total = self.send_command(
            "awk '/HugePages_Total/ { print $2 }' /proc/meminfo"
        ).stdout
        return int(hugepages_total)

    def _get_numa_nodes(self) -> list[int]:
        """
        Return the online numa nodes, or an empty list if they can't be read.
        """
        try:
            numa_count = self.send_command(
                "cat /sys/devices/system/node/online", verify=True
            ).stdout
            numa_range = expand_range(numa_count)
        except RemoteCommandExecutionError:
            # the file doesn't exist, meaning the node doesn't support numa
            numa_range = []
        return numa_range

    def _mount_huge_pages(self) -> None:
        """
        Unmount the hugetlbfs mount points and mount /mnt/huge if none remain.
        """
        self._logger.info("Re-mounting Hugepages.")
        hugepage_fs_cmd = "awk '/hugetlbfs/ { print $2 }' /proc/mounts"
        self.send_command(f"umount $({hugepage_fs_cmd})")
        # check again after the umount: only mount when nothing is left mounted
        result = self.send_command(hugepage_fs_cmd)
        if result.stdout == "":
            remote_mount_path = "/mnt/huge"
            self.send_command(f"mkdir -p {remote_mount_path}")
            self.send_command(f"mount -t hugetlbfs nodev {remote_mount_path}")

    def _supports_numa(self) -> bool:
        """
        Return True if numa specific configuration should be done on the node.

        Relies on self._numa_nodes, which is set in setup_hugepages.
        """
        # numa specific configuration is only worthwhile with more than one numa node:
        # an empty list means no numa support was detected, while a single node may
        # actually support numa, but there's no reason to do any numa specific
        # configuration for it
        return len(self._numa_nodes) > 1

    def _configure_huge_pages(self, amount: int, size: int, force_first_numa: bool) -> None:
        """
        Write amount into the nr_hugepages file for hugepages of the given size (in kB).

        When force_first_numa is set and numa configuration applies, the system-wide
        count is zeroed first and amount is written to the first numa node instead.
        """
        self._logger.info("Configuring Hugepages.")
        hugepage_config_path = f"/sys/kernel/mm/hugepages/hugepages-{size}kB/nr_hugepages"
        if force_first_numa and self._supports_numa():
            # clear non-numa hugepages
            self.send_command(f"echo 0 | tee {hugepage_config_path}", privileged=True)
            hugepage_config_path = (
                f"/sys/devices/system/node/node{self._numa_nodes[0]}/hugepages"
                f"/hugepages-{size}kB/nr_hugepages"
            )

        self.send_command(f"echo {amount} | tee {hugepage_config_path}", privileged=True)

    def update_ports(self, ports: list[Port]) -> None:
        """
        Set logical_name and mac_address of each port from the lshw output.

        Ports are matched to lshw entries by PCI address ("pci@<port.pci>" against
        "businfo"). A warning is logged for each port without a matching entry.
        """
        self._logger.debug("Gathering port info.")
        for port in ports:
            assert port.node == self.name, "Attempted to gather port info on the wrong node"

        port_info_list = self._get_lshw_info()
        for port in ports:
            for port_info in port_info_list:
                if f"pci@{port.pci}" == port_info.get("businfo"):
                    self._update_port_attr(port, port_info.get("logicalname"), "logical_name")
                    self._update_port_attr(port, port_info.get("serial"), "mac_address")
                    # a matched entry can't match any other port; removing it is
                    # safe because the iteration stops right after
                    port_info_list.remove(port_info)
                    break
            else:
                self._logger.warning(f"No port at pci address {port.pci} found.")

    def _get_lshw_info(self) -> list[LshwOutput]:
        """
        Return the parsed json output of lshw for the network device class.
        """
        output = self.send_command("lshw -quiet -json -C network", verify=True)
        return json.loads(output.stdout)

    def _update_port_attr(self, port: Port, attr_value: str | None, attr_name: str) -> None:
        """
        Set attr_name on port to attr_value, or log a warning if the value is missing.
        """
        if attr_value:
            setattr(port, attr_name, attr_value)
            self._logger.debug(f"Found '{attr_name}' of port {port.pci}: '{attr_value}'.")
        else:
            self._logger.warning(
                f"Attempted to get '{attr_name}' of port {port.pci}, but it doesn't exist."
            )

    def configure_port_state(self, port: Port, enable: bool) -> None:
        """
        Bring the port's logical interface up (enable) or down using "ip link".
        """
        state = "up" if enable else "down"
        self.send_command(f"ip link set dev {port.logical_name} {state}", privileged=True)

    def configure_port_ip_address(
        self,
        address: Union[IPv4Interface, IPv6Interface],
        port: Port,
        delete: bool,
    ) -> None:
        """
        Add address to the port's logical interface, or remove it when delete is set.
        """
        command = "del" if delete else "add"
        self.send_command(
            f"ip address {command} {address} dev {port.logical_name}",
            privileged=True,
            verify=True,
        )

    def configure_ipv4_forwarding(self, enable: bool) -> None:
        """
        Set the net.ipv4.ip_forward sysctl to 1 (enable) or 0.
        """
        state = 1 if enable else 0
        self.send_command(f"sysctl -w net.ipv4.ip_forward={state}", privileged=True)