#!/usr/bin/env python3
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (C) 2018 Intel Corporation
# All rights reserved.
#

# Note: we use setattr
# pylint: disable=no-member

import os
import re
import sys
import argparse
import json
import inspect
import logging
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid

import paramiko
import pandas as pd
from common import *
from subprocess import check_output, CalledProcessError
from abc import abstractmethod, ABC
from dataclasses import dataclass
from typing import Any

sys.path.append(os.path.dirname(__file__) + '/../../../python')

import spdk.rpc as rpc  # noqa
import spdk.rpc.client as rpc_client  # noqa


@dataclass
class ConfigField:
    name: str = None
    default: Any = None
    required: bool = False


class Server(ABC):

    RDMA_PROTOCOL_IWARP = 0
    RDMA_PROTOCOL_ROCE = 1
    RDMA_PROTOCOL_UNKNOWN = -1

    def __init__(self, name, general_config, server_config):
        self.name = name

        self.log = logging.getLogger(self.name)

        config_fields = [
            ConfigField(name='username', required=True),
            ConfigField(name='password', required=True),
            ConfigField(name='transport', required=True),
            ConfigField(name='skip_spdk_install', default=False),
            ConfigField(name='irdma_roce_enable', default=False),
            ConfigField(name='pause_frames', default=None)
        ]

        self.read_config(config_fields, general_config)
        self.transport = self.transport.lower()

        config_fields = [
            ConfigField(name='nic_ips', required=True),
            ConfigField(name='mode', required=True),
            ConfigField(name='irq_scripts_dir', default='/usr/src/local/mlnx-tools/ofed_scripts'),
            ConfigField(name='enable_arfs', default=False),
            ConfigField(name='tuned_profile', default='')
        ]
        self.read_config(config_fields, server_config)

        self.local_nic_info = []
        self._nics_json_obj = {}
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""

        self.enable_adq = server_config.get('adq_enable', False)
        self.adq_priority = 1 if self.enable_adq else None
        self.irq_settings = {'mode': 'default', **server_config.get('irq_settings', {})}

        if not re.match(r'^[A-Za-z0-9\-]+$', name):
            self.log.info("Please use a name which contains only letters, numbers or dashes")
            sys.exit(1)

    def read_config(self, config_fields, config):
        for config_field in config_fields:
            value = config.get(config_field.name, config_field.default)
            if config_field.required and value is None:
                raise ValueError('%s config key is required' % config_field.name)

            setattr(self, config_field.name, value)

    @staticmethod
    def get_uncommented_lines(lines):
        return [line for line in lines if line and not line.startswith('#')]

    @staticmethod
    def get_core_list_from_mask(core_mask):
        # Generate list of individual cores from hex core mask
        # (e.g. '0xffff') or list containing:
        # - individual cores (e.g. '1, 2, 3')
        # - core ranges (e.g. '0-3')
        # - mix of both (e.g. '0, 1-4, 9, 11-13')

        core_list = []
        if "0x" in core_mask:
            core_mask_int = int(core_mask, 16)
            for i in range(core_mask_int.bit_length()):
                if (1 << i) & core_mask_int:
                    core_list.append(i)
            return core_list
        else:
            # Core list can be provided in .json config with square brackets
            # remove them first
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")

            for i in core_mask.split(","):
                if "-" in i:
                    start, end = i.split("-")
                    core_range = range(int(start), int(end) + 1)
                    core_list.extend(core_range)
                else:
                    core_list.append(int(i))
        return core_list
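
    # Illustrative examples (added comment; inputs are hypothetical, results
    # follow from the parsing logic above):
    #   get_core_list_from_mask("0xb")      -> [0, 1, 3]
    #   get_core_list_from_mask("[0,2-4]")  -> [0, 2, 3, 4]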

    def get_nic_name_by_ip(self, ip):
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                if ip in addr["local"]:
                    return nic["ifname"]

    @abstractmethod
    def set_local_nic_info_helper(self):
        pass

    def set_local_nic_info(self, pci_info):
        def extract_network_elements(json_obj):
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def get_nic_numa_node(self, nic_name):
        return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name]))

    def get_numa_cpu_map(self):
        numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"]))
        numa_cpu_json_map = {}

        for cpu in numa_cpu_json_obj["cpus"]:
            cpu_num = int(cpu["cpu"])
            numa_node = int(cpu["node"])
            numa_cpu_json_map.setdefault(numa_node, [])
            numa_cpu_json_map[numa_node].append(cpu_num)

        return numa_cpu_json_map
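
    # Illustrative return shape (added comment; CPU numbers are hypothetical):
    #   {0: [0, 2, 4, 6], 1: [1, 3, 5, 7]} on a two-node system.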

    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        return ""

    def configure_system(self):
        self.load_drivers()
        self.configure_services()
        self.configure_sysctl()
        self.configure_arfs()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity(**self.irq_settings)
        self.configure_pause_frames()

    def check_rdma_protocol(self):
        try:
            roce_ena = self.exec_cmd(["cat", "/sys/module/irdma/parameters/roce_ena"]).strip()
            return self.RDMA_PROTOCOL_IWARP if roce_ena == "0" else self.RDMA_PROTOCOL_ROCE
        except CalledProcessError as e:
            self.log.error("ERROR: failed to check RDMA protocol!")
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
            return self.RDMA_PROTOCOL_UNKNOWN

    def load_drivers(self):
        self.log.info("Loading drivers")
        self.exec_cmd(["sudo", "modprobe", "-a", f"nvme-{self.transport}", f"nvmet-{self.transport}"])

        if self.transport != "rdma":
            return

        current_mode = self.check_rdma_protocol()
        if current_mode == self.RDMA_PROTOCOL_UNKNOWN:
            self.log.error("ERROR: failed to check RDMA protocol mode")
            return

        if self.irdma_roce_enable:
            if current_mode == self.RDMA_PROTOCOL_IWARP:
                self.reload_driver("irdma", "roce_ena=1")
                self.log.info("Loaded irdma driver with RoCE enabled")
            else:
                self.log.info("Leaving irdma driver with RoCE enabled")
        else:
            self.reload_driver("irdma", "roce_ena=0")
            self.log.info("Loaded irdma driver with iWARP enabled")

    def set_pause_frames(self, rx_state, tx_state):
        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            self.exec_cmd(["sudo", "ethtool", "-A", nic_name, "rx", rx_state, "tx", tx_state])

    def configure_pause_frames(self):
        if self.pause_frames is None:
            self.log.info("Keeping NIC's default pause frames setting")
            return
        elif not self.pause_frames:
            self.log.info("Turning off pause frames")
            self.set_pause_frames("off", "off")
        else:
            self.log.info("Turning on pause frames")
            self.set_pause_frames("on", "on")

    def configure_arfs(self):
        rps_flow_cnt = 512 if self.enable_arfs else 0

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            rps_wildcard_path = f"/sys/class/net/{nic_name}/queues/rx-*/rps_flow_cnt"
            self.exec_cmd(["sudo", "ethtool", "-K", nic_name, "ntuple", "on"])
            self.log.info(f"Setting rps_flow_cnt={rps_flow_cnt} for {nic_name} rx queues")

            self.exec_cmd(["bash", "-c", f"echo {rps_flow_cnt} | sudo tee {rps_wildcard_path}"])

            set_values = list(set(self.exec_cmd(["sudo", "bash", "-c", f"cat {rps_wildcard_path}"]).strip().split("\n")))
            if len(set_values) > 1:
                self.log.error(f"Not all NIC {nic_name} queues had rps_flow_cnt set properly. Found rps_flow_cnt values: {set_values}")
            elif set_values[0] != str(rps_flow_cnt):
                self.log.error(f"Wrong rps_flow_cnt set on {nic_name} queues. Found {set_values[0]}, should be {rps_flow_cnt}")
            else:
                self.log.info(f"Configuration of {nic_name} completed with confirmed rps_flow_cnt settings.")

        self.log.info("ARFS configuration completed.")

    def configure_adq(self):
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        self.log.info("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log.info("%s loaded!" % module)
            except CalledProcessError as e:
                self.log.error("ERROR: failed to load module %s" % module)
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        self.log.info("Configuring ADQ Traffic classes and filters...")

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            nic_ports = [x[0] for x in self.subsystem_info_list if nic_ip in x[2]]

            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log.info(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log.info(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            nic_bdf = os.path.basename(self.exec_cmd(["readlink", "-f", "/sys/class/net/%s/device" % nic_name]).strip())
            self.exec_cmd(["sudo", "devlink", "dev", "param", "set", "pci/%s" % nic_bdf,
                           "name", "tc1_inline_fd", "value", "true", "cmode", "runtime"])

            for port in nic_ports:
                tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                                 "protocol", "ip", "ingress", "prio", "1", "flower",
                                 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                                 "skip_sw", "hw_tc", "1"]
                self.log.info(" ".join(tc_filter_cmd))
                self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log.info("Show tc configuration for %s NIC..." % nic_name)
            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log.info("%s" % tc_disk_out)
            self.log.info("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log.info(xps_cmd)
            self.exec_cmd(xps_cmd)

    def reload_driver(self, driver, *modprobe_args):

        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver, *modprobe_args])
        except CalledProcessError as e:
            self.log.error("ERROR: failed to reload %s module!" % driver)
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_nic(self):
        self.log.info("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                nic_bdf = self.exec_cmd(["bash", "-c", "source /sys/class/net/%s/device/uevent; echo $PCI_SLOT_NAME" % nic])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-inspect-optimize", "off"])
                # Following are suggested in ADQ Configuration Guide to be enabled for large block sizes,
                # but can be used with 4k block sizes as well
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop", "on"])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop-cfg", "on"])
            except subprocess.CalledProcessError as e:
                self.log.error("ERROR: failed to configure NIC port using ethtool!")
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
                self.log.info("Please update your NIC driver and firmware versions and try again.")
            self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        self.log.info("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log.info("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log.info("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        self.log.info("Tuning sysctl settings...")

        busy_read = 0
        busy_poll = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1
            busy_poll = 1

        sysctl_opts = {
            "net.core.busy_poll": busy_poll,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
            "net.core.rps_sock_flow_entries": 0
        }

        if self.enable_adq:
            sysctl_opts.update(self.adq_set_busy_read(1))

        if self.enable_arfs:
            sysctl_opts.update({"net.core.rps_sock_flow_entries": 32768})

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        if not self.tuned_profile:
            self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        self.log.info("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
        self.log.info("Setting NIC irq affinity for NICs. Using %s mode" % mode)

        if mode not in ["default", "bynode", "cpulist"]:
            raise ValueError("%s irq affinity setting not supported" % mode)

        if mode == "cpulist" and not cpulist:
            raise ValueError("%s irq affinity setting set, but no cpulist provided" % mode)

        affinity_script = "set_irq_affinity.sh"
        if "default" not in mode:
            affinity_script = "set_irq_affinity_cpulist.sh"
            system_cpu_map = self.get_numa_cpu_map()
        irq_script_path = os.path.join(self.irq_scripts_dir, affinity_script)

        def cpu_list_to_string(cpulist):
            return ",".join(map(lambda x: str(x), cpulist))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            irq_cmd = ["sudo", irq_script_path]

            # Use only CPU cores matching NIC NUMA node.
            # Remove any CPU cores if they're on exclusion list.
            if mode == "bynode":
                irq_cpus = system_cpu_map[self.get_nic_numa_node(nic_name)]
                if cpulist and exclude_cpulist:
                    disallowed_cpus = self.get_core_list_from_mask(cpulist)
                    irq_cpus = list(set(irq_cpus) - set(disallowed_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            if mode == "cpulist":
                irq_cpus = self.get_core_list_from_mask(cpulist)
                if exclude_cpulist:
                    # Flatten system CPU list, we don't need NUMA awareness here
                    system_cpu_list = sorted({x for v in system_cpu_map.values() for x in v})
                    irq_cpus = list(set(system_cpu_list) - set(irq_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            irq_cmd.append(nic_name)
            self.log.info(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        self.log.info("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        self.log.info("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        self.log.info("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log.info("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        self.log.info("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log.info("Reverted CPU governor to %s." % self.governor_restore)

    def restore_settings(self):
        self.restore_governor()
        self.restore_tuned()
        self.restore_services()
        self.restore_sysctl()
        if self.enable_adq:
            self.reload_driver("ice")

    def check_for_throttling(self):
        patterns = [
            r"CPU\d+: Core temperature is above threshold, cpu clock is throttled",
            r"mlx5_core \S+: poll_health:\d+:\(pid \d+\): device's health compromised",
            r"mlx5_core \S+: print_health_info:\d+:\(pid \d+\): Health issue observed, High temperature",
            r"mlx5_core \S+: temp_warn:\d+:\(pid \d+\): High temperature on sensors"
        ]

        issue_found = False
        try:
            output = self.exec_cmd(["dmesg"])

            for line in output.split("\n"):
                for pattern in patterns:
                    if re.search(pattern, line):
                        self.log.warning("Throttling or temperature issue found")
                        issue_found = True
            return 1 if issue_found else 0
        except Exception as e:
            self.log.error("ERROR: failed to execute dmesg command")
            self.log.error(f"Error {e}")
            return 1

    def stop(self):
        if self.check_for_throttling():
            err_message = (f"Throttling or temperature issue found on {self.name} system! "
                           "Check system logs!")
            raise CpuThrottlingError(err_message)


class Target(Server):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Defaults
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []
        self.initiator_info = []

        config_fields = [
            ConfigField(name='mode', required=True),
            ConfigField(name='results_dir', required=True),
            ConfigField(name='enable_pm', default=True),
            ConfigField(name='enable_sar', default=True),
            ConfigField(name='enable_pcm', default=True),
            ConfigField(name='enable_dpdk_memory', default=True)
        ]
        self.read_config(config_fields, target_config)

        self.null_block = target_config.get('null_block_devices', 0)
        self.scheduler_name = target_config.get('scheduler_settings', 'static')
        self.enable_zcopy = target_config.get('zcopy_settings', False)
        self.enable_bw = target_config.get('enable_bandwidth', True)
        self.nvme_blocklist = target_config.get('blocklist', [])
        self.nvme_allowlist = target_config.get('allowlist', [])

        # Blocklist takes precedence, remove common elements from allowlist
        self.nvme_allowlist = list(set(self.nvme_allowlist) - set(self.nvme_blocklist))

        self.log.info("Items now on allowlist: %s" % self.nvme_allowlist)
        self.log.info("Items now on blocklist: %s" % self.nvme_blocklist)

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        if not self.skip_spdk_install:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
            self.configure_irq_affinity()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log.info("Changing directory to %s" % change_dir)

        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")

        if change_dir:
            os.chdir(old_cwd)
            self.log.info("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        self.log.info("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, _directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log.info("Done zipping")

    @staticmethod
    def _chunks(input_list, chunks_no):
        div, rem = divmod(len(input_list), chunks_no)
        for i in range(chunks_no):
            si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem)
            yield input_list[si:si + (div + 1 if i < rem else div)]
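
    # Illustrative example (added comment): splits a list into chunks_no nearly
    # even slices, larger slices first:
    #   list(Target._chunks(list(range(10)), 3)) -> [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]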

    def spread_bdevs(self, req_disks):
        # Spread available block devices indexes:
        # - evenly across available initiator systems
        # - evenly across available NIC interfaces for
        #   each initiator
        # Not NUMA aware.
        ip_bdev_map = []
        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))

        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
            self.initiator_info[i]["bdev_range"] = init_chunk
            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
                for c in nic_chunk:
                    ip_bdev_map.append((ip, c))
        return ip_bdev_map

    def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time):
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))

        self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Starting SAR measurements")

        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % 1, "%s" % run_time])
        # sar_output_file already contains results_dir, so open it directly
        # instead of joining the path a second time.
        with open(sar_output_file, "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log.info("Summary CPU utilization from SAR:")
                        self.log.info(line)
                    elif "all" in line:
                        self.log.info(line)
                    else:
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum

        with open(sar_cpu_util_file, "w") as f:
            f.write("%0.2f" % sar_cpu_usage)

    def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time):
        time.sleep(ramp_time)
        self.log.info("Starting power measurements")
        self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir,
                       "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix,
                       "-x", "-c", "%s" % run_time, "-t", "%s" % 1, "-r"])

    def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        cmd = ["pcm", "1", "-i=%s" % run_time,
               "-csv=%s/%s" % (results_dir, pcm_file_name)]
        out = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

        if out.returncode:
            self.log.warning("PCM Power measurement finished with a non-zero return code.")
            self.log.warning(out.stdout.decode(encoding="utf-8"))

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name, ramp_time, run_time):
        self.log.info("Waiting %d seconds for ramp-up to finish before measuring bandwidth stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("INFO: starting network bandwidth measure")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", "%s" % run_time])
        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
        #       Improve this so that additional file containing measurements average is generated too.
        # TODO: Monitor only these interfaces which are currently used to run the workload.

    def measure_dpdk_memory(self, results_dir, dump_file_name, ramp_time):
        self.log.info("INFO: waiting to generate DPDK memory usage")
        time.sleep(ramp_time)
        self.log.info("INFO: generating DPDK memory usage")
        tmp_dump_file = rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)["filename"]
        os.rename(tmp_dump_file, "%s/%s" % (results_dir, dump_file_name))

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(os.uname().release)
        self.log.info("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
        self.log.info('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log.info("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
        self.log.info('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log.info("====zcopy settings:====")
        self.log.info("zcopy enabled: %s" % (self.enable_zcopy))
        self.log.info("====Scheduler settings:====")
        self.log.info("SPDK scheduler: %s" % (self.scheduler_name))


class Initiator(Server):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Required fields and defaults
        config_fields = [
            ConfigField(name='mode', required=True),
            ConfigField(name='ip', required=True),
            ConfigField(name='target_nic_ips', required=True),
            ConfigField(name='cpus_allowed', default=None),
            ConfigField(name='cpus_allowed_policy', default='shared'),
            ConfigField(name='spdk_dir', default='/tmp/spdk'),
            ConfigField(name='fio_bin', default='/usr/src/fio/fio'),
            ConfigField(name='nvmecli_bin', default='nvme'),
            ConfigField(name='cpu_frequency', default=None),
            ConfigField(name='allow_cpu_sharing', default=True)
        ]

        self.read_config(config_fields, initiator_config)

        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.subsystem_info_list = []

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if not self.skip_spdk_install:
            self.copy_spdk("/tmp/spdk.zip")

        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        self.restore_settings()
        try:
            super().stop()
        except CpuThrottlingError:
            raise
        finally:
            self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it again to prevent
        # expansion when sending to remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout using the get_pty option if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        self.log.info("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log.info("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log.info("Sources unpacked")

    def copy_result_files(self, dest_dir):
        self.log.info("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log.info("Done copying results")

    def match_subsystems(self, target_subsystems):
        subsystems = [subsystem for subsystem in target_subsystems if subsystem[2] in self.target_nic_ips]
        if not subsystems:
            raise Exception("No matching subsystems found on target side!")
        subsystems.sort(key=lambda x: x[1])
        self.log.info("Found matching subsystems on target side:")
        for s in subsystems:
            self.log.info(s)
        self.subsystem_info_list = subsystems

    @abstractmethod
    def init_connect(self):
        pass

    @abstractmethod
    def init_disconnect(self):
        pass

    @abstractmethod
    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def get_route_nic_numa(self, remote_nvme_ip):
        local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip]))
        local_nvme_nic = local_nvme_nic[0]["dev"]
        return self.get_nic_numa_node(local_nvme_nic)

    @staticmethod
    def gen_fio_offset_section(offset_inc, num_jobs):
        offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc
        return "\n".join(["size=%s%%" % offset_inc,
                          "offset=0%",
                          "offset_increment=%s%%" % offset_inc])
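
    # Illustrative example (added comment): with offset_inc=0 and num_jobs=4,
    # each job gets "size=25%", "offset=0%", "offset_increment=25%", so four
    # jobs cover the device without overlapping.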

    def gen_fio_numa_section(self, fio_filenames_list, num_jobs):
        numa_stats = {}
        allowed_cpus = []
        for nvme in fio_filenames_list:
            nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme))
            numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1

        # Use the most common NUMA node for this chunk to allocate memory and CPUs
        section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0]

        # Check if we have enough free CPUs to pop from the list before assigning them
        if len(self.available_cpus[section_local_numa]) < num_jobs:
            if self.allow_cpu_sharing:
                self.log.info("Regenerating available CPU list %s" % section_local_numa)
                # Remove still available CPUs from the regenerated list. We don't want to
                # regenerate it with duplicates.
                cpus_regen = set(self.get_numa_cpu_map()[section_local_numa]) - set(self.available_cpus[section_local_numa])
                self.available_cpus[section_local_numa].extend(cpus_regen)
                self.log.info(self.available_cpus[section_local_numa])
            else:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise IndexError

        for _ in range(num_jobs):
            try:
                allowed_cpus.append(str(self.available_cpus[section_local_numa].pop(0)))
            except IndexError:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise

        return "\n".join(["cpus_allowed=%s" % ",".join(allowed_cpus),
                          "numa_mem_policy=prefer:%s" % section_local_numa])
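
    # Illustrative output (added comment; CPU numbers are hypothetical): two
    # jobs whose devices mostly map to NUMA node 1 might get:
    #   cpus_allowed=24,25
    #   numa_mem_policy=prefer:1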
f"numjobs={fio_settings['num_jobs']}"]) 980 if self.cpus_allowed is not None: 981 fio_config = "\n".join([fio_config, f"cpus_allowed={self.cpus_allowed}", f"cpus_allowed_policy={self.cpus_allowed_policy}"]) 982 fio_config = "\n".join([fio_config, filename_section]) 983 984 fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, fio_settings["rw_mix_read"]) 985 if hasattr(self, "num_cores"): 986 fio_config_filename += "_%sCPU" % self.num_cores 987 fio_config_filename += ".fio" 988 989 self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir]) 990 self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)]) 991 self.log.info("Created FIO Config:") 992 self.log.info(fio_config) 993 994 return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename) 995 996 def set_cpu_frequency(self): 997 if self.cpu_frequency is not None: 998 try: 999 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True) 1000 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True) 1001 self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"])) 1002 except Exception: 1003 self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!") 1004 sys.exit(1) 1005 else: 1006 self.log.warning("WARNING: you have disabled intel_pstate and using default cpu governance.") 1007 1008 def run_fio(self, fio_config_file, run_num=1): 1009 job_name, _ = os.path.splitext(fio_config_file) 1010 self.log.info("Starting FIO run for job: %s" % job_name) 1011 self.log.info("Using FIO: %s" % self.fio_bin) 1012 1013 output_filename = job_name + "_run_" + str(run_num) + "_" + self.name + ".json" 1014 try: 1015 output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json", 1016 "--output=%s" % output_filename, "--eta=never"], True) 1017 self.log.info(output) 1018 self.log.info("FIO run finished. Results in: %s" % output_filename) 1019 except subprocess.CalledProcessError as e: 1020 self.log.error("ERROR: Fio process failed!") 1021 self.log.error(e.stdout) 1022 1023 def sys_config(self): 1024 self.log.info("====Kernel release:====") 1025 self.log.info(self.exec_cmd(["uname", "-r"])) 1026 self.log.info("====Kernel command line:====") 1027 cmdline = self.exec_cmd(["cat", "/proc/cmdline"]) 1028 self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines()))) 1029 self.log.info("====sysctl conf:====") 1030 sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"]) 1031 self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines()))) 1032 self.log.info("====Cpu power info:====") 1033 self.log.info(self.exec_cmd(["cpupower", "frequency-info"])) 1034 1035 1036class KernelTarget(Target): 1037 def __init__(self, name, general_config, target_config): 1038 super().__init__(name, general_config, target_config) 1039 # Defaults 1040 self.nvmet_bin = target_config.get('nvmet_bin', 'nvmetcli') 1041 1042 def load_drivers(self): 1043 self.log.info("Loading drivers") 1044 super().load_drivers() 1045 if self.null_block: 1046 self.exec_cmd(["sudo", "modprobe", "null_blk", "nr_devices=%s" % self.null_block]) 1047 1048 def configure_adq(self): 1049 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1050 1051 def adq_configure_tc(self): 1052 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. 
Skipping configuration.") 1053 1054 def adq_set_busy_read(self, busy_read_val): 1055 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0") 1056 return {"net.core.busy_read": 0} 1057 1058 def stop(self): 1059 self.nvmet_command(self.nvmet_bin, "clear") 1060 self.restore_settings() 1061 super().stop() 1062 1063 def get_nvme_device_bdf(self, nvme_dev_path): 1064 nvme_name = os.path.basename(nvme_dev_path) 1065 return self.exec_cmd(["cat", "/sys/block/%s/device/address" % nvme_name]).strip() 1066 1067 def get_nvme_devices(self): 1068 dev_list = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]).split("\n") 1069 nvme_list = [] 1070 for dev in dev_list: 1071 if "nvme" not in dev: 1072 continue 1073 if self.get_nvme_device_bdf(dev) in self.nvme_blocklist: 1074 continue 1075 if len(self.nvme_allowlist) == 0: 1076 nvme_list.append(dev) 1077 continue 1078 if self.get_nvme_device_bdf(dev) in self.nvme_allowlist: 1079 nvme_list.append(dev) 1080 return nvme_list 1081 1082 def nvmet_command(self, nvmet_bin, command): 1083 return self.exec_cmd([nvmet_bin, *(command.split(" "))]) 1084 1085 def kernel_tgt_gen_subsystem_conf(self, nvme_list): 1086 1087 nvmet_cfg = { 1088 "ports": [], 1089 "hosts": [], 1090 "subsystems": [], 1091 } 1092 1093 for ip, bdev_num in self.spread_bdevs(len(nvme_list)): 1094 port = str(4420 + bdev_num) 1095 nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num 1096 serial = "SPDK00%s" % bdev_num 1097 bdev_name = nvme_list[bdev_num] 1098 1099 nvmet_cfg["subsystems"].append({ 1100 "allowed_hosts": [], 1101 "attr": { 1102 "allow_any_host": "1", 1103 "serial": serial, 1104 "version": "1.3" 1105 }, 1106 "namespaces": [ 1107 { 1108 "device": { 1109 "path": bdev_name, 1110 "uuid": "%s" % uuid.uuid4() 1111 }, 1112 "enable": 1, 1113 "nsid": port 1114 } 1115 ], 1116 "nqn": nqn 1117 }) 1118 1119 nvmet_cfg["ports"].append({ 1120 "addr": { 1121 "adrfam": "ipv4", 1122 "traddr": ip, 1123 "trsvcid": port, 1124 "trtype": self.transport 1125 }, 1126 "portid": bdev_num, 1127 "referrals": [], 1128 "subsystems": [nqn] 1129 }) 1130 1131 self.subsystem_info_list.append((port, nqn, ip)) 1132 self.subsys_no = len(self.subsystem_info_list) 1133 1134 with open("kernel.conf", "w") as fh: 1135 fh.write(json.dumps(nvmet_cfg, indent=2)) 1136 1137 def tgt_start(self): 1138 self.log.info("Configuring kernel NVMeOF Target") 1139 1140 if self.null_block: 1141 self.log.info("Configuring with null block device.") 1142 nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)] 1143 else: 1144 self.log.info("Configuring with NVMe drives.") 1145 nvme_list = self.get_nvme_devices() 1146 1147 self.kernel_tgt_gen_subsystem_conf(nvme_list) 1148 self.subsys_no = len(nvme_list) 1149 1150 self.nvmet_command(self.nvmet_bin, "clear") 1151 self.nvmet_command(self.nvmet_bin, "restore kernel.conf") 1152 1153 if self.enable_adq: 1154 self.adq_configure_tc() 1155 1156 self.log.info("Done configuring kernel NVMeOF Target") 1157 1158 1159class SPDKTarget(Target): 1160 def __init__(self, name, general_config, target_config): 1161 # IRQ affinity on SPDK Target side takes Target's core mask into consideration. 1162 # Method setting IRQ affinity is run as part of parent classes init, 1163 # so we need to have self.core_mask set before changing IRQ affinity. 
        self.core_mask = target_config["core_mask"]
        self.num_cores = len(self.get_core_list_from_mask(self.core_mask))

        super().__init__(name, general_config, target_config)

        # Format: property, default value
        config_fields = [
            ConfigField(name='dif_insert_strip', default=False),
            ConfigField(name='null_block_dif_type', default=0),
            ConfigField(name='num_shared_buffers', default=4096),
            ConfigField(name='max_queue_depth', default=128),
            ConfigField(name='bpf_scripts', default=[]),
            ConfigField(name='scheduler_core_limit', default=None),
            ConfigField(name='dsa_settings', default=False),
            ConfigField(name='iobuf_small_pool_count', default=32767),
            ConfigField(name='iobuf_large_pool_count', default=16383),
            ConfigField(name='num_cqe', default=4096),
            ConfigField(name='sock_impl', default='posix')
        ]

        self.read_config(config_fields, target_config)

        self.bpf_proc = None
        self.enable_dsa = False

        self.log.info("====DSA settings:====")
        self.log.info("DSA enabled: %s" % (self.enable_dsa))

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
        if mode not in ["default", "bynode", "cpulist",
                        "shared", "split", "split-bynode"]:
            self.log.error("%s irq affinity setting not supported" % mode)
            raise Exception

        # Create core list from SPDK's mask and change it to string.
        # This is the type configure_irq_affinity expects for cpulist parameter.
        spdk_tgt_core_list = self.get_core_list_from_mask(self.core_mask)
        spdk_tgt_core_list = ",".join(map(lambda x: str(x), spdk_tgt_core_list))
        spdk_tgt_core_list = "[" + spdk_tgt_core_list + "]"

        if mode == "shared":
            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list)
        elif mode == "split":
            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
        elif mode == "split-bynode":
            super().configure_irq_affinity(mode="bynode", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
        else:
            super().configure_irq_affinity(mode=mode, cpulist=cpulist, exclude_cpulist=exclude_cpulist)

    def adq_set_busy_read(self, busy_read_val):
        return {"net.core.busy_read": busy_read_val}

    def get_nvme_devices_count(self):
        return len(self.get_nvme_devices())

    def get_nvme_devices(self):
        bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")]))
        bdev_bdfs = []
        for bdev in bdev_subsys_json_obj["config"]:
            bdev_traddr = bdev["params"]["traddr"]
            if bdev_traddr in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                bdev_bdfs.append(bdev_traddr)
            if bdev_traddr in self.nvme_allowlist:
                bdev_bdfs.append(bdev_traddr)
        return bdev_bdfs

    def spdk_tgt_configure(self):
        self.log.info("Configuring SPDK NVMeOF target via RPC")

        # Create transport layer
        nvmf_transport_params = {
            "client": self.client,
            "trtype": self.transport,
            "num_shared_buffers": self.num_shared_buffers,
            "max_queue_depth": self.max_queue_depth,
            "dif_insert_or_strip": self.dif_insert_strip,
            "sock_priority": self.adq_priority,
            "num_cqe": self.num_cqe
        }

        if self.enable_adq:
            nvmf_transport_params["acceptor_poll_rate"] = 10000

        rpc.nvmf.nvmf_create_transport(**nvmf_transport_params)
        self.log.info("SPDK NVMeOF transport layer:")
layer:") 1251 rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client)) 1252 1253 if self.null_block: 1254 self.spdk_tgt_add_nullblock(self.null_block) 1255 self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block) 1256 else: 1257 self.spdk_tgt_add_nvme_conf() 1258 self.spdk_tgt_add_subsystem_conf(self.nic_ips) 1259 1260 if self.enable_adq: 1261 self.adq_configure_tc() 1262 1263 self.log.info("Done configuring SPDK NVMeOF Target") 1264 1265 def spdk_tgt_add_nullblock(self, null_block_count): 1266 md_size = 0 1267 block_size = 4096 1268 if self.null_block_dif_type != 0: 1269 md_size = 128 1270 1271 self.log.info("Adding null block bdevices to config via RPC") 1272 for i in range(null_block_count): 1273 self.log.info("Setting bdev protection to :%s" % self.null_block_dif_type) 1274 rpc.bdev.bdev_null_create(self.client, 102400, block_size, "Nvme{}n1".format(i), 1275 dif_type=self.null_block_dif_type, md_size=md_size) 1276 self.log.info("SPDK Bdevs configuration:") 1277 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1278 1279 def spdk_tgt_add_nvme_conf(self, req_num_disks=None): 1280 self.log.info("Adding NVMe bdevs to config via RPC") 1281 1282 bdfs = self.get_nvme_devices() 1283 bdfs = [b.replace(":", ".") for b in bdfs] 1284 1285 if req_num_disks: 1286 if req_num_disks > len(bdfs): 1287 self.log.error("ERROR: Requested number of disks is more than available %s" % len(bdfs)) 1288 sys.exit(1) 1289 else: 1290 bdfs = bdfs[0:req_num_disks] 1291 1292 for i, bdf in enumerate(bdfs): 1293 rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 1294 1295 self.log.info("SPDK Bdevs configuration:") 1296 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1297 1298 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 1299 self.log.info("Adding subsystems to config") 1300 if not req_num_disks: 1301 req_num_disks = self.get_nvme_devices_count() 1302 1303 for ip, bdev_num in self.spread_bdevs(req_num_disks): 1304 port = str(4420 + bdev_num) 1305 nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num 1306 serial = "SPDK00%s" % bdev_num 1307 bdev_name = "Nvme%sn1" % bdev_num 1308 1309 rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial, 1310 allow_any_host=True, max_namespaces=8) 1311 rpc.nvmf.nvmf_subsystem_add_ns(client=self.client, nqn=nqn, bdev_name=bdev_name) 1312 for nqn_name in [nqn, "discovery"]: 1313 rpc.nvmf.nvmf_subsystem_add_listener(self.client, 1314 nqn=nqn_name, 1315 trtype=self.transport, 1316 traddr=ip, 1317 trsvcid=port, 1318 adrfam="ipv4") 1319 self.subsystem_info_list.append((port, nqn, ip)) 1320 self.subsys_no = len(self.subsystem_info_list) 1321 1322 self.log.info("SPDK NVMeOF subsystem configuration:") 1323 rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client)) 1324 1325 def bpf_start(self): 1326 self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts) 1327 bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh") 1328 bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts] 1329 results_path = os.path.join(self.results_dir, "bpf_traces.txt") 1330 1331 with open(self.pid, "r") as fh: 1332 nvmf_pid = str(fh.readline()) 1333 1334 cmd = [bpf_script, nvmf_pid, *bpf_traces] 1335 self.log.info(cmd) 1336 self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path}) 1337 1338 def tgt_start(self): 1339 if self.null_block: 1340 self.subsys_no = 1 1341 else: 1342 self.subsys_no = self.get_nvme_devices_count() 1343 self.log.info("Starting 
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        self.log.info("SPDK NVMeOF Target PID=%s" % self.pid)
        self.log.info("Waiting for spdk to initialize...")
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")

        rpc.sock.sock_set_default_impl(self.client, impl_name=self.sock_impl)
        rpc.iobuf.iobuf_set_options(self.client,
                                    small_pool_count=self.iobuf_small_pool_count,
                                    large_pool_count=self.iobuf_large_pool_count,
                                    small_bufsize=None,
                                    large_bufsize=None)

        if self.enable_zcopy:
            rpc.sock.sock_impl_set_options(self.client, impl_name=self.sock_impl,
                                           enable_zerocopy_send_server=True)
            self.log.info("Target socket options:")
            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name=self.sock_impl))

        if self.enable_adq:
            rpc.sock.sock_impl_set_options(self.client, impl_name=self.sock_impl, enable_placement_id=1)
            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
                                           nvme_adminq_poll_period_us=100000, retry_count=4)

        if self.enable_dsa:
            rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None)
            self.log.info("Target DSA accel module enabled")

        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit)
        rpc.framework_start_init(self.client)

        if self.bpf_scripts:
            self.bpf_start()

        self.spdk_tgt_configure()

    def stop(self):
        if self.bpf_proc:
            self.log.info("Stopping BPF Trace script")
            self.bpf_proc.terminate()
            self.bpf_proc.wait()

        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait(timeout=30)
            except Exception as e:
                self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.")
                self.log.info(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
                # Try to clean up RPC socket files if they were not removed
                # because of using 'kill'
                try:
                    os.remove("/var/tmp/spdk.sock")
                    os.remove("/var/tmp/spdk.sock.lock")
                except FileNotFoundError:
                    pass
        self.restore_settings()
        super().stop()


class KernelInitiator(Initiator):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Defaults
        self.extra_params = initiator_config.get('extra_params', '')

        self.ioengine = "libaio"
        self.spdk_conf = ""

        if "num_cores" in initiator_config:
            self.num_cores = initiator_config["num_cores"]

        if "kernel_engine" in initiator_config:
            self.ioengine = initiator_config["kernel_engine"]
            if "io_uring" in self.ioengine:
                self.extra_params += ' --nr-poll-queues=8'

    def configure_adq(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_configure_tc(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
Skipping configuration.") 1438 1439 def adq_set_busy_read(self, busy_read_val): 1440 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0") 1441 return {"net.core.busy_read": 0} 1442 1443 def get_connected_nvme_list(self): 1444 json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"])) 1445 nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"] 1446 if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]] 1447 return nvme_list 1448 1449 def init_connect(self): 1450 self.log.info("Below connection attempts may result in error messages, this is expected!") 1451 for subsystem in self.subsystem_info_list: 1452 self.log.info("Trying to connect %s %s %s" % subsystem) 1453 self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport, 1454 "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params]) 1455 time.sleep(2) 1456 1457 if "io_uring" in self.ioengine: 1458 self.log.info("Setting block layer settings for io_uring.") 1459 1460 # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because 1461 # apparently it's not possible for connected subsystems. 1462 # Results in "error: Invalid argument" 1463 block_sysfs_settings = { 1464 "iostats": "0", 1465 "rq_affinity": "0", 1466 "nomerges": "2" 1467 } 1468 1469 for disk in self.get_connected_nvme_list(): 1470 sysfs = os.path.join("/sys/block", disk, "queue") 1471 for k, v in block_sysfs_settings.items(): 1472 sysfs_opt_path = os.path.join(sysfs, k) 1473 try: 1474 self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True) 1475 except CalledProcessError as e: 1476 self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v)) 1477 finally: 1478 _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)]) 1479 self.log.info("%s=%s" % (sysfs_opt_path, _)) 1480 1481 def init_disconnect(self): 1482 for subsystem in self.subsystem_info_list: 1483 self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]]) 1484 time.sleep(1) 1485 1486 def get_nvme_subsystem_numa(self, dev_name): 1487 # Remove two last characters to get controller name instead of subsystem name 1488 nvme_ctrl = os.path.basename(dev_name)[:-2] 1489 remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', 1490 self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl])) 1491 return self.get_route_nic_numa(remote_nvme_ip.group(0)) 1492 1493 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0, numa_align=True): 1494 self.available_cpus = self.get_numa_cpu_map() 1495 if len(threads) >= len(subsystems): 1496 threads = range(0, len(subsystems)) 1497 1498 # Generate connected nvme devices names and sort them by used NIC numa node 1499 # to allow better grouping when splitting into fio sections. 

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0, numa_align=True):
        self.available_cpus = self.get_numa_cpu_map()
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        # Generate connected NVMe device names and sort them by the NUMA node
        # of the NIC they use, to allow better grouping when splitting them
        # into fio sections.
        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
        nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list]
        nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))]

        filename_section = ""
        nvme_per_split = len(nvme_list) // len(threads)
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = ""
            if numa_align:
                numa_opts = self.gen_fio_numa_section(r, num_jobs)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section


class SPDKInitiator(Initiator):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        if not self.skip_spdk_install:
            self.install_spdk()

        # Optional fields
        self.enable_data_digest = initiator_config.get('enable_data_digest', False)
        self.small_pool_count = initiator_config.get('small_pool_count', 32768)
        self.large_pool_count = initiator_config.get('large_pool_count', 16384)
        self.sock_impl = initiator_config.get('sock_impl', 'posix')

        if "num_cores" in initiator_config:
            self.num_cores = initiator_config["num_cores"]

        self.ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
        self.spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir

    def adq_set_busy_read(self, busy_read_val):
        return {"net.core.busy_read": busy_read_val}

    def install_spdk(self):
        self.log.info("Using fio binary %s" % self.fio_bin)
        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma",
                       "--with-fio=%s" % os.path.dirname(self.fio_bin),
                       "--enable-lto", "--disable-unit-tests"])
        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])

        self.log.info("SPDK built")
        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])

    def init_connect(self):
        # Not a real "connect" like "nvme connect" in KernelInitiator, because
        # SPDK's fio bdev plugin initiates the connection just before starting
        # IO traffic. This only prepares the bdev.conf JSON file for later use
        # and treats that as "making a connection", so this class offers an
        # init_connect() equivalent to the KernelInitiator one.
        bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
        self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
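
    # For illustration: the bdev.conf written above is consumed by fio through
    # the job options set in __init__ (the paths are per-host and shown here
    # only as an example):
    #
    #   ioengine=/path/to/spdk/build/fio/spdk_bdev
    #   spdk_json_conf=/path/to/spdk/bdev.conf
    #   filename=Nvme0n1
    #
    # i.e. fio loads the external spdk_bdev engine, which brings up an SPDK
    # application from the JSON config and exposes the attached controllers'
    # namespaces (Nvme0n1, Nvme1n1, ...) as fio "filenames".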

    def init_disconnect(self):
        # The SPDK initiator does not need to disconnect explicitly; this is
        # done automatically once the fio bdev plugin finishes IO.
        return

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        spdk_cfg_section = {
            "subsystems": [
                {
                    "subsystem": "bdev",
                    "config": []
                },
                {
                    "subsystem": "iobuf",
                    "config": [
                        {
                            "method": "iobuf_set_options",
                            "params": {
                                "small_pool_count": self.small_pool_count,
                                "large_pool_count": self.large_pool_count
                            }
                        }
                    ]
                },
                {
                    "subsystem": "sock",
                    "config": [
                        {
                            "method": "sock_set_default_impl",
                            "params": {
                                "impl_name": self.sock_impl
                            }
                        }
                    ]
                }
            ]
        }

        for i, subsys in enumerate(remote_subsystem_list):
            sub_port, sub_nqn, sub_addr = map(str, subsys)
            nvme_ctrl = {
                "method": "bdev_nvme_attach_controller",
                "params": {
                    "name": "Nvme{}".format(i),
                    "trtype": self.transport,
                    "traddr": sub_addr,
                    "trsvcid": sub_port,
                    "subnqn": sub_nqn,
                    "adrfam": "IPv4"
                }
            }

            if self.enable_adq:
                nvme_ctrl["params"].update({"priority": "1"})

            if self.enable_data_digest:
                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})

            spdk_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)

        return json.dumps(spdk_cfg_section, indent=2)

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0, numa_align=True):
        self.available_cpus = self.get_numa_cpu_map()
        filename_section = ""
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        # Generate the expected NVMe bdev names and sort them by the NUMA node
        # of the NIC they use, to allow better grouping when splitting them
        # into fio sections.
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames]
        filenames = [x for _, x in sorted(zip(filename_numas, filenames))]

        nvme_per_split = len(subsystems) // len(threads)
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = ""
            if numa_align:
                numa_opts = self.gen_fio_numa_section(r, num_jobs)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section

    def get_nvme_subsystem_numa(self, bdev_name):
        bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir]))
        bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"]

        # Drop the last two characters ("n1") to get the controller name
        # instead of the namespace name.
        nvme_ctrl = bdev_name[:-2]
        for bdev in bdev_conf_json_obj:
            if bdev["method"] == "bdev_nvme_attach_controller" and bdev["params"]["name"] == nvme_ctrl:
                return self.get_route_nic_numa(bdev["params"]["traddr"])
        return None
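

# For illustration, a worked example of the filename-section splitting used by
# both gen_fio_filename_conf() implementations above: with 3 subsystems, 2 fio
# threads, io_depth=32 and num_jobs=1 (made-up numbers), the devices are split
# round-robin into groups of [2, 1] and the generated fio snippet looks
# roughly like:
#
#   [filename0]
#   filename=Nvme0n1
#   filename=Nvme1n1
#   iodepth=64
#
#   [filename1]
#   filename=Nvme2n1
#   iodepth=32
#
# i.e. each section's iodepth is io_depth * <devices in section> / num_jobs,
# so the effective per-device queue depth stays constant across sections.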
def initiators_match_subsystems(initiators, target_obj):
    for i in initiators:
        i.match_subsystems(target_obj.subsystem_info_list)
        if i.enable_adq:
            i.adq_configure_tc()


def run_single_fio_test(args, initiators, configs, target_obj, block_size, io_depth, rw, fio_settings):

    for run_no in range(1, fio_settings["run_num"] + 1):
        threads = []
        power_daemon = None
        measurements_prefix = "%s_%s_%s_m_%s_run_%s" % (block_size, io_depth, rw, fio_settings["rw_mix_read"], run_no)

        for i, cfg in zip(initiators, configs):
            t = threading.Thread(target=i.run_fio, args=(cfg, run_no))
            threads.append(t)
        if target_obj.enable_sar:
            sar_file_prefix = measurements_prefix + "_sar"
            t = threading.Thread(target=target_obj.measure_sar,
                                 args=(args.results, sar_file_prefix, fio_settings["ramp_time"], fio_settings["run_time"]))
            threads.append(t)

        if target_obj.enable_pcm:
            pcm_file_name = measurements_prefix + "_pcm_cpu.csv"
            pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm,
                                         args=(args.results, pcm_file_name, fio_settings["ramp_time"], fio_settings["run_time"]))
            threads.append(pcm_cpu_t)

        if target_obj.enable_bw:
            bandwidth_file_name = measurements_prefix + "_bandwidth.csv"
            t = threading.Thread(target=target_obj.measure_network_bandwidth,
                                 args=(args.results, bandwidth_file_name, fio_settings["ramp_time"], fio_settings["run_time"]))
            threads.append(t)

        if target_obj.enable_dpdk_memory:
            dpdk_mem_file_name = measurements_prefix + "_dpdk_mem.txt"
            t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results, dpdk_mem_file_name, fio_settings["ramp_time"]))
            threads.append(t)

        if target_obj.enable_pm:
            power_daemon = threading.Thread(target=target_obj.measure_power,
                                            args=(args.results, measurements_prefix, script_full_dir,
                                                  fio_settings["ramp_time"], fio_settings["run_time"]))
            threads.append(power_daemon)

        for t in threads:
            t.start()
        for t in threads:
            t.join()


def run_fio_tests(args, initiators, target_obj, fio_settings):

    for block_size, io_depth, rw in fio_settings["workloads"]:
        configs = []
        for i in initiators:
            i.init_connect()
            cfg = i.gen_fio_config(rw, block_size, io_depth, target_obj.subsys_no, fio_settings)
            configs.append(cfg)

        run_single_fio_test(args, initiators, configs, target_obj, block_size, io_depth, rw, fio_settings)

        for i in initiators:
            i.init_disconnect()
            i.copy_result_files(args.results)

    try:
        parse_results(args.results, args.csv_filename)
    except Exception as err:
        logging.error("There was an error while parsing the results")
        raise err
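

# For illustration: fio_settings["workloads"] is an itertools.product over the
# "bs", "qd" and "rw" lists from the config, so a config section such as
# (made-up values):
#
#   "fio": {"bs": ["4k"], "qd": [32, 128], "rw": ["randread", "randwrite"]}
#
# expands to four (block_size, io_depth, rw) workloads:
#
#   ("4k", 32, "randread"), ("4k", 32, "randwrite"),
#   ("4k", 128, "randread"), ("4k", 128, "randwrite")
#
# and run_fio_tests() runs each of them run_num times.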


def validate_allowlist_settings(data, force_used):
    if data["target"].get("null_block_devices"):
        logging.info("Null block devices used for testing. Allow and block lists will not be checked.")
        return True

    if force_used:
        logging.warning("Force mode used. All available NVMe drives on your system will be used, "
                        "which may potentially lead to data loss.")
        return True

    if not (data["target"].get("allowlist") or data["target"].get("blocklist")):
        logging.error("This script requires an allowlist or blocklist to be defined. "
                      "You can choose to use all available NVMe drives on your system (-f option), "
                      "which may potentially lead to data loss.")
        return False

    return True


class CpuThrottlingError(Exception):
    def __init__(self, message):
        super().__init__(message)


if __name__ == "__main__":
    exit_code = 0

    script_full_dir = os.path.dirname(os.path.realpath(__file__))
    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
                        help='Configuration file.')
    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
                        help='Results directory.')
    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
                        help='CSV results filename.')
    parser.add_argument('-f', '--force', default=False, action='store_true',
                        dest='force', help="""Force script to continue and try to use all
                        available NVMe devices during test.
                        WARNING: Might result in data loss on used NVMe drives""")

    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO,
                        format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s')

    logging.info("Using config file: %s" % args.config)
    with open(args.config, "r") as config:
        data = json.load(config)

    initiators = []
    general_config = data["general"]
    target_config = data["target"]
    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]

    if not validate_allowlist_settings(data, args.force):
        sys.exit(1)

    for k, v in data.items():
        if "target" in k:
            v.update({"results_dir": args.results})
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(k, data["general"], v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(k, data["general"], v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_settings = {
                "workloads": itertools.product(data[k]["bs"], data[k]["qd"], data[k]["rw"]),
                "run_time": data[k].get("run_time", 10),
                "ramp_time": data[k].get("ramp_time", 0),
                "rw_mix_read": data[k].get("rwmixread", 70),
                "run_num": data[k].get("run_num", None),
                "num_jobs": data[k].get("num_jobs", None),
                "rate_iops": data[k].get("rate_iops", 0),
                "offset": data[k].get("offset", False),
                "offset_inc": data[k].get("offset_inc", 0),
                "numa_align": data[k].get("numa_align", 1)
            }
        else:
            continue
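
    # For reference, a minimal sketch of the config.json shape this loop
    # expects (keys shown are the ones consumed above; the values are
    # illustrative, not a complete working config):
    #
    #   {
    #     "general": {"username": "...", "password": "...", "transport": "tcp"},
    #     "target": {"mode": "spdk", "nic_ips": ["10.0.0.1"], "allowlist": ["0000:0a:00.0"]},
    #     "initiator1": {"mode": "spdk", "nic_ips": ["10.0.0.2"]},
    #     "fio": {"bs": ["4k"], "qd": [128], "rw": ["randrw"], "run_num": 3}
    #   }
    #
    # Any number of "initiator*" sections may be present; each one becomes an
    # Initiator object above.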

    try:
        os.mkdir(args.results)
    except FileExistsError:
        pass

    for i in initiators:
        target_obj.initiator_info.append(
            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
        )

    try:
        target_obj.tgt_start()
        initiators_match_subsystems(initiators, target_obj)

        # Poor man's threading
        # Run FIO tests
        run_fio_tests(args, initiators, target_obj, fio_settings)

    except Exception as e:
        logging.error("Exception occurred while running FIO tests")
        logging.error(e)
        exit_code = 1

    finally:
        for i in [*initiators, target_obj]:
            try:
                i.stop()
            except CpuThrottlingError as err:
                logging.error(err)
                exit_code = 1
            except Exception:
                pass
    sys.exit(exit_code)
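
# Example invocation (the script file name and paths are illustrative):
#
#   python3 run_nvmf.py -c ./config.json -r /tmp/results -s nvmf_results.csv
#
# Add -f only if it is acceptable for the script to claim every NVMe drive on
# the target system when no allowlist/blocklist is defined - with the risk of
# data loss noted above.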