#!/usr/bin/env python3
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (C) 2018 Intel Corporation
# All rights reserved.
#

# Note: we use setattr
# pylint: disable=no-member

import os
import re
import sys
import argparse
import json
import logging
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid

import paramiko
import pandas as pd
from common import *
from subprocess import CalledProcessError
from abc import abstractmethod, ABC
from dataclasses import dataclass
from typing import Any

sys.path.append(os.path.dirname(__file__) + '/../../../python')

import spdk.rpc as rpc  # noqa
import spdk.rpc.client as rpc_client  # noqa


@dataclass
class ConfigField:
    name: str = None
    default: Any = None
    required: bool = False


class Server(ABC):
    def __init__(self, name, general_config, server_config):
        self.name = name

        self.log = logging.getLogger(self.name)

        config_fields = [
            ConfigField(name='username', required=True),
            ConfigField(name='password', required=True),
            ConfigField(name='transport', required=True),
            ConfigField(name='skip_spdk_install', default=False),
            ConfigField(name='irdma_roce_enable', default=False),
            ConfigField(name='pause_frames', default=None)
        ]

        self.read_config(config_fields, general_config)
        self.transport = self.transport.lower()

        config_fields = [
            ConfigField(name='nic_ips', required=True),
            ConfigField(name='mode', required=True),
            ConfigField(name='irq_scripts_dir', default='/usr/src/local/mlnx-tools/ofed_scripts'),
            ConfigField(name='enable_arfs', default=False),
            ConfigField(name='tuned_profile', default='')
        ]
        self.read_config(config_fields, server_config)

        self.local_nic_info = []
        self._nics_json_obj = {}
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""

        self.enable_adq = server_config.get('adq_enable', False)
        self.adq_priority = 1 if self.enable_adq else None
        self.irq_settings = {'mode': 'default', **server_config.get('irq_settings', {})}

        if not re.match(r'^[A-Za-z0-9\-]+$', name):
            self.log.error("Please use a name which contains only letters, numbers or dashes")
            sys.exit(1)

    def read_config(self, config_fields, config):
        for config_field in config_fields:
            value = config.get(config_field.name, config_field.default)
            if config_field.required is True and value is None:
                raise ValueError('%s config key is required' % config_field.name)

            setattr(self, config_field.name, value)
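
    # Illustrative sketch (not executed): given a config dict such as
    #   {"username": "root", "password": "x", "transport": "tcp"}
    # read_config() sets self.username, self.password and self.transport,
    # falls back to ConfigField.default for absent keys, and raises
    # ValueError for any field marked required=True that is missing.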

    @staticmethod
    def get_uncommented_lines(lines):
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                # Compare full addresses; a substring check would wrongly match
                # prefixes (e.g. "10.0.0.1" inside "10.0.0.10").
                if ip == addr["local"]:
                    return nic["ifname"]

    @abstractmethod
    def set_local_nic_info_helper(self):
        pass

    def set_local_nic_info(self, pci_info):
        def extract_network_elements(json_obj):
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def get_nic_numa_node(self, nic_name):
        return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name]))

    def get_numa_cpu_map(self):
        numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"]))
        numa_cpu_json_map = {}

        for cpu in numa_cpu_json_obj["cpus"]:
            cpu_num = int(cpu["cpu"])
            numa_node = int(cpu["node"])
            numa_cpu_json_map.setdefault(numa_node, [])
            numa_cpu_json_map[numa_node].append(cpu_num)

        return numa_cpu_json_map
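
    # Illustrative sketch (not executed): on a hypothetical 2-socket host
    # get_numa_cpu_map() parses the lscpu JSON into a dict keyed by node, e.g.
    #   {0: [0, 1, 2, 3], 1: [4, 5, 6, 7]}
    # which later feeds the IRQ affinity and fio NUMA alignment logic.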
f"/sys/class/net/{nic_name}/queues/"]).strip().split("\n") 217 queue_files = filter(lambda x: x.startswith("rx-"), queue_files) 218 219 for qf in queue_files: 220 self.exec_cmd(["sudo", "bash", "-c", f"echo {rps_flow_cnt} > /sys/class/net/{nic_name}/queues/{qf}/rps_flow_cnt"]) 221 set_value = self.exec_cmd(["cat", f"/sys/class/net/{nic_name}/queues/{qf}/rps_flow_cnt"]).strip() 222 self.log.info(f"Confirmed rps_flow_cnt set to {set_value} in /sys/class/net/{nic_name}/queues/{qf}") 223 224 self.log.info(f"Configuration of {nic_name} completed with confirmed rps_flow_cnt settings.") 225 226 self.log.info("ARFS configuration completed.") 227 228 def configure_adq(self): 229 self.adq_load_modules() 230 self.adq_configure_nic() 231 232 def adq_load_modules(self): 233 self.log.info("Modprobing ADQ-related Linux modules...") 234 adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"] 235 for module in adq_module_deps: 236 try: 237 self.exec_cmd(["sudo", "modprobe", module]) 238 self.log.info("%s loaded!" % module) 239 except CalledProcessError as e: 240 self.log.error("ERROR: failed to load module %s" % module) 241 self.log.error("%s resulted in error: %s" % (e.cmd, e.output)) 242 243 def adq_configure_tc(self): 244 self.log.info("Configuring ADQ Traffic classes and filters...") 245 246 num_queues_tc0 = 2 # 2 is minimum number of queues for TC0 247 num_queues_tc1 = self.num_cores 248 port_param = "dst_port" if isinstance(self, Target) else "src_port" 249 xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs") 250 251 for nic_ip in self.nic_ips: 252 nic_name = self.get_nic_name_by_ip(nic_ip) 253 nic_ports = [x[0] for x in self.subsystem_info_list if nic_ip in x[2]] 254 255 tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, 256 "root", "mqprio", "num_tc", "2", "map", "0", "1", 257 "queues", "%s@0" % num_queues_tc0, 258 "%s@%s" % (num_queues_tc1, num_queues_tc0), 259 "hw", "1", "mode", "channel"] 260 self.log.info(" ".join(tc_qdisc_map_cmd)) 261 self.exec_cmd(tc_qdisc_map_cmd) 262 263 time.sleep(5) 264 tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"] 265 self.log.info(" ".join(tc_qdisc_ingress_cmd)) 266 self.exec_cmd(tc_qdisc_ingress_cmd) 267 268 nic_bdf = os.path.basename(self.exec_cmd(["readlink", "-f", "/sys/class/net/%s/device" % nic_name]).strip()) 269 self.exec_cmd(["sudo", "devlink", "dev", "param", "set", "pci/%s" % nic_bdf, 270 "name", "tc1_inline_fd", "value", "true", "cmode", "runtime"]) 271 272 for port in nic_ports: 273 tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name, 274 "protocol", "ip", "ingress", "prio", "1", "flower", 275 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port, 276 "skip_sw", "hw_tc", "1"] 277 self.log.info(" ".join(tc_filter_cmd)) 278 self.exec_cmd(tc_filter_cmd) 279 280 # show tc configuration 281 self.log.info("Show tc configuration for %s NIC..." 

    def configure_adq(self):
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        self.log.info("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log.info("%s loaded!" % module)
            except CalledProcessError as e:
                self.log.error("ERROR: failed to load module %s" % module)
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        self.log.info("Configuring ADQ Traffic classes and filters...")

        num_queues_tc0 = 2  # 2 is the minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            nic_ports = [x[0] for x in self.subsystem_info_list if nic_ip in x[2]]

            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log.info(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log.info(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            nic_bdf = os.path.basename(self.exec_cmd(["readlink", "-f", "/sys/class/net/%s/device" % nic_name]).strip())
            self.exec_cmd(["sudo", "devlink", "dev", "param", "set", "pci/%s" % nic_bdf,
                           "name", "tc1_inline_fd", "value", "true", "cmode", "runtime"])

            for port in nic_ports:
                tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                                 "protocol", "ip", "ingress", "prio", "1", "flower",
                                 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                                 "skip_sw", "hw_tc", "1"]
                self.log.info(" ".join(tc_filter_cmd))
                self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log.info("Show tc configuration for %s NIC..." % nic_name)
            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log.info("%s" % tc_disk_out)
            self.log.info("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log.info(xps_cmd)
            self.exec_cmd(xps_cmd)

    def reload_driver(self, driver, *modprobe_args):
        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver, *modprobe_args])
        except CalledProcessError as e:
            self.log.error("ERROR: failed to reload %s module!" % driver)
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_nic(self):
        self.log.info("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                nic_bdf = self.exec_cmd(["bash", "-c", "source /sys/class/net/%s/device/uevent; echo $PCI_SLOT_NAME" % nic])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-inspect-optimize", "off"])
                # The following flags are suggested in the ADQ Configuration Guide for large block sizes,
                # but can be used with 4k block sizes as well
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop", "on"])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop-cfg", "on"])
            except subprocess.CalledProcessError as e:
                self.log.error("ERROR: failed to configure NIC port using ethtool!")
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
                self.log.info("Please update your NIC driver and firmware versions and try again.")
            self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        self.log.info("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log.info("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log.info("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        self.log.info("Tuning sysctl settings...")

        busy_read = 0
        busy_poll = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1
            busy_poll = 1

        sysctl_opts = {
            "net.core.busy_poll": busy_poll,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
            "net.core.rps_sock_flow_entries": 0
        }

        if self.enable_adq:
            sysctl_opts.update(self.adq_set_busy_read(1))

        if self.enable_arfs:
            sysctl_opts.update({"net.core.rps_sock_flow_entries": 32768})

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        if not self.tuned_profile:
            self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        self.log.info("Setting CPU governor to performance...")

        # This assumes that the same CPU scaling governor is set on every CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])
% self.exec_cmd(["cat", "/etc/tuned/active_profile"])) 421 422 def configure_cpu_governor(self): 423 self.log.info("Setting CPU governor to performance...") 424 425 # This assumes that there is the same CPU scaling governor on each CPU 426 self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip() 427 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"]) 428 429 def get_core_list_from_mask(self, core_mask): 430 # Generate list of individual cores from hex core mask 431 # (e.g. '0xffff') or list containing: 432 # - individual cores (e.g. '1, 2, 3') 433 # - core ranges (e.g. '0-3') 434 # - mix of both (e.g. '0, 1-4, 9, 11-13') 435 436 core_list = [] 437 if "0x" in core_mask: 438 core_mask_int = int(core_mask, 16) 439 for i in range(core_mask_int.bit_length()): 440 if (1 << i) & core_mask_int: 441 core_list.append(i) 442 return core_list 443 else: 444 # Core list can be provided in .json config with square brackets 445 # remove them first 446 core_mask = core_mask.replace("[", "") 447 core_mask = core_mask.replace("]", "") 448 449 for i in core_mask.split(","): 450 if "-" in i: 451 start, end = i.split("-") 452 core_range = range(int(start), int(end) + 1) 453 core_list.extend(core_range) 454 else: 455 core_list.append(int(i)) 456 return core_list 457 458 def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False): 459 self.log.info("Setting NIC irq affinity for NICs. Using %s mode" % mode) 460 461 if mode not in ["default", "bynode", "cpulist"]: 462 raise ValueError("%s irq affinity setting not supported" % mode) 463 464 if mode == "cpulist" and not cpulist: 465 raise ValueError("%s irq affinity setting set, but no cpulist provided" % mode) 466 467 affinity_script = "set_irq_affinity.sh" 468 if "default" not in mode: 469 affinity_script = "set_irq_affinity_cpulist.sh" 470 system_cpu_map = self.get_numa_cpu_map() 471 irq_script_path = os.path.join(self.irq_scripts_dir, affinity_script) 472 473 def cpu_list_to_string(cpulist): 474 return ",".join(map(lambda x: str(x), cpulist)) 475 476 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 477 for nic_name in nic_names: 478 irq_cmd = ["sudo", irq_script_path] 479 480 # Use only CPU cores matching NIC NUMA node. 481 # Remove any CPU cores if they're on exclusion list. 

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
        self.log.info("Setting NIC irq affinity. Using %s mode" % mode)

        if mode not in ["default", "bynode", "cpulist"]:
            raise ValueError("%s irq affinity setting not supported" % mode)

        if mode == "cpulist" and not cpulist:
            raise ValueError("%s irq affinity setting set, but no cpulist provided" % mode)

        affinity_script = "set_irq_affinity.sh"
        if "default" not in mode:
            affinity_script = "set_irq_affinity_cpulist.sh"
            system_cpu_map = self.get_numa_cpu_map()
        irq_script_path = os.path.join(self.irq_scripts_dir, affinity_script)

        def cpu_list_to_string(cpulist):
            return ",".join(map(lambda x: str(x), cpulist))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            irq_cmd = ["sudo", irq_script_path]

            # Use only CPU cores matching NIC NUMA node.
            # Remove any CPU cores if they're on the exclusion list.
            if mode == "bynode":
                irq_cpus = system_cpu_map[self.get_nic_numa_node(nic_name)]
                if cpulist and exclude_cpulist:
                    disallowed_cpus = self.get_core_list_from_mask(cpulist)
                    irq_cpus = list(set(irq_cpus) - set(disallowed_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            if mode == "cpulist":
                irq_cpus = self.get_core_list_from_mask(cpulist)
                if exclude_cpulist:
                    # Flatten system CPU list, we don't need NUMA awareness here
                    system_cpu_list = sorted({x for v in system_cpu_map.values() for x in v})
                    irq_cpus = list(set(system_cpu_list) - set(irq_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            irq_cmd.append(nic_name)
            self.log.info(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        self.log.info("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        self.log.info("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        self.log.info("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log.info("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        self.log.info("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log.info("Reverted CPU governor to %s." % self.governor_restore)

    def restore_settings(self):
        self.restore_governor()
        self.restore_tuned()
        self.restore_services()
        self.restore_sysctl()
        if self.enable_adq:
            self.reload_driver("ice")

    def check_for_throttling(self):
        patterns = [
            r"CPU\d+: Core temperature is above threshold, cpu clock is throttled",
            r"mlx5_core \S+: poll_health:\d+:\(pid \d+\): device's health compromised",
            r"mlx5_core \S+: print_health_info:\d+:\(pid \d+\): Health issue observed, High temperature",
            r"mlx5_core \S+: temp_warn:\d+:\(pid \d+\): High temperature on sensors"
        ]

        issue_found = False
        try:
            output = self.exec_cmd(["dmesg"])

            for line in output.split("\n"):
                for pattern in patterns:
                    if re.search(pattern, line):
                        self.log.warning("Throttling or temperature issue found")
                        issue_found = True
            return 1 if issue_found else 0
        except Exception as e:
            self.log.error("ERROR: failed to execute dmesg command")
            self.log.error(f"Error {e}")
            return 1
" 569 "Check system logs!") 570 raise CpuThrottlingError(err_message) 571 572 573class Target(Server): 574 def __init__(self, name, general_config, target_config): 575 super().__init__(name, general_config, target_config) 576 577 # Defaults 578 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 579 self.subsystem_info_list = [] 580 self.initiator_info = [] 581 582 config_fields = [ 583 ConfigField(name='mode', required=True), 584 ConfigField(name='results_dir', required=True), 585 ConfigField(name='enable_pm', default=True), 586 ConfigField(name='enable_sar', default=True), 587 ConfigField(name='enable_pcm', default=True), 588 ConfigField(name='enable_dpdk_memory', default=True) 589 ] 590 self.read_config(config_fields, target_config) 591 592 self.null_block = target_config.get('null_block_devices', 0) 593 self.scheduler_name = target_config.get('scheduler_settings', 'static') 594 self.enable_zcopy = target_config.get('zcopy_settings', False) 595 self.enable_bw = target_config.get('enable_bandwidth', True) 596 self.nvme_blocklist = target_config.get('blocklist', []) 597 self.nvme_allowlist = target_config.get('allowlist', []) 598 599 # Blocklist takes precedence, remove common elements from allowlist 600 self.nvme_allowlist = list(set(self.nvme_allowlist) - set(self.nvme_blocklist)) 601 602 self.log.info("Items now on allowlist: %s" % self.nvme_allowlist) 603 self.log.info("Items now on blocklist: %s" % self.nvme_blocklist) 604 605 self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) 606 self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../")) 607 self.set_local_nic_info(self.set_local_nic_info_helper()) 608 609 if self.skip_spdk_install is False: 610 self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip") 611 612 self.configure_system() 613 if self.enable_adq: 614 self.configure_adq() 615 self.configure_irq_affinity() 616 self.sys_config() 617 618 def set_local_nic_info_helper(self): 619 return json.loads(self.exec_cmd(["lshw", "-json"])) 620 621 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 622 stderr_opt = None 623 if stderr_redirect: 624 stderr_opt = subprocess.STDOUT 625 if change_dir: 626 old_cwd = os.getcwd() 627 os.chdir(change_dir) 628 self.log.info("Changing directory to %s" % change_dir) 629 630 out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8") 631 632 if change_dir: 633 os.chdir(old_cwd) 634 self.log.info("Changing directory to %s" % old_cwd) 635 return out 636 637 def zip_spdk_sources(self, spdk_dir, dest_file): 638 self.log.info("Zipping SPDK source directory") 639 fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) 640 for root, _directories, files in os.walk(spdk_dir, followlinks=True): 641 for file in files: 642 fh.write(os.path.relpath(os.path.join(root, file))) 643 fh.close() 644 self.log.info("Done zipping") 645 646 @staticmethod 647 def _chunks(input_list, chunks_no): 648 div, rem = divmod(len(input_list), chunks_no) 649 for i in range(chunks_no): 650 si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem) 651 yield input_list[si:si + (div + 1 if i < rem else div)] 652 653 def spread_bdevs(self, req_disks): 654 # Spread available block devices indexes: 655 # - evenly across available initiator systems 656 # - evenly across available NIC interfaces for 657 # each initiator 658 # Not NUMA aware. 

    def spread_bdevs(self, req_disks):
        # Spread available block devices indexes:
        # - evenly across available initiator systems
        # - evenly across available NIC interfaces for
        #   each initiator
        # Not NUMA aware.
        ip_bdev_map = []
        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))

        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
            self.initiator_info[i]["bdev_range"] = init_chunk
            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
                for c in nic_chunk:
                    ip_bdev_map.append((ip, c))
        return ip_bdev_map
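
    # Illustrative sketch (not executed): with 4 disks, two initiators and a
    # single NIC IP per initiator (hypothetical addresses), spread_bdevs(4)
    # returns a mapping like
    #   [("10.0.0.1", 0), ("10.0.0.1", 1), ("10.0.0.2", 2), ("10.0.0.2", 3)]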

    def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time):
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))

        self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Starting SAR measurements")

        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % 1, "%s" % run_time])
        # sar_output_file already contains results_dir, so open it directly
        with open(sar_output_file, "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log.info("Summary CPU utilization from SAR:")
                        self.log.info(line)
                    elif "all" in line:
                        self.log.info(line)
                    else:
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum

        with open(sar_cpu_util_file, "w") as f:
            f.write("%0.2f" % sar_cpu_usage)

    def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time):
        time.sleep(ramp_time)
        self.log.info("Starting power measurements")
        self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir,
                       "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix,
                       "-x", "-c", "%s" % run_time, "-t", "%s" % 1, "-r"])

    def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        cmd = ["pcm", "1", "-i=%s" % run_time,
               "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name, ramp_time, run_time):
        self.log.info("Waiting %d seconds for ramp-up to finish before measuring bandwidth stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("INFO: starting network bandwidth measure")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", "%s" % run_time])
        # TODO: The above command results in a .csv file containing measurements for all gathered samples.
        #       Improve this so that an additional file containing the measurements' average is generated too.
        # TODO: Monitor only those interfaces which are currently used to run the workload.

    def measure_dpdk_memory(self, results_dir, dump_file_name, ramp_time):
        self.log.info("INFO: waiting to generate DPDK memory usage")
        time.sleep(ramp_time)
        self.log.info("INFO: generating DPDK memory usage")
        tmp_dump_file = rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)["filename"]
        os.rename(tmp_dump_file, "%s/%s" % (results_dir, dump_file_name))

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(os.uname().release)
        self.log.info("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
        self.log.info('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log.info("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
        self.log.info('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log.info("====zcopy settings:====")
        self.log.info("zcopy enabled: %s" % (self.enable_zcopy))
        self.log.info("====Scheduler settings:====")
        self.log.info("SPDK scheduler: %s" % (self.scheduler_name))


class Initiator(Server):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Required fields and defaults
        config_fields = [
            ConfigField(name='mode', required=True),
            ConfigField(name='ip', required=True),
            ConfigField(name='target_nic_ips', required=True),
            ConfigField(name='cpus_allowed', default=None),
            ConfigField(name='cpus_allowed_policy', default='shared'),
            ConfigField(name='spdk_dir', default='/tmp/spdk'),
            ConfigField(name='fio_bin', default='/usr/src/fio/fio'),
            ConfigField(name='nvmecli_bin', default='nvme'),
            ConfigField(name='cpu_frequency', default=None),
            ConfigField(name='allow_cpu_sharing', default=True)
        ]

        self.read_config(config_fields, initiator_config)

        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.subsystem_info_list = []

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if self.skip_spdk_install is False:
            self.copy_spdk("/tmp/spdk.zip")

        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        self.restore_settings()
        try:
            super().stop()
        except CpuThrottlingError as err:
            raise err
        finally:
            self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it again to prevent
        # expansion when sending to the remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout using the get_pty option if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out
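
    # Illustrative sketch (not executed): on the remote side
    #   exec_cmd(["sudo", "sysctl", "-w", "net.core.busy_poll=1"])
    # is sent over SSH as
    #   sudo sysctl -w net.core.busy_poll=1
    # while an element containing whitespace, e.g. "echo 1 > /tmp/x", is
    # re-quoted so the remote shell treats it as a single argument.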

    def put_file(self, local, remote_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        self.log.info("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log.info("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log.info("Sources unpacked")

    def copy_result_files(self, dest_dir):
        self.log.info("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log.info("Done copying results")

    def match_subsystems(self, target_subsystems):
        subsystems = [subsystem for subsystem in target_subsystems if subsystem[2] in self.target_nic_ips]
        if not subsystems:
            raise Exception("No matching subsystems found on target side!")
        subsystems.sort(key=lambda x: x[1])
        self.log.info("Found matching subsystems on target side:")
        for s in subsystems:
            self.log.info(s)
        self.subsystem_info_list = subsystems

    @abstractmethod
    def init_connect(self):
        pass

    @abstractmethod
    def init_disconnect(self):
        pass

    @abstractmethod
    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def get_route_nic_numa(self, remote_nvme_ip):
        local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip]))
        local_nvme_nic = local_nvme_nic[0]["dev"]
        return self.get_nic_numa_node(local_nvme_nic)

    @staticmethod
    def gen_fio_offset_section(offset_inc, num_jobs):
        offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc
        return "\n".join(["size=%s%%" % offset_inc,
                          "offset=0%",
                          "offset_increment=%s%%" % offset_inc])
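
    # Illustrative sketch (not executed): gen_fio_offset_section(0, 4) falls
    # back to 100 // num_jobs and renders
    #   size=25%
    #   offset=0%
    #   offset_increment=25%
    # so four jobs cover disjoint quarters of each device.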

    def gen_fio_numa_section(self, fio_filenames_list, num_jobs):
        numa_stats = {}
        allowed_cpus = []
        for nvme in fio_filenames_list:
            nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme))
            numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1

        # Use the most common NUMA node for this chunk to allocate memory and CPUs
        section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0]

        # Check if we have enough free CPUs to pop from the list before assigning them
        if len(self.available_cpus[section_local_numa]) < num_jobs:
            if self.allow_cpu_sharing:
                self.log.info("Regenerating available CPU list %s" % section_local_numa)
                # Remove still available CPUs from the regenerated list. We don't want to
                # regenerate it with duplicates.
                cpus_regen = set(self.get_numa_cpu_map()[section_local_numa]) - set(self.available_cpus[section_local_numa])
                self.available_cpus[section_local_numa].extend(cpus_regen)
                self.log.info(self.available_cpus[section_local_numa])
            else:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise IndexError

        for _ in range(num_jobs):
            try:
                allowed_cpus.append(str(self.available_cpus[section_local_numa].pop(0)))
            except IndexError:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise

        return "\n".join(["cpus_allowed=%s" % ",".join(allowed_cpus),
                          "numa_mem_policy=prefer:%s" % section_local_numa])

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no,
                       num_jobs=None, ramp_time=0, run_time=10, rate_iops=0,
                       offset=False, offset_inc=0, numa_align=True):
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""

        if self.cpus_allowed is not None:
            self.log.info("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    # Ranges in cpus_allowed are inclusive, e.g. "0-3" means 4 CPUs
                    cpus_num += len(range(a, b + 1))
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            self.num_cores = len(self.subsystem_info_list)
            threads = range(0, len(self.subsystem_info_list))

        filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs,
                                                      offset, offset_inc, numa_align)

        fio_config = fio_conf_template.format(ioengine=self.ioengine, spdk_conf=self.spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, init_connect() function
        if "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log.info("Created FIO Config:")
        self.log.info(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)
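
    # Illustrative sketch (not executed): gen_fio_config("randrw", 70, "4k",
    # 128, subsys_no=1) on a 4-core initiator writes a config file named
    #   4k_128_randrw_m_70_4CPU.fio
    # under <spdk_dir>/nvmf_perf and returns its full path.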

    def set_cpu_frequency(self):
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit(1)
        else:
            self.log.warning("WARNING: you have disabled intel_pstate and are using the default CPU governor.")

    def run_fio(self, fio_config_file, run_num=1):
        job_name, _ = os.path.splitext(fio_config_file)
        self.log.info("Starting FIO run for job: %s" % job_name)
        self.log.info("Using FIO: %s" % self.fio_bin)

        output_filename = job_name + "_run_" + str(run_num) + "_" + self.name + ".json"
        try:
            output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename, "--eta=never"], True)
            self.log.info(output)
            self.log.info("FIO run finished. Results in: %s" % output_filename)
        except subprocess.CalledProcessError as e:
            self.log.error("ERROR: Fio process failed!")
            self.log.error(e.stdout)

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(self.exec_cmd(["uname", "-r"]))
        self.log.info("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log.info("====sysctl conf:====")
        sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"])
        self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))


class KernelTarget(Target):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = target_config.get('nvmet_bin', 'nvmetcli')

    def load_drivers(self):
        self.log.info("Loading drivers")
        super().load_drivers()
        if self.null_block:
            self.exec_cmd(["sudo", "modprobe", "null_blk", "nr_devices=%s" % self.null_block])

    def configure_adq(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_configure_tc(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_set_busy_read(self, busy_read_val):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
        return {"net.core.busy_read": 0}
busy_read set to 0") 1065 return {"net.core.busy_read": 0} 1066 1067 def stop(self): 1068 self.nvmet_command(self.nvmet_bin, "clear") 1069 self.restore_settings() 1070 super().stop() 1071 1072 def get_nvme_device_bdf(self, nvme_dev_path): 1073 nvme_name = os.path.basename(nvme_dev_path) 1074 return self.exec_cmd(["cat", "/sys/block/%s/device/address" % nvme_name]).strip() 1075 1076 def get_nvme_devices(self): 1077 dev_list = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]).split("\n") 1078 nvme_list = [] 1079 for dev in dev_list: 1080 if "nvme" not in dev: 1081 continue 1082 if self.get_nvme_device_bdf(dev) in self.nvme_blocklist: 1083 continue 1084 if len(self.nvme_allowlist) == 0: 1085 nvme_list.append(dev) 1086 continue 1087 if self.get_nvme_device_bdf(dev) in self.nvme_allowlist: 1088 nvme_list.append(dev) 1089 return nvme_list 1090 1091 def nvmet_command(self, nvmet_bin, command): 1092 return self.exec_cmd([nvmet_bin, *(command.split(" "))]) 1093 1094 def kernel_tgt_gen_subsystem_conf(self, nvme_list): 1095 1096 nvmet_cfg = { 1097 "ports": [], 1098 "hosts": [], 1099 "subsystems": [], 1100 } 1101 1102 for ip, bdev_num in self.spread_bdevs(len(nvme_list)): 1103 port = str(4420 + bdev_num) 1104 nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num 1105 serial = "SPDK00%s" % bdev_num 1106 bdev_name = nvme_list[bdev_num] 1107 1108 nvmet_cfg["subsystems"].append({ 1109 "allowed_hosts": [], 1110 "attr": { 1111 "allow_any_host": "1", 1112 "serial": serial, 1113 "version": "1.3" 1114 }, 1115 "namespaces": [ 1116 { 1117 "device": { 1118 "path": bdev_name, 1119 "uuid": "%s" % uuid.uuid4() 1120 }, 1121 "enable": 1, 1122 "nsid": port 1123 } 1124 ], 1125 "nqn": nqn 1126 }) 1127 1128 nvmet_cfg["ports"].append({ 1129 "addr": { 1130 "adrfam": "ipv4", 1131 "traddr": ip, 1132 "trsvcid": port, 1133 "trtype": self.transport 1134 }, 1135 "portid": bdev_num, 1136 "referrals": [], 1137 "subsystems": [nqn] 1138 }) 1139 1140 self.subsystem_info_list.append((port, nqn, ip)) 1141 self.subsys_no = len(self.subsystem_info_list) 1142 1143 with open("kernel.conf", "w") as fh: 1144 fh.write(json.dumps(nvmet_cfg, indent=2)) 1145 1146 def tgt_start(self): 1147 self.log.info("Configuring kernel NVMeOF Target") 1148 1149 if self.null_block: 1150 self.log.info("Configuring with null block device.") 1151 nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)] 1152 else: 1153 self.log.info("Configuring with NVMe drives.") 1154 nvme_list = self.get_nvme_devices() 1155 1156 self.kernel_tgt_gen_subsystem_conf(nvme_list) 1157 self.subsys_no = len(nvme_list) 1158 1159 self.nvmet_command(self.nvmet_bin, "clear") 1160 self.nvmet_command(self.nvmet_bin, "restore kernel.conf") 1161 1162 if self.enable_adq: 1163 self.adq_configure_tc() 1164 1165 self.log.info("Done configuring kernel NVMeOF Target") 1166 1167 1168class SPDKTarget(Target): 1169 def __init__(self, name, general_config, target_config): 1170 # IRQ affinity on SPDK Target side takes Target's core mask into consideration. 1171 # Method setting IRQ affinity is run as part of parent classes init, 1172 # so we need to have self.core_mask set before changing IRQ affinity. 


class SPDKTarget(Target):
    def __init__(self, name, general_config, target_config):
        # IRQ affinity on SPDK Target side takes Target's core mask into consideration.
        # The method setting IRQ affinity is run as part of the parent classes' init,
        # so we need to have self.core_mask set before changing IRQ affinity.
        self.core_mask = target_config["core_mask"]
        self.num_cores = len(self.get_core_list_from_mask(self.core_mask))

        super().__init__(name, general_config, target_config)

        # Format: property, default value
        config_fields = [
            ConfigField(name='dif_insert_strip', default=False),
            ConfigField(name='null_block_dif_type', default=0),
            ConfigField(name='num_shared_buffers', default=4096),
            ConfigField(name='max_queue_depth', default=128),
            ConfigField(name='bpf_scripts', default=[]),
            ConfigField(name='scheduler_core_limit', default=None),
            ConfigField(name='dsa_settings', default=False),
            ConfigField(name='iobuf_small_pool_count', default=32767),
            ConfigField(name='iobuf_large_pool_count', default=16383),
            ConfigField(name='num_cqe', default=4096),
            ConfigField(name='sock_impl', default='posix')
        ]

        self.read_config(config_fields, target_config)

        self.bpf_proc = None
        self.enable_dsa = False

        self.log.info("====DSA settings:====")
        self.log.info("DSA enabled: %s" % (self.enable_dsa))

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
        if mode not in ["default", "bynode", "cpulist",
                        "shared", "split", "split-bynode"]:
            self.log.error("%s irq affinity setting not supported" % mode)
            raise Exception("%s irq affinity setting not supported" % mode)

        # Create core list from SPDK's mask and change it to string.
        # This is the type configure_irq_affinity expects for cpulist parameter.
        spdk_tgt_core_list = self.get_core_list_from_mask(self.core_mask)
        spdk_tgt_core_list = ",".join(map(lambda x: str(x), spdk_tgt_core_list))
        spdk_tgt_core_list = "[" + spdk_tgt_core_list + "]"

        if mode == "shared":
            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list)
        elif mode == "split":
            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
        elif mode == "split-bynode":
            super().configure_irq_affinity(mode="bynode", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
        else:
            super().configure_irq_affinity(mode=mode, cpulist=cpulist, exclude_cpulist=exclude_cpulist)
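
    # Illustrative sketch (not executed): with a hypothetical core_mask of
    # "0xF0" the derived cpulist is "[4,5,6,7]", so
    #   mode="shared" pins NIC IRQs to cores 4-7 (shared with the target app),
    #   mode="split"  pins NIC IRQs to every core except 4-7.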
layer:") 1260 rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client)) 1261 1262 if self.null_block: 1263 self.spdk_tgt_add_nullblock(self.null_block) 1264 self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block) 1265 else: 1266 self.spdk_tgt_add_nvme_conf() 1267 self.spdk_tgt_add_subsystem_conf(self.nic_ips) 1268 1269 if self.enable_adq: 1270 self.adq_configure_tc() 1271 1272 self.log.info("Done configuring SPDK NVMeOF Target") 1273 1274 def spdk_tgt_add_nullblock(self, null_block_count): 1275 md_size = 0 1276 block_size = 4096 1277 if self.null_block_dif_type != 0: 1278 md_size = 128 1279 1280 self.log.info("Adding null block bdevices to config via RPC") 1281 for i in range(null_block_count): 1282 self.log.info("Setting bdev protection to :%s" % self.null_block_dif_type) 1283 rpc.bdev.bdev_null_create(self.client, 102400, block_size, "Nvme{}n1".format(i), 1284 dif_type=self.null_block_dif_type, md_size=md_size) 1285 self.log.info("SPDK Bdevs configuration:") 1286 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1287 1288 def spdk_tgt_add_nvme_conf(self, req_num_disks=None): 1289 self.log.info("Adding NVMe bdevs to config via RPC") 1290 1291 bdfs = self.get_nvme_devices() 1292 bdfs = [b.replace(":", ".") for b in bdfs] 1293 1294 if req_num_disks: 1295 if req_num_disks > len(bdfs): 1296 self.log.error("ERROR: Requested number of disks is more than available %s" % len(bdfs)) 1297 sys.exit(1) 1298 else: 1299 bdfs = bdfs[0:req_num_disks] 1300 1301 for i, bdf in enumerate(bdfs): 1302 rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 1303 1304 self.log.info("SPDK Bdevs configuration:") 1305 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1306 1307 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 1308 self.log.info("Adding subsystems to config") 1309 if not req_num_disks: 1310 req_num_disks = self.get_nvme_devices_count() 1311 1312 for ip, bdev_num in self.spread_bdevs(req_num_disks): 1313 port = str(4420 + bdev_num) 1314 nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num 1315 serial = "SPDK00%s" % bdev_num 1316 bdev_name = "Nvme%sn1" % bdev_num 1317 1318 rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial, 1319 allow_any_host=True, max_namespaces=8) 1320 rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name) 1321 for nqn_name in [nqn, "discovery"]: 1322 rpc.nvmf.nvmf_subsystem_add_listener(self.client, 1323 nqn=nqn_name, 1324 trtype=self.transport, 1325 traddr=ip, 1326 trsvcid=port, 1327 adrfam="ipv4") 1328 self.subsystem_info_list.append((port, nqn, ip)) 1329 self.subsys_no = len(self.subsystem_info_list) 1330 1331 self.log.info("SPDK NVMeOF subsystem configuration:") 1332 rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client)) 1333 1334 def bpf_start(self): 1335 self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts) 1336 bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh") 1337 bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts] 1338 results_path = os.path.join(self.results_dir, "bpf_traces.txt") 1339 1340 with open(self.pid, "r") as fh: 1341 nvmf_pid = str(fh.readline()) 1342 1343 cmd = [bpf_script, nvmf_pid, *bpf_traces] 1344 self.log.info(cmd) 1345 self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path}) 1346 1347 def tgt_start(self): 1348 if self.null_block: 1349 self.subsys_no = 1 1350 else: 1351 self.subsys_no = self.get_nvme_devices_count() 1352 self.log.info("Starting SPDK NVMeOF Target 
process") 1353 nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt") 1354 proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask]) 1355 self.pid = os.path.join(self.spdk_dir, "nvmf.pid") 1356 1357 with open(self.pid, "w") as fh: 1358 fh.write(str(proc.pid)) 1359 self.nvmf_proc = proc 1360 self.log.info("SPDK NVMeOF Target PID=%s" % self.pid) 1361 self.log.info("Waiting for spdk to initialize...") 1362 while True: 1363 if os.path.exists("/var/tmp/spdk.sock"): 1364 break 1365 time.sleep(1) 1366 self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock") 1367 1368 rpc.sock.sock_set_default_impl(self.client, impl_name=self.sock_impl) 1369 rpc.iobuf.iobuf_set_options(self.client, 1370 small_pool_count=self.iobuf_small_pool_count, 1371 large_pool_count=self.iobuf_large_pool_count, 1372 small_bufsize=None, 1373 large_bufsize=None) 1374 1375 if self.enable_zcopy: 1376 rpc.sock.sock_impl_set_options(self.client, impl_name=self.sock_impl, 1377 enable_zerocopy_send_server=True) 1378 self.log.info("Target socket options:") 1379 rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name=self.sock_impl)) 1380 1381 if self.enable_adq: 1382 rpc.sock.sock_impl_set_options(self.client, impl_name=self.sock_impl, enable_placement_id=1) 1383 rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None, 1384 nvme_adminq_poll_period_us=100000, retry_count=4) 1385 1386 if self.enable_dsa: 1387 rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None) 1388 self.log.info("Target DSA accel module enabled") 1389 1390 rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit) 1391 rpc.framework_start_init(self.client) 1392 1393 if self.bpf_scripts: 1394 self.bpf_start() 1395 1396 self.spdk_tgt_configure() 1397 1398 def stop(self): 1399 if self.bpf_proc: 1400 self.log.info("Stopping BPF Trace script") 1401 self.bpf_proc.terminate() 1402 self.bpf_proc.wait() 1403 1404 if hasattr(self, "nvmf_proc"): 1405 try: 1406 self.nvmf_proc.terminate() 1407 self.nvmf_proc.wait(timeout=30) 1408 except Exception as e: 1409 self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.") 1410 self.log.info(e) 1411 self.nvmf_proc.kill() 1412 self.nvmf_proc.communicate() 1413 # Try to clean up RPC socket files if they were not removed 1414 # because of using 'kill' 1415 try: 1416 os.remove("/var/tmp/spdk.sock") 1417 os.remove("/var/tmp/spdk.sock.lock") 1418 except FileNotFoundError: 1419 pass 1420 self.restore_settings() 1421 super().stop() 1422 1423 1424class KernelInitiator(Initiator): 1425 def __init__(self, name, general_config, initiator_config): 1426 super().__init__(name, general_config, initiator_config) 1427 1428 # Defaults 1429 self.extra_params = initiator_config.get('extra_params', '') 1430 1431 self.ioengine = "libaio" 1432 self.spdk_conf = "" 1433 1434 if "num_cores" in initiator_config: 1435 self.num_cores = initiator_config["num_cores"] 1436 1437 if "kernel_engine" in initiator_config: 1438 self.ioengine = initiator_config["kernel_engine"] 1439 if "io_uring" in self.ioengine: 1440 self.extra_params += ' --nr-poll-queues=8' 1441 1442 def configure_adq(self): 1443 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1444 1445 def adq_configure_tc(self): 1446 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. 
Skipping configuration.") 1447 1448 def adq_set_busy_read(self, busy_read_val): 1449 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0") 1450 return {"net.core.busy_read": 0} 1451 1452 def get_connected_nvme_list(self): 1453 json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"])) 1454 nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"] 1455 if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]] 1456 return nvme_list 1457 1458 def init_connect(self): 1459 self.log.info("Below connection attempts may result in error messages, this is expected!") 1460 for subsystem in self.subsystem_info_list: 1461 self.log.info("Trying to connect %s %s %s" % subsystem) 1462 self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport, 1463 "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params]) 1464 time.sleep(2) 1465 1466 if "io_uring" in self.ioengine: 1467 self.log.info("Setting block layer settings for io_uring.") 1468 1469 # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because 1470 # apparently it's not possible for connected subsystems. 1471 # Results in "error: Invalid argument" 1472 block_sysfs_settings = { 1473 "iostats": "0", 1474 "rq_affinity": "0", 1475 "nomerges": "2" 1476 } 1477 1478 for disk in self.get_connected_nvme_list(): 1479 sysfs = os.path.join("/sys/block", disk, "queue") 1480 for k, v in block_sysfs_settings.items(): 1481 sysfs_opt_path = os.path.join(sysfs, k) 1482 try: 1483 self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True) 1484 except CalledProcessError as e: 1485 self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v)) 1486 finally: 1487 _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)]) 1488 self.log.info("%s=%s" % (sysfs_opt_path, _)) 1489 1490 def init_disconnect(self): 1491 for subsystem in self.subsystem_info_list: 1492 self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]]) 1493 time.sleep(1) 1494 1495 def get_nvme_subsystem_numa(self, dev_name): 1496 # Remove two last characters to get controller name instead of subsystem name 1497 nvme_ctrl = os.path.basename(dev_name)[:-2] 1498 remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', 1499 self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl])) 1500 return self.get_route_nic_numa(remote_nvme_ip.group(0)) 1501 1502 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0, numa_align=True): 1503 self.available_cpus = self.get_numa_cpu_map() 1504 if len(threads) >= len(subsystems): 1505 threads = range(0, len(subsystems)) 1506 1507 # Generate connected nvme devices names and sort them by used NIC numa node 1508 # to allow better grouping when splitting into fio sections. 

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0, numa_align=True):
        self.available_cpus = self.get_numa_cpu_map()
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        # Generate connected NVMe device names and sort them by the NUMA node
        # of the NIC they use, to allow better grouping when splitting into
        # fio sections.
        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
        nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list]
        nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))]

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            # Spread leftover devices one per section until used up
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = ""
            if numa_align:
                numa_opts = self.gen_fio_numa_section(r, num_jobs)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section
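
    # A minimal sketch of the fio snippet this method emits (device names and
    # numbers are assumptions for illustration): with io_depth=128, num_jobs=1
    # and two devices in a section, job_section_qd is round(128 * 2 / 1) = 256:
    #
    #   [filename0]
    #   filename=/dev/nvme0n1
    #   filename=/dev/nvme1n1
    #   iodepth=256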


class SPDKInitiator(Initiator):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        if self.skip_spdk_install is False:
            self.install_spdk()

        # Optional fields
        self.enable_data_digest = initiator_config.get('enable_data_digest', False)
        self.small_pool_count = initiator_config.get('small_pool_count', 32768)
        self.large_pool_count = initiator_config.get('large_pool_count', 16384)
        self.sock_impl = initiator_config.get('sock_impl', 'posix')

        if "num_cores" in initiator_config:
            self.num_cores = initiator_config["num_cores"]

        self.ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
        self.spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir

    def adq_set_busy_read(self, busy_read_val):
        return {"net.core.busy_read": busy_read_val}

    def install_spdk(self):
        self.log.info("Using fio binary %s" % self.fio_bin)
        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma",
                       "--with-fio=%s" % os.path.dirname(self.fio_bin),
                       "--enable-lto", "--disable-unit-tests"])
        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])

        self.log.info("SPDK built")
        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])

    def init_connect(self):
        # Not a real "connect" like when doing "nvme connect", because SPDK's fio
        # bdev plugin initiates the connection just before starting IO traffic.
        # This is just to have an "init_connect" equivalent of the same function
        # from the KernelInitiator class: prepare the bdev.conf JSON file for
        # later use and consider it "making a connection".
        bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
        self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])

    def init_disconnect(self):
        # SPDK Initiator does not need to explicitly disconnect, as this gets
        # done after the fio bdev plugin finishes IO.
        return

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        spdk_cfg_section = {
            "subsystems": [
                {
                    "subsystem": "bdev",
                    "config": []
                },
                {
                    "subsystem": "iobuf",
                    "config": [
                        {
                            "method": "iobuf_set_options",
                            "params": {
                                "small_pool_count": self.small_pool_count,
                                "large_pool_count": self.large_pool_count
                            }
                        }
                    ]
                },
                {
                    "subsystem": "sock",
                    "config": [
                        {
                            "method": "sock_set_default_impl",
                            "params": {
                                "impl_name": self.sock_impl
                            }
                        }
                    ]
                }
            ]
        }

        for i, subsys in enumerate(remote_subsystem_list):
            sub_port, sub_nqn, sub_addr = map(str, subsys)
            nvme_ctrl = {
                "method": "bdev_nvme_attach_controller",
                "params": {
                    "name": "Nvme{}".format(i),
                    "trtype": self.transport,
                    "traddr": sub_addr,
                    "trsvcid": sub_port,
                    "subnqn": sub_nqn,
                    "adrfam": "IPv4"
                }
            }

            if self.enable_adq:
                nvme_ctrl["params"].update({"priority": "1"})

            if self.enable_data_digest:
                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})

            spdk_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)

        return json.dumps(spdk_cfg_section, indent=2)

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0, numa_align=True):
        self.available_cpus = self.get_numa_cpu_map()
        filename_section = ""
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        # Generate expected NVMe bdev names and sort them by the NUMA node of
        # the NIC they use, to allow better grouping when splitting into fio
        # sections.
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames]
        filenames = [x for _, x in sorted(zip(filename_numas, filenames))]

        nvme_per_split = int(len(subsystems) / len(threads))
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = ""
            if numa_align:
                numa_opts = self.gen_fio_numa_section(r, num_jobs)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section

    def get_nvme_subsystem_numa(self, bdev_name):
        bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir]))
        bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"]

        # Strip the last two characters (the namespace suffix, e.g. "n1")
        # to get the controller name
        nvme_ctrl = bdev_name[:-2]
        for bdev in bdev_conf_json_obj:
            if bdev["method"] == "bdev_nvme_attach_controller" and bdev["params"]["name"] == nvme_ctrl:
                return self.get_route_nic_numa(bdev["params"]["traddr"])
        return None
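

# For reference, gen_spdk_bdev_conf() output for a single subsystem looks
# roughly like the sketch below, alongside the "iobuf" and "sock" sections
# (transport address, service id and NQN are illustrative placeholders):
#
#   {
#     "subsystems": [
#       {
#         "subsystem": "bdev",
#         "config": [
#           {
#             "method": "bdev_nvme_attach_controller",
#             "params": {
#               "name": "Nvme0",
#               "trtype": "tcp",
#               "traddr": "192.0.2.1",
#               "trsvcid": "4420",
#               "subnqn": "nqn.2018-09.io.spdk:cnode1",
#               "adrfam": "IPv4"
#             }
#           }
#         ]
#       },
#       ...
#     ]
#   }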


def initiators_match_subsystems(initiators, target_obj):
    for i in initiators:
        i.match_subsystems(target_obj.subsystem_info_list)
        if i.enable_adq:
            i.adq_configure_tc()


def run_single_fio_test(args,
                        initiators,
                        configs,
                        target_obj,
                        block_size,
                        io_depth,
                        rw,
                        fio_rw_mix_read,
                        fio_run_num,
                        fio_ramp_time,
                        fio_run_time):

    for run_no in range(1, fio_run_num+1):
        threads = []
        power_daemon = None
        measurements_prefix = "%s_%s_%s_m_%s_run_%s" % (block_size, io_depth, rw, fio_rw_mix_read, run_no)

        for i, cfg in zip(initiators, configs):
            t = threading.Thread(target=i.run_fio, args=(cfg, run_no))
            threads.append(t)
        if target_obj.enable_sar:
            sar_file_prefix = measurements_prefix + "_sar"
            t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix, fio_ramp_time, fio_run_time))
            threads.append(t)

        if target_obj.enable_pcm:
            pcm_fnames = ["%s_%s.csv" % (measurements_prefix, x) for x in ["pcm_cpu"]]
            pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm,
                                         args=(args.results, pcm_fnames[0], fio_ramp_time, fio_run_time))
            threads.append(pcm_cpu_t)

        if target_obj.enable_bw:
            bandwidth_file_name = measurements_prefix + "_bandwidth.csv"
            t = threading.Thread(target=target_obj.measure_network_bandwidth,
                                 args=(args.results, bandwidth_file_name, fio_ramp_time, fio_run_time))
            threads.append(t)

        if target_obj.enable_dpdk_memory:
            dpdk_mem_file_name = measurements_prefix + "_dpdk_mem.txt"
            t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results, dpdk_mem_file_name, fio_ramp_time))
            threads.append(t)

        if target_obj.enable_pm:
            power_daemon = threading.Thread(target=target_obj.measure_power,
                                            args=(args.results, measurements_prefix, script_full_dir,
                                                  fio_ramp_time, fio_run_time))
            threads.append(power_daemon)

        for t in threads:
            t.start()
        for t in threads:
            t.join()


def run_fio_tests(args, initiators, target_obj,
                  fio_workloads, fio_rw_mix_read,
                  fio_run_num, fio_ramp_time, fio_run_time,
                  fio_rate_iops, fio_offset, fio_offset_inc,
                  fio_numa_align):

    for block_size, io_depth, rw in fio_workloads:
        configs = []
        for i in initiators:
            i.init_connect()
            # Note: fio_num_jobs is a module-level variable parsed from the
            # "fio" config section in the __main__ block below.
            cfg = i.gen_fio_config(rw,
                                   fio_rw_mix_read,
                                   block_size,
                                   io_depth,
                                   target_obj.subsys_no,
                                   fio_num_jobs,
                                   fio_ramp_time,
                                   fio_run_time,
                                   fio_rate_iops,
                                   fio_offset,
                                   fio_offset_inc,
                                   fio_numa_align)
            configs.append(cfg)

        run_single_fio_test(args,
                            initiators,
                            configs,
                            target_obj,
                            block_size,
                            io_depth,
                            rw,
                            fio_rw_mix_read,
                            fio_run_num,
                            fio_ramp_time,
                            fio_run_time)

        for i in initiators:
            i.init_disconnect()
            i.copy_result_files(args.results)

    try:
        parse_results(args.results, args.csv_filename)
    except Exception as err:
        logging.error("There was an error with parsing the results")
        raise err


class CpuThrottlingError(Exception):
    def __init__(self, message):
        super().__init__(message)
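

# A bare-bones config.json sketch for this script; keys mirror the sections
# parsed in __main__ below, but every value here is only an illustrative
# assumption:
#
#   {
#       "general": { ... },
#       "target": {
#           "mode": "spdk",
#           "nic_ips": ["192.0.2.1"],
#           "null_block_devices": 1
#       },
#       "initiator1": {
#           "mode": "kernel",
#           "nic_ips": ["192.0.2.10"]
#       },
#       "fio": {
#           "bs": ["4k"],
#           "qd": [128],
#           "rw": ["randrw"],
#           "rwmixread": 70,
#           "ramp_time": 10,
#           "run_time": 30,
#           "run_num": 3,
#           "num_jobs": 2
#       }
#   }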


if __name__ == "__main__":
    exit_code = 0

    script_full_dir = os.path.dirname(os.path.realpath(__file__))
    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
                        help='Configuration file.')
    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
                        help='Results directory.')
    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
                        help='CSV results filename.')
    parser.add_argument('-f', '--force', default=False, action='store_true',
                        dest='force', help="""Force script to continue and try to use all
                        available NVMe devices during test.
                        WARNING: Might result in data loss on used NVMe drives""")

    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO,
                        format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s')

    logging.info("Using config file: %s" % args.config)
    with open(args.config, "r") as config:
        data = json.load(config)

    initiators = []
    general_config = data["general"]
    target_config = data["target"]
    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]

    if "null_block_devices" not in data["target"] and \
            (args.force is False and
             "allowlist" not in data["target"] and
             "blocklist" not in data["target"]):
        # TODO: Also check if allowlist or blocklist are not empty.
        logging.warning("""WARNING: This script requires an allowlist or blocklist to be defined.
        You can choose to use all available NVMe drives on your system, which may potentially
        lead to data loss. If you wish to proceed with all attached NVMes, use the "-f" option.""")
        sys.exit(1)

    for k, v in data.items():
        if "target" in k:
            v.update({"results_dir": args.results})
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(k, data["general"], v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(k, data["general"], v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k].get("run_num")
            fio_num_jobs = data[k].get("num_jobs")

            fio_rate_iops = data[k].get("rate_iops", 0)
            fio_offset = data[k].get("offset", False)
            fio_offset_inc = data[k].get("offset_inc", 0)
            fio_numa_align = data[k].get("numa_align", True)
        else:
            continue
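
    # For reference: with "bs": ["4k"], "qd": [32, 128] and "rw": ["randread"]
    # in the "fio" section (values chosen only for illustration), the
    # itertools.product() call above yields ("4k", 32, "randread") and
    # ("4k", 128, "randread") - one fio test per combination.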

    try:
        os.mkdir(args.results)
    except FileExistsError:
        pass

    for i in initiators:
        target_obj.initiator_info.append(
            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
        )

    try:
        target_obj.tgt_start()
        initiators_match_subsystems(initiators, target_obj)

        # Poor man's threading
        # Run FIO tests
        run_fio_tests(args, initiators, target_obj, fio_workloads, fio_rw_mix_read,
                      fio_run_num, fio_ramp_time, fio_run_time, fio_rate_iops,
                      fio_offset, fio_offset_inc, fio_numa_align)

    except Exception as e:
        logging.error("Exception occurred while running FIO tests")
        logging.error(e)
        exit_code = 1

    finally:
        for i in [*initiators, target_obj]:
            try:
                i.stop()
            except CpuThrottlingError as err:
                logging.error(err)
                exit_code = 1
            except Exception:
                # Best-effort cleanup; ignore other errors so the remaining
                # objects still get stopped.
                pass
        sys.exit(exit_code)