#!/usr/bin/env python3
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (C) 2018 Intel Corporation
# All rights reserved.
#

import os
import re
import sys
import argparse
import json
import logging
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid

import paramiko
import pandas as pd
from common import *
from subprocess import CalledProcessError, check_output
from abc import abstractmethod, ABC

sys.path.append(os.path.dirname(__file__) + '/../../../python')

import spdk.rpc as rpc  # noqa
import spdk.rpc.client as rpc_client  # noqa


class Server(ABC):
    def __init__(self, name, general_config, server_config):
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        self.skip_spdk_install = general_config.get('skip_spdk_install', False)
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]
        self.irdma_roce_enable = False

        self.log = logging.getLogger(self.name)

        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        self.local_nic_info = []
        self._nics_json_obj = {}
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        self.enable_adq = False
        self.adq_priority = None
        self.enable_arfs = server_config.get("enable_arfs", False)
        self.irq_settings = {"mode": "default"}

        if "adq_enable" in server_config and server_config["adq_enable"]:
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1
        if "irq_settings" in server_config:
            self.irq_settings.update(server_config["irq_settings"])
        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]
        if "irdma_roce_enable" in general_config:
            self.irdma_roce_enable = general_config["irdma_roce_enable"]

        if not re.match(r'^[A-Za-z0-9\-]+$', name):
            self.log.info("Please use a name which contains only letters, numbers or dashes")
            sys.exit(1)

    @staticmethod
    def get_uncommented_lines(lines):
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                if ip in addr["local"]:
                    return nic["ifname"]

    @abstractmethod
    def set_local_nic_info_helper(self):
        pass

    def set_local_nic_info(self, pci_info):
        def extract_network_elements(json_obj):
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def get_nic_numa_node(self, nic_name):
        return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name]))
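    # get_numa_cpu_map() below parses "lscpu -b -e=NODE,CPU -J" output into a
    # NUMA node -> CPU list dict. Illustrative shape for a hypothetical
    # 2-node, 8-core machine (not taken from a real system):
    #   {0: [0, 1, 2, 3], 1: [4, 5, 6, 7]}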
    def get_numa_cpu_map(self):
        numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"]))
        numa_cpu_json_map = {}

        for cpu in numa_cpu_json_obj["cpus"]:
            cpu_num = int(cpu["cpu"])
            numa_node = int(cpu["node"])
            numa_cpu_json_map.setdefault(numa_node, [])
            numa_cpu_json_map[numa_node].append(cpu_num)

        return numa_cpu_json_map

    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        return ""

    def configure_system(self):
        self.load_drivers()
        self.configure_services()
        self.configure_sysctl()
        self.configure_arfs()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity(**self.irq_settings)

    RDMA_PROTOCOL_IWARP = 0
    RDMA_PROTOCOL_ROCE = 1
    RDMA_PROTOCOL_UNKNOWN = -1

    def check_rdma_protocol(self):
        try:
            roce_ena = self.exec_cmd(["cat", "/sys/module/irdma/parameters/roce_ena"])
            roce_ena = roce_ena.strip()
            if roce_ena == "0":
                return self.RDMA_PROTOCOL_IWARP
            else:
                return self.RDMA_PROTOCOL_ROCE
        except CalledProcessError as e:
            self.log.error("ERROR: failed to check RDMA protocol!")
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
            return self.RDMA_PROTOCOL_UNKNOWN

    def load_drivers(self):
        self.log.info("Loading drivers")
        self.exec_cmd(["sudo", "modprobe", "-a",
                       "nvme-%s" % self.transport,
                       "nvmet-%s" % self.transport])
        current_mode = self.check_rdma_protocol()
        if current_mode == self.RDMA_PROTOCOL_UNKNOWN:
            self.log.error("ERROR: failed to check RDMA protocol mode")
            return
        if self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_IWARP:
            self.reload_driver("irdma", "roce_ena=1")
            self.log.info("Loaded irdma driver with RoCE enabled")
        elif self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_ROCE:
            self.log.info("Leaving irdma driver with RoCE enabled")
        else:
            self.reload_driver("irdma", "roce_ena=0")
            self.log.info("Loaded irdma driver with iWARP enabled")

    def configure_arfs(self):
        rps_flow_cnt = 512
        if not self.enable_arfs:
            rps_flow_cnt = 0

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            self.exec_cmd(["sudo", "ethtool", "-K", nic_name, "ntuple", "on"])
            self.log.info(f"Setting rps_flow_cnt for {nic_name}")
            queue_files = self.exec_cmd(["ls", f"/sys/class/net/{nic_name}/queues/"]).strip().split("\n")
            queue_files = filter(lambda x: x.startswith("rx-"), queue_files)

            for qf in queue_files:
                self.exec_cmd(["sudo", "bash", "-c", f"echo {rps_flow_cnt} > /sys/class/net/{nic_name}/queues/{qf}/rps_flow_cnt"])

    def configure_adq(self):
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        self.log.info("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log.info("%s loaded!" % module)
            except CalledProcessError as e:
                self.log.error("ERROR: failed to load module %s" % module)
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        self.log.info("Configuring ADQ Traffic classes and filters...")

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            nic_ports = [x[0] for x in self.subsystem_info_list if nic_ip in x[2]]

            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log.info(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log.info(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            nic_bdf = os.path.basename(self.exec_cmd(["readlink", "-f", "/sys/class/net/%s/device" % nic_name]).strip())
            self.exec_cmd(["sudo", "devlink", "dev", "param", "set", "pci/%s" % nic_bdf,
                           "name", "tc1_inline_fd", "value", "true", "cmode", "runtime"])

            for port in nic_ports:
                tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                                 "protocol", "ip", "ingress", "prio", "1", "flower",
                                 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                                 "skip_sw", "hw_tc", "1"]
                self.log.info(" ".join(tc_filter_cmd))
                self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log.info("Show tc configuration for %s NIC..." % nic_name)
            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log.info("%s" % tc_disk_out)
            self.log.info("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log.info(xps_cmd)
            self.exec_cmd(xps_cmd)
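    # Rough sketch of the commands adq_configure_tc() above emits for a single
    # NIC; interface name, queue counts and port below are illustrative only:
    #   tc qdisc add dev ens1f0 root mqprio num_tc 2 map 0 1 \
    #       queues 2@0 8@2 hw 1 mode channel
    #   tc filter add dev ens1f0 protocol ip ingress prio 1 flower \
    #       dst_ip 10.0.0.1/32 ip_proto tcp dst_port 4420 skip_sw hw_tc 1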
    def reload_driver(self, driver, *modprobe_args):
        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver, *modprobe_args])
        except CalledProcessError as e:
            self.log.error("ERROR: failed to reload %s module!" % driver)
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_nic(self):
        self.log.info("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                nic_bdf = self.exec_cmd(["bash", "-c", "source /sys/class/net/%s/device/uevent; echo $PCI_SLOT_NAME" % nic])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-inspect-optimize", "off"])
                # Following are suggested in ADQ Configuration Guide to be enabled for large block sizes,
                # but can be used with 4k block sizes as well
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop", "on"])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop-cfg", "on"])
            except subprocess.CalledProcessError as e:
                self.log.error("ERROR: failed to configure NIC port using ethtool!")
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
                self.log.info("Please update your NIC driver and firmware versions and try again.")
            self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))
    def configure_services(self):
        self.log.info("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log.info("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log.info("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        self.log.info("Tuning sysctl settings...")

        busy_read = 0
        busy_poll = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1
            busy_poll = 1

        sysctl_opts = {
            "net.core.busy_poll": busy_poll,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
            "net.core.rps_sock_flow_entries": 0
        }

        if self.enable_adq:
            sysctl_opts.update(self.adq_set_busy_read(1))

        if self.enable_arfs:
            sysctl_opts.update({"net.core.rps_sock_flow_entries": 32768})

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        if not self.tuned_profile:
            self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        self.log.info("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])
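    # A quick sketch of what get_core_list_from_mask() below accepts and
    # returns (values are illustrative):
    #   get_core_list_from_mask("0x3")     -> [0, 1]
    #   get_core_list_from_mask("[0,2-4]") -> [0, 2, 3, 4]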
    def get_core_list_from_mask(self, core_mask):
        # Generate list of individual cores from hex core mask
        # (e.g. '0xffff') or list containing:
        # - individual cores (e.g. '1, 2, 3')
        # - core ranges (e.g. '0-3')
        # - mix of both (e.g. '0, 1-4, 9, 11-13')

        core_list = []
        if "0x" in core_mask:
            core_mask_int = int(core_mask, 16)
            for i in range(core_mask_int.bit_length()):
                if (1 << i) & core_mask_int:
                    core_list.append(i)
            return core_list
        else:
            # Core list can be provided in .json config with square brackets
            # remove them first
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")

            for i in core_mask.split(","):
                if "-" in i:
                    start, end = i.split("-")
                    core_range = range(int(start), int(end) + 1)
                    core_list.extend(core_range)
                else:
                    core_list.append(int(i))
            return core_list

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
        self.log.info("Setting NIC irq affinity for NICs. Using %s mode" % mode)

        if mode not in ["default", "bynode", "cpulist"]:
            raise ValueError("%s irq affinity setting not supported" % mode)

        if mode == "cpulist" and not cpulist:
            raise ValueError("%s irq affinity setting set, but no cpulist provided" % mode)

        affinity_script = "set_irq_affinity.sh"
        if "default" not in mode:
            affinity_script = "set_irq_affinity_cpulist.sh"
            system_cpu_map = self.get_numa_cpu_map()
        irq_script_path = os.path.join(self.irq_scripts_dir, affinity_script)

        def cpu_list_to_string(cpulist):
            return ",".join(map(lambda x: str(x), cpulist))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            irq_cmd = ["sudo", irq_script_path]

            # Use only CPU cores matching NIC NUMA node.
            # Remove any CPU cores if they're on exclusion list.
            if mode == "bynode":
                irq_cpus = system_cpu_map[self.get_nic_numa_node(nic_name)]
                if cpulist and exclude_cpulist:
                    disallowed_cpus = self.get_core_list_from_mask(cpulist)
                    irq_cpus = list(set(irq_cpus) - set(disallowed_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            if mode == "cpulist":
                irq_cpus = self.get_core_list_from_mask(cpulist)
                if exclude_cpulist:
                    # Flatten system CPU list, we don't need NUMA awareness here
                    system_cpu_list = sorted({x for v in system_cpu_map.values() for x in v})
                    irq_cpus = list(set(system_cpu_list) - set(irq_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            irq_cmd.append(nic_name)
            self.log.info(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)
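    # Hypothetical "irq_settings" config fragment and the call it expands to
    # via configure_irq_affinity(**self.irq_settings):
    #   {"mode": "cpulist", "cpulist": "[0-7]", "exclude_cpulist": true}
    #   -> configure_irq_affinity(mode="cpulist", cpulist="[0-7]", exclude_cpulist=True)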
profile." % self.tuned_restore_dict["profile"]) 487 488 def restore_governor(self): 489 self.log.info("Restoring CPU governor setting...") 490 if self.governor_restore: 491 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore]) 492 self.log.info("Reverted CPU governor to %s." % self.governor_restore) 493 494 def restore_settings(self): 495 self.restore_governor() 496 self.restore_tuned() 497 self.restore_services() 498 self.restore_sysctl() 499 if self.enable_adq: 500 self.reload_driver("ice") 501 502 503class Target(Server): 504 def __init__(self, name, general_config, target_config): 505 super().__init__(name, general_config, target_config) 506 507 # Defaults 508 self.enable_zcopy = False 509 self.scheduler_name = "static" 510 self.null_block = 0 511 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 512 self.subsystem_info_list = [] 513 self.initiator_info = [] 514 self.nvme_allowlist = [] 515 self.nvme_blocklist = [] 516 517 # Target-side measurement options 518 self.enable_pm = True 519 self.enable_sar = True 520 self.enable_pcm = True 521 self.enable_bw = True 522 self.enable_dpdk_memory = True 523 524 if "null_block_devices" in target_config: 525 self.null_block = target_config["null_block_devices"] 526 if "scheduler_settings" in target_config: 527 self.scheduler_name = target_config["scheduler_settings"] 528 if "zcopy_settings" in target_config: 529 self.enable_zcopy = target_config["zcopy_settings"] 530 if "results_dir" in target_config: 531 self.results_dir = target_config["results_dir"] 532 if "blocklist" in target_config: 533 self.nvme_blocklist = target_config["blocklist"] 534 if "allowlist" in target_config: 535 self.nvme_allowlist = target_config["allowlist"] 536 # Blocklist takes precedence, remove common elements from allowlist 537 self.nvme_allowlist = list(set(self.nvme_allowlist) - set(self.nvme_blocklist)) 538 if "enable_pm" in target_config: 539 self.enable_pm = target_config["enable_pm"] 540 if "enable_sar" in target_config: 541 self.enable_sar = target_config["enable_sar"] 542 if "enable_pcm" in target_config: 543 self.enable_pcm = target_config["enable_pcm"] 544 if "enable_bandwidth" in target_config: 545 self.enable_bw = target_config["enable_bandwidth"] 546 if "enable_dpdk_memory" in target_config: 547 self.enable_dpdk_memory = target_config["enable_dpdk_memory"] 548 549 self.log.info("Items now on allowlist: %s" % self.nvme_allowlist) 550 self.log.info("Items now on blocklist: %s" % self.nvme_blocklist) 551 552 self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) 553 self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../")) 554 self.set_local_nic_info(self.set_local_nic_info_helper()) 555 556 if self.skip_spdk_install is False: 557 self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip") 558 559 self.configure_system() 560 if self.enable_adq: 561 self.configure_adq() 562 self.configure_irq_affinity() 563 self.sys_config() 564 565 def set_local_nic_info_helper(self): 566 return json.loads(self.exec_cmd(["lshw", "-json"])) 567 568 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 569 stderr_opt = None 570 if stderr_redirect: 571 stderr_opt = subprocess.STDOUT 572 if change_dir: 573 old_cwd = os.getcwd() 574 os.chdir(change_dir) 575 self.log.info("Changing directory to %s" % change_dir) 576 577 out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8") 578 579 if change_dir: 580 os.chdir(old_cwd) 581 self.log.info("Changing directory to %s" % old_cwd) 582 
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log.info("Changing directory to %s" % change_dir)

        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")

        if change_dir:
            os.chdir(old_cwd)
            self.log.info("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        self.log.info("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, _directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log.info("Done zipping")
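    # Minimal sketch of the _chunks() helper below (illustrative values):
    #   list(Target._chunks(list(range(7)), 3)) -> [[0, 1, 2], [3, 4], [5, 6]]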
    @staticmethod
    def _chunks(input_list, chunks_no):
        div, rem = divmod(len(input_list), chunks_no)
        for i in range(chunks_no):
            si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem)
            yield input_list[si:si + (div + 1 if i < rem else div)]

    def spread_bdevs(self, req_disks):
        # Spread available block devices indexes:
        # - evenly across available initiator systems
        # - evenly across available NIC interfaces for
        #   each initiator
        # Not NUMA aware.
        ip_bdev_map = []
        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))

        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
            self.initiator_info[i]["bdev_range"] = init_chunk
            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
                for c in nic_chunk:
                    ip_bdev_map.append((ip, c))
        return ip_bdev_map

    def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time):
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))

        self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Starting SAR measurements")

        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % 1, "%s" % run_time])
        with open(os.path.join(results_dir, sar_output_file), "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log.info("Summary CPU utilization from SAR:")
                        self.log.info(line)
                    elif "all" in line:
                        self.log.info(line)
                    else:
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum

        with open(os.path.join(results_dir, sar_cpu_util_file), "w") as f:
            f.write("%0.2f" % sar_cpu_usage)

    def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time):
        time.sleep(ramp_time)
        self.log.info("Starting power measurements")
        self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir,
                       "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix,
                       "-x", "-c", "%s" % run_time, "-t", "%s" % 1, "-r"])

    def measure_pcm_memory(self, results_dir, pcm_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        cmd = ["pcm-memory", "1", "-csv=%s/%s" % (results_dir, pcm_file_name)]
        pcm_memory = subprocess.Popen(cmd)
        time.sleep(run_time)
        pcm_memory.terminate()

    def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        cmd = ["pcm", "1", "-i=%s" % run_time,
               "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_pcm_power(self, results_dir, pcm_power_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        out = self.exec_cmd(["pcm-power", "1", "-i=%s" % run_time])
        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
            fh.write(out)
        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
        #       Improve this so that additional file containing measurements average is generated too.

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name, ramp_time, run_time):
        self.log.info("Waiting %d seconds for ramp-up to finish before measuring bandwidth stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("INFO: starting network bandwidth measure")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", "%s" % run_time])
        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
        #       Improve this so that additional file containing measurements average is generated too.
        # TODO: Monitor only these interfaces which are currently used to run the workload.

    def measure_dpdk_memory(self, results_dir, dump_file_name, ramp_time):
        self.log.info("INFO: waiting to generate DPDK memory usage")
        time.sleep(ramp_time)
        self.log.info("INFO: generating DPDK memory usage")
        tmp_dump_file = rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)["filename"]
        os.rename(tmp_dump_file, "%s/%s" % (results_dir, dump_file_name))

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(os.uname().release)
        self.log.info("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
        self.log.info('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log.info("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
        self.log.info('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log.info("====zcopy settings:====")
        self.log.info("zcopy enabled: %s" % (self.enable_zcopy))
        self.log.info("====Scheduler settings:====")
        self.log.info("SPDK scheduler: %s" % (self.scheduler_name))


class Initiator(Server):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Required fields
        self.ip = initiator_config["ip"]
        self.target_nic_ips = initiator_config["target_nic_ips"]

        # Defaults
        self.cpus_allowed = None
        self.cpus_allowed_policy = "shared"
        self.spdk_dir = "/tmp/spdk"
        self.fio_bin = "/usr/src/fio/fio"
        self.nvmecli_bin = "nvme"
        self.cpu_frequency = None
        self.subsystem_info_list = []
        self.allow_cpu_sharing = True

        if "spdk_dir" in initiator_config:
            self.spdk_dir = initiator_config["spdk_dir"]
        if "fio_bin" in initiator_config:
            self.fio_bin = initiator_config["fio_bin"]
        if "nvmecli_bin" in initiator_config:
            self.nvmecli_bin = initiator_config["nvmecli_bin"]
        if "cpus_allowed" in initiator_config:
            self.cpus_allowed = initiator_config["cpus_allowed"]
        if "cpus_allowed_policy" in initiator_config:
            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
        if "cpu_frequency" in initiator_config:
            self.cpu_frequency = initiator_config["cpu_frequency"]
        if "allow_cpu_sharing" in initiator_config:
            self.allow_cpu_sharing = initiator_config["allow_cpu_sharing"]
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if self.skip_spdk_install is False:
            self.copy_spdk("/tmp/spdk.zip")

        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        self.restore_settings()
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it again to prevent
        # expansion when sending to remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout by using the get_pty option if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        self.log.info("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log.info("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log.info("Sources unpacked")

    def copy_result_files(self, dest_dir):
        self.log.info("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log.info("Done copying results")

    def match_subsystems(self, target_subsystems):
        subsystems = [subsystem for subsystem in target_subsystems if subsystem[2] in self.target_nic_ips]
        subsystems.sort(key=lambda x: x[1])
        self.log.info("Found matching subsystems on target side:")
        for s in subsystems:
            self.log.info(s)
        self.subsystem_info_list = subsystems
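    # Entries in subsystem_info_list are (trsvcid, nqn, traddr) tuples, e.g.
    # (illustrative): ("4420", "nqn.2018-09.io.spdk:cnode0", "10.0.0.1").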
    @abstractmethod
    def init_connect(self):
        pass

    @abstractmethod
    def init_disconnect(self):
        pass

    @abstractmethod
    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def get_route_nic_numa(self, remote_nvme_ip):
        local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip]))
        local_nvme_nic = local_nvme_nic[0]["dev"]
        return self.get_nic_numa_node(local_nvme_nic)

    @staticmethod
    def gen_fio_offset_section(offset_inc, num_jobs):
        offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc
        return "\n".join(["size=%s%%" % offset_inc,
                          "offset=0%",
                          "offset_increment=%s%%" % offset_inc])

    def gen_fio_numa_section(self, fio_filenames_list, num_jobs):
        numa_stats = {}
        allowed_cpus = []
        for nvme in fio_filenames_list:
            nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme))
            numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1

        # Use the most common NUMA node for this chunk to allocate memory and CPUs
        section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0]

        # Check if we have enough free CPUs to pop from the list before assigning them
        if len(self.available_cpus[section_local_numa]) < num_jobs:
            if self.allow_cpu_sharing:
                self.log.info("Regenerating available CPU list %s" % section_local_numa)
                # Remove still available CPUs from the regenerated list. We don't want to
                # regenerate it with duplicates.
                cpus_regen = set(self.get_numa_cpu_map()[section_local_numa]) - set(self.available_cpus[section_local_numa])
                self.available_cpus[section_local_numa].extend(cpus_regen)
                self.log.info(self.available_cpus[section_local_numa])
            else:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise IndexError

        for _ in range(num_jobs):
            try:
                allowed_cpus.append(str(self.available_cpus[section_local_numa].pop(0)))
            except IndexError:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise

        return "\n".join(["cpus_allowed=%s" % ",".join(allowed_cpus),
                          "numa_mem_policy=prefer:%s" % section_local_numa])
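    # Sketch of a gen_fio_numa_section() result (CPU ids illustrative):
    #   cpus_allowed=0,1,2,3
    #   numa_mem_policy=prefer:0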
    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no,
                       num_jobs=None, ramp_time=0, run_time=10, rate_iops=0,
                       offset=False, offset_inc=0):
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""

        if self.cpus_allowed is not None:
            self.log.info("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    # Core ranges are inclusive, e.g. "0-3" means 4 cores
                    cpus_num += len(range(a, b + 1))
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            self.num_cores = len(self.subsystem_info_list)
            threads = range(0, len(self.subsystem_info_list))

        filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs,
                                                      offset, offset_inc)

        fio_config = fio_conf_template.format(ioengine=self.ioengine, spdk_conf=self.spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, init_connect() function
        if "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log.info("Created FIO Config:")
        self.log.info(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)
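    # Example of the generated config filename (values illustrative): a 4k
    # randrw run at queue depth 128 with rwmixread=25 on 8 cores produces
    # "4k_128_randrw_m_25_8CPU.fio".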
Results in: %s" % output_filename) 993 except subprocess.CalledProcessError as e: 994 self.log.error("ERROR: Fio process failed!") 995 self.log.error(e.stdout) 996 997 def sys_config(self): 998 self.log.info("====Kernel release:====") 999 self.log.info(self.exec_cmd(["uname", "-r"])) 1000 self.log.info("====Kernel command line:====") 1001 cmdline = self.exec_cmd(["cat", "/proc/cmdline"]) 1002 self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines()))) 1003 self.log.info("====sysctl conf:====") 1004 sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"]) 1005 self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines()))) 1006 self.log.info("====Cpu power info:====") 1007 self.log.info(self.exec_cmd(["cpupower", "frequency-info"])) 1008 1009 1010class KernelTarget(Target): 1011 def __init__(self, name, general_config, target_config): 1012 super().__init__(name, general_config, target_config) 1013 # Defaults 1014 self.nvmet_bin = "nvmetcli" 1015 1016 if "nvmet_bin" in target_config: 1017 self.nvmet_bin = target_config["nvmet_bin"] 1018 1019 def load_drivers(self): 1020 self.log.info("Loading drivers") 1021 super().load_drivers() 1022 if self.null_block: 1023 self.exec_cmd(["sudo", "modprobe", "null_blk", "nr_devices=%s" % self.null_block]) 1024 1025 def configure_adq(self): 1026 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1027 1028 def adq_configure_tc(self): 1029 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1030 1031 def adq_set_busy_read(self, busy_read_val): 1032 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0") 1033 return {"net.core.busy_read": 0} 1034 1035 def stop(self): 1036 self.nvmet_command(self.nvmet_bin, "clear") 1037 self.restore_settings() 1038 1039 def get_nvme_device_bdf(self, nvme_dev_path): 1040 nvme_name = os.path.basename(nvme_dev_path) 1041 return self.exec_cmd(["cat", "/sys/block/%s/device/address" % nvme_name]).strip() 1042 1043 def get_nvme_devices(self): 1044 dev_list = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]).split("\n") 1045 nvme_list = [] 1046 for dev in dev_list: 1047 if "nvme" not in dev: 1048 continue 1049 if self.get_nvme_device_bdf(dev) in self.nvme_blocklist: 1050 continue 1051 if len(self.nvme_allowlist) == 0: 1052 nvme_list.append(dev) 1053 continue 1054 if self.get_nvme_device_bdf(dev) in self.nvme_allowlist: 1055 nvme_list.append(dev) 1056 return nvme_list 1057 1058 def nvmet_command(self, nvmet_bin, command): 1059 return self.exec_cmd([nvmet_bin, *(command.split(" "))]) 1060 1061 def kernel_tgt_gen_subsystem_conf(self, nvme_list): 1062 1063 nvmet_cfg = { 1064 "ports": [], 1065 "hosts": [], 1066 "subsystems": [], 1067 } 1068 1069 for ip, bdev_num in self.spread_bdevs(len(nvme_list)): 1070 port = str(4420 + bdev_num) 1071 nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num 1072 serial = "SPDK00%s" % bdev_num 1073 bdev_name = nvme_list[bdev_num] 1074 1075 nvmet_cfg["subsystems"].append({ 1076 "allowed_hosts": [], 1077 "attr": { 1078 "allow_any_host": "1", 1079 "serial": serial, 1080 "version": "1.3" 1081 }, 1082 "namespaces": [ 1083 { 1084 "device": { 1085 "path": bdev_name, 1086 "uuid": "%s" % uuid.uuid4() 1087 }, 1088 "enable": 1, 1089 "nsid": port 1090 } 1091 ], 1092 "nqn": nqn 1093 }) 1094 1095 nvmet_cfg["ports"].append({ 1096 "addr": { 1097 "adrfam": "ipv4", 1098 "traddr": ip, 1099 "trsvcid": port, 1100 "trtype": self.transport 1101 }, 1102 "portid": bdev_num, 1103 
"referrals": [], 1104 "subsystems": [nqn] 1105 }) 1106 1107 self.subsystem_info_list.append((port, nqn, ip)) 1108 self.subsys_no = len(self.subsystem_info_list) 1109 1110 with open("kernel.conf", "w") as fh: 1111 fh.write(json.dumps(nvmet_cfg, indent=2)) 1112 1113 def tgt_start(self): 1114 self.log.info("Configuring kernel NVMeOF Target") 1115 1116 if self.null_block: 1117 self.log.info("Configuring with null block device.") 1118 nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)] 1119 else: 1120 self.log.info("Configuring with NVMe drives.") 1121 nvme_list = self.get_nvme_devices() 1122 1123 self.kernel_tgt_gen_subsystem_conf(nvme_list) 1124 self.subsys_no = len(nvme_list) 1125 1126 self.nvmet_command(self.nvmet_bin, "clear") 1127 self.nvmet_command(self.nvmet_bin, "restore kernel.conf") 1128 1129 if self.enable_adq: 1130 self.adq_configure_tc() 1131 1132 self.log.info("Done configuring kernel NVMeOF Target") 1133 1134 1135class SPDKTarget(Target): 1136 def __init__(self, name, general_config, target_config): 1137 # Required fields 1138 self.core_mask = target_config["core_mask"] 1139 self.num_cores = len(self.get_core_list_from_mask(self.core_mask)) 1140 1141 super().__init__(name, general_config, target_config) 1142 1143 # Defaults 1144 self.dif_insert_strip = False 1145 self.null_block_dif_type = 0 1146 self.num_shared_buffers = 4096 1147 self.max_queue_depth = 128 1148 self.bpf_proc = None 1149 self.bpf_scripts = [] 1150 self.enable_dsa = False 1151 self.scheduler_core_limit = None 1152 self.iobuf_small_pool_count = 32767 1153 self.iobuf_large_pool_count = 16383 1154 self.num_cqe = 4096 1155 1156 if "num_shared_buffers" in target_config: 1157 self.num_shared_buffers = target_config["num_shared_buffers"] 1158 if "max_queue_depth" in target_config: 1159 self.max_queue_depth = target_config["max_queue_depth"] 1160 if "null_block_dif_type" in target_config: 1161 self.null_block_dif_type = target_config["null_block_dif_type"] 1162 if "dif_insert_strip" in target_config: 1163 self.dif_insert_strip = target_config["dif_insert_strip"] 1164 if "bpf_scripts" in target_config: 1165 self.bpf_scripts = target_config["bpf_scripts"] 1166 if "dsa_settings" in target_config: 1167 self.enable_dsa = target_config["dsa_settings"] 1168 if "scheduler_core_limit" in target_config: 1169 self.scheduler_core_limit = target_config["scheduler_core_limit"] 1170 if "iobuf_small_pool_count" in target_config: 1171 self.iobuf_small_pool_count = target_config["iobuf_small_pool_count"] 1172 if "iobuf_large_pool_count" in target_config: 1173 self.iobuf_large_pool_count = target_config["iobuf_large_pool_count"] 1174 if "num_cqe" in target_config: 1175 self.num_cqe = target_config["num_cqe"] 1176 1177 self.log.info("====DSA settings:====") 1178 self.log.info("DSA enabled: %s" % (self.enable_dsa)) 1179 1180 def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False): 1181 if mode not in ["default", "bynode", "cpulist", 1182 "shared", "split", "split-bynode"]: 1183 self.log.error("%s irq affinity setting not supported" % mode) 1184 raise Exception 1185 1186 # Create core list from SPDK's mask and change it to string. 1187 # This is the type configure_irq_affinity expects for cpulist parameter. 
    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
        if mode not in ["default", "bynode", "cpulist",
                        "shared", "split", "split-bynode"]:
            self.log.error("%s irq affinity setting not supported" % mode)
            raise Exception

        # Create core list from SPDK's mask and change it to string.
        # This is the type configure_irq_affinity expects for cpulist parameter.
        spdk_tgt_core_list = self.get_core_list_from_mask(self.core_mask)
        spdk_tgt_core_list = ",".join(map(lambda x: str(x), spdk_tgt_core_list))
        spdk_tgt_core_list = "[" + spdk_tgt_core_list + "]"

        if mode == "shared":
            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list)
        elif mode == "split":
            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
        elif mode == "split-bynode":
            super().configure_irq_affinity(mode="bynode", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
        else:
            super().configure_irq_affinity(mode=mode, cpulist=cpulist, exclude_cpulist=exclude_cpulist)

    def adq_set_busy_read(self, busy_read_val):
        return {"net.core.busy_read": busy_read_val}

    def get_nvme_devices_count(self):
        return len(self.get_nvme_devices())

    def get_nvme_devices(self):
        bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")]))
        bdev_bdfs = []
        for bdev in bdev_subsys_json_obj["config"]:
            bdev_traddr = bdev["params"]["traddr"]
            if bdev_traddr in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                bdev_bdfs.append(bdev_traddr)
            if bdev_traddr in self.nvme_allowlist:
                bdev_bdfs.append(bdev_traddr)
        return bdev_bdfs

    def spdk_tgt_configure(self):
        self.log.info("Configuring SPDK NVMeOF target via RPC")

        # Create transport layer
        nvmf_transport_params = {
            "client": self.client,
            "trtype": self.transport,
            "num_shared_buffers": self.num_shared_buffers,
            "max_queue_depth": self.max_queue_depth,
            "dif_insert_or_strip": self.dif_insert_strip,
            "sock_priority": self.adq_priority,
            "num_cqe": self.num_cqe
        }

        if self.enable_adq:
            nvmf_transport_params["acceptor_poll_rate"] = 10000

        rpc.nvmf.nvmf_create_transport(**nvmf_transport_params)
        self.log.info("SPDK NVMeOF transport layer:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

        if self.enable_adq:
            self.adq_configure_tc()

        self.log.info("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self, null_block_count):
        md_size = 0
        block_size = 4096
        if self.null_block_dif_type != 0:
            md_size = 128

        self.log.info("Adding null block bdevices to config via RPC")
        for i in range(null_block_count):
            self.log.info("Setting bdev protection to: %s" % self.null_block_dif_type)
            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
                                      dif_type=self.null_block_dif_type, md_size=md_size)
        self.log.info("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        self.log.info("Adding NVMe bdevs to config via RPC")

        bdfs = self.get_nvme_devices()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log.error("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log.info("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        self.log.info("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = self.get_nvme_devices_count()

        for ip, bdev_num in self.spread_bdevs(req_num_disks):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = "Nvme%sn1" % bdev_num

            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                           allow_any_host=True, max_namespaces=8)
            rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
            for nqn_name in [nqn, "discovery"]:
                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
                                                     nqn=nqn_name,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid=port,
                                                     adrfam="ipv4")
            self.subsystem_info_list.append((port, nqn, ip))
        self.subsys_no = len(self.subsystem_info_list)

        self.log.info("SPDK NVMeOF subsystem configuration:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def bpf_start(self):
        self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts)
        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
        results_path = os.path.join(self.results_dir, "bpf_traces.txt")

        with open(self.pid, "r") as fh:
            nvmf_pid = str(fh.readline())

        cmd = [bpf_script, nvmf_pid, *bpf_traces]
        self.log.info(cmd)
        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})
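    # tgt_start() below follows the usual --wait-for-rpc bring-up sequence:
    #   1. launch nvmf_tgt and poll until /var/tmp/spdk.sock appears
    #   2. set pre-init options over RPC (sock impl, iobuf, optional zcopy/ADQ/DSA)
    #   3. set the scheduler and call rpc.framework_start_init()
    #   4. optionally start bpf traces, then spdk_tgt_configure() creates the
    #      transport, bdevs and subsystems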
self.log.info("Target DSA accel module enabled") 1368 1369 rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit) 1370 rpc.framework_start_init(self.client) 1371 1372 if self.bpf_scripts: 1373 self.bpf_start() 1374 1375 self.spdk_tgt_configure() 1376 1377 def stop(self): 1378 if self.bpf_proc: 1379 self.log.info("Stopping BPF Trace script") 1380 self.bpf_proc.terminate() 1381 self.bpf_proc.wait() 1382 1383 if hasattr(self, "nvmf_proc"): 1384 try: 1385 self.nvmf_proc.terminate() 1386 self.nvmf_proc.wait(timeout=30) 1387 except Exception as e: 1388 self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.") 1389 self.log.info(e) 1390 self.nvmf_proc.kill() 1391 self.nvmf_proc.communicate() 1392 # Try to clean up RPC socket files if they were not removed 1393 # because of using 'kill' 1394 try: 1395 os.remove("/var/tmp/spdk.sock") 1396 os.remove("/var/tmp/spdk.sock.lock") 1397 except FileNotFoundError: 1398 pass 1399 self.restore_settings() 1400 1401 1402class KernelInitiator(Initiator): 1403 def __init__(self, name, general_config, initiator_config): 1404 super().__init__(name, general_config, initiator_config) 1405 1406 # Defaults 1407 self.extra_params = "" 1408 self.ioengine = "libaio" 1409 self.spdk_conf = "" 1410 1411 if "num_cores" in initiator_config: 1412 self.num_cores = initiator_config["num_cores"] 1413 1414 if "extra_params" in initiator_config: 1415 self.extra_params = initiator_config["extra_params"] 1416 1417 if "kernel_engine" in initiator_config: 1418 self.ioengine = initiator_config["kernel_engine"] 1419 if "io_uring" in self.ioengine: 1420 self.extra_params += ' --nr-poll-queues=8' 1421 1422 def configure_adq(self): 1423 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1424 1425 def adq_configure_tc(self): 1426 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1427 1428 def adq_set_busy_read(self, busy_read_val): 1429 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0") 1430 return {"net.core.busy_read": 0} 1431 1432 def get_connected_nvme_list(self): 1433 json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"])) 1434 nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"] 1435 if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]] 1436 return nvme_list 1437 1438 def init_connect(self): 1439 self.log.info("Below connection attempts may result in error messages, this is expected!") 1440 for subsystem in self.subsystem_info_list: 1441 self.log.info("Trying to connect %s %s %s" % subsystem) 1442 self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport, 1443 "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params]) 1444 time.sleep(2) 1445 1446 if "io_uring" in self.ioengine: 1447 self.log.info("Setting block layer settings for io_uring.") 1448 1449 # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because 1450 # apparently it's not possible for connected subsystems. 
    def init_connect(self):
        self.log.info("Below connection attempts may result in error messages, this is expected!")
        for subsystem in self.subsystem_info_list:
            self.log.info("Trying to connect %s %s %s" % subsystem)
            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
            time.sleep(2)

        if "io_uring" in self.ioengine:
            self.log.info("Setting block layer settings for io_uring.")

            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
            #       apparently it's not possible for connected subsystems.
            #       Results in "error: Invalid argument"
            block_sysfs_settings = {
                "iostats": "0",
                "rq_affinity": "0",
                "nomerges": "2"
            }

            for disk in self.get_connected_nvme_list():
                sysfs = os.path.join("/sys/block", disk, "queue")
                for k, v in block_sysfs_settings.items():
                    sysfs_opt_path = os.path.join(sysfs, k)
                    try:
                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
                    except CalledProcessError as e:
                        self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
                    finally:
                        _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)])
                        self.log.info("%s=%s" % (sysfs_opt_path, _))

    def init_disconnect(self):
        for subsystem in self.subsystem_info_list:
            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
            time.sleep(1)

    def get_nvme_subsystem_numa(self, dev_name):
        # Remove two last characters to get controller name instead of namespace name
        nvme_ctrl = os.path.basename(dev_name)[:-2]
        remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
                                   self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl]))
        return self.get_route_nic_numa(remote_nvme_ip.group(0))

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
        self.available_cpus = self.get_numa_cpu_map()
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        # Generate connected nvme devices names and sort them by used NIC numa node
        # to allow better grouping when splitting into fio sections.
        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
        nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list]
        nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))]

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = self.gen_fio_numa_section(r, num_jobs)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section
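
# Illustrative fragment of a job section produced by KernelInitiator's
# gen_fio_filename_conf() above (device names and values are hypothetical):
#   [filename0]
#   filename=/dev/nvme0n1
#   filename=/dev/nvme1n1
#   iodepth=256
#   cpus_allowed=0,1
#   numa_mem_policy=prefer:0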
"spdk_json_conf=%s/bdev.conf" % self.spdk_dir 1540 1541 def adq_set_busy_read(self, busy_read_val): 1542 return {"net.core.busy_read": busy_read_val} 1543 1544 def install_spdk(self): 1545 self.log.info("Using fio binary %s" % self.fio_bin) 1546 self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"]) 1547 self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"]) 1548 self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", 1549 "--with-fio=%s" % os.path.dirname(self.fio_bin), 1550 "--enable-lto", "--disable-unit-tests"]) 1551 self.exec_cmd(["make", "-C", self.spdk_dir, "clean"]) 1552 self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"]) 1553 1554 self.log.info("SPDK built") 1555 self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir]) 1556 1557 def init_connect(self): 1558 # Not a real "connect" like when doing "nvme connect" because SPDK's fio 1559 # bdev plugin initiates connection just before starting IO traffic. 1560 # This is just to have a "init_connect" equivalent of the same function 1561 # from KernelInitiator class. 1562 # Just prepare bdev.conf JSON file for later use and consider it 1563 # "making a connection". 1564 bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list) 1565 self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir]) 1566 1567 def init_disconnect(self): 1568 # SPDK Initiator does not need to explicity disconnect as this gets done 1569 # after fio bdev plugin finishes IO. 1570 return 1571 1572 def gen_spdk_bdev_conf(self, remote_subsystem_list): 1573 bdev_cfg_section = { 1574 "subsystems": [ 1575 { 1576 "subsystem": "bdev", 1577 "config": [] 1578 } 1579 ] 1580 } 1581 1582 for i, subsys in enumerate(remote_subsystem_list): 1583 sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys) 1584 nvme_ctrl = { 1585 "method": "bdev_nvme_attach_controller", 1586 "params": { 1587 "name": "Nvme{}".format(i), 1588 "trtype": self.transport, 1589 "traddr": sub_addr, 1590 "trsvcid": sub_port, 1591 "subnqn": sub_nqn, 1592 "adrfam": "IPv4" 1593 } 1594 } 1595 1596 if self.enable_adq: 1597 nvme_ctrl["params"].update({"priority": "1"}) 1598 1599 if self.enable_data_digest: 1600 nvme_ctrl["params"].update({"ddgst": self.enable_data_digest}) 1601 1602 bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl) 1603 1604 return json.dumps(bdev_cfg_section, indent=2) 1605 1606 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0): 1607 self.available_cpus = self.get_numa_cpu_map() 1608 filename_section = "" 1609 if len(threads) >= len(subsystems): 1610 threads = range(0, len(subsystems)) 1611 1612 # Generate expected NVMe Bdev names and sort them by used NIC numa node 1613 # to allow better grouping when splitting into fio sections. 

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
        self.available_cpus = self.get_numa_cpu_map()
        filename_section = ""
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        # Generate the expected NVMe bdev names and sort them by the NUMA node of
        # the NIC they are routed through, to allow better grouping when splitting
        # into fio sections.
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames]
        filenames = [x for _, x in sorted(zip(filename_numas, filenames))]

        nvme_per_split = int(len(subsystems) / len(threads))
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = self.gen_fio_numa_section(r, num_jobs)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section

    def get_nvme_subsystem_numa(self, bdev_name):
        bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir]))
        bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"]

        # Strip the two-character namespace suffix (e.g. "n1") to get the controller
        # name ("Nvme0") instead of the bdev name ("Nvme0n1").
        nvme_ctrl = bdev_name[:-2]
        remote_nvme_ip = list(filter(lambda x: x["params"]["name"] == nvme_ctrl,
                                     bdev_conf_json_obj))[0]["params"]["traddr"]
        return self.get_route_nic_numa(remote_nvme_ip)
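
# Minimal sketch of the config.json layout consumed below (comment only; all
# hostnames, IPs and counts are placeholder values, and only the keys referenced
# in this file are shown; the per-class constructors may read additional fields.
# "target_nic_ips" mirrors the attribute read in the main block and is assumed
# to come from the initiator section):
#
#   {
#       "general": {
#           "username": "user",
#           "password": "password",
#           "transport": "tcp"
#       },
#       "target": {
#           "mode": "spdk",
#           "nic_ips": ["10.0.0.1"],
#           "null_block_devices": 4
#       },
#       "initiator1": {
#           "mode": "spdk",
#           "nic_ips": ["10.0.0.2"],
#           "target_nic_ips": ["10.0.0.1"]
#       },
#       "fio": {
#           "bs": ["4k"],
#           "qd": [32],
#           "rw": ["randread"],
#           "rwmixread": 100,
#           "run_time": 60,
#           "ramp_time": 10,
#           "run_num": 3,
#           "num_jobs": 2
#       }
#   }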

if __name__ == "__main__":
    script_full_dir = os.path.dirname(os.path.realpath(__file__))
    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
                        help='Configuration file.')
    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
                        help='Results directory.')
    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
                        help='CSV results filename.')
    parser.add_argument('-f', '--force', default=False, action='store_true',
                        dest='force', help="""Force script to continue and try to use all
                        available NVMe devices during test.
                        WARNING: Might result in data loss on used NVMe drives""")

    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO,
                        format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s')

    logging.info("Using config file: %s" % args.config)
    with open(args.config, "r") as config:
        data = json.load(config)

    initiators = []
    fio_cases = []

    general_config = data["general"]
    target_config = data["target"]
    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]

    if "null_block_devices" not in data["target"] and \
            (args.force is False and
             "allowlist" not in data["target"] and
             "blocklist" not in data["target"]):
        # TODO: Also check that the allowlist or blocklist is not empty.
        logging.warning("""WARNING: This script requires an allowlist or blocklist to be defined
        when null_block_devices is not used. You can choose to use all available NVMe drives on
        your system, which may potentially lead to data loss.
        If you wish to proceed with all attached NVMes, use the "-f" option.""")
        sys.exit(1)

    for k, v in data.items():
        if "target" in k:
            v.update({"results_dir": args.results})
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(k, data["general"], v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(k, data["general"], v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            # Default to a single run; leaving this as None would crash the
            # range() call in the main loop below.
            fio_run_num = data[k]["run_num"] if "run_num" in data[k] else 1
            fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k] else None

            fio_rate_iops = 0
            if "rate_iops" in data[k]:
                fio_rate_iops = data[k]["rate_iops"]

            fio_offset = False
            if "offset" in data[k]:
                fio_offset = data[k]["offset"]
            fio_offset_inc = 0
            if "offset_inc" in data[k]:
                fio_offset_inc = data[k]["offset_inc"]
        else:
            continue

    try:
        os.mkdir(args.results)
    except FileExistsError:
        pass

    for i in initiators:
        target_obj.initiator_info.append(
            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
        )

    # TODO: This try block is definitely too large. It needs to be broken up into
    # separate logical blocks to reduce its size.
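    # High-level flow of the block below: start the target, match each initiator
    # against the target's subsystems, then for every (bs, qd, rw) workload
    # generate the fio configs, run fio on all initiators in parallel alongside
    # whichever measurement threads are enabled (sar, pcm, network bandwidth,
    # DPDK memory, power), and finally collect and parse the results.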
    try:
        target_obj.tgt_start()

        for i in initiators:
            i.match_subsystems(target_obj.subsystem_info_list)
            if i.enable_adq:
                i.adq_configure_tc()

        # Poor man's threading
        # Run the fio tests
        for block_size, io_depth, rw in fio_workloads:
            configs = []
            for i in initiators:
                i.init_connect()
                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops,
                                       fio_offset, fio_offset_inc)
                configs.append(cfg)

            for run_no in range(1, fio_run_num + 1):
                threads = []
                power_daemon = None
                measurements_prefix = "%s_%s_%s_m_%s_run_%s" % (block_size, io_depth, rw, fio_rw_mix_read, run_no)

                for i, cfg in zip(initiators, configs):
                    t = threading.Thread(target=i.run_fio, args=(cfg, run_no))
                    threads.append(t)
                if target_obj.enable_sar:
                    sar_file_prefix = measurements_prefix + "_sar"
                    t = threading.Thread(target=target_obj.measure_sar,
                                         args=(args.results, sar_file_prefix, fio_ramp_time, fio_run_time))
                    threads.append(t)

                if target_obj.enable_pcm:
                    pcm_fnames = ["%s_%s.csv" % (measurements_prefix, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]

                    pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm,
                                                 args=(args.results, pcm_fnames[0], fio_ramp_time, fio_run_time))
                    pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory,
                                                 args=(args.results, pcm_fnames[1], fio_ramp_time, fio_run_time))
                    pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power,
                                                 args=(args.results, pcm_fnames[2], fio_ramp_time, fio_run_time))

                    threads.append(pcm_cpu_t)
                    threads.append(pcm_mem_t)
                    threads.append(pcm_pow_t)

                if target_obj.enable_bw:
                    bandwidth_file_name = measurements_prefix + "_bandwidth.csv"
                    t = threading.Thread(target=target_obj.measure_network_bandwidth,
                                         args=(args.results, bandwidth_file_name, fio_ramp_time, fio_run_time))
                    threads.append(t)

                if target_obj.enable_dpdk_memory:
                    dpdk_mem_file_name = measurements_prefix + "_dpdk_mem.txt"
                    t = threading.Thread(target=target_obj.measure_dpdk_memory,
                                         args=(args.results, dpdk_mem_file_name, fio_ramp_time))
                    threads.append(t)

                if target_obj.enable_pm:
                    power_daemon = threading.Thread(target=target_obj.measure_power,
                                                    args=(args.results, measurements_prefix, script_full_dir,
                                                          fio_ramp_time, fio_run_time))
                    threads.append(power_daemon)

                for t in threads:
                    t.start()
                for t in threads:
                    t.join()

            for i in initiators:
                i.init_disconnect()
                i.copy_result_files(args.results)
        try:
            parse_results(args.results, args.csv_filename)
        except Exception as err:
            logging.error("There was an error while parsing the results")
            logging.error(err)
    finally:
        for i in initiators:
            try:
                i.stop()
            except Exception:
                pass
        target_obj.stop()
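
# Illustrative invocation (the script filename is a placeholder for this file):
#
#   python3 nvmf_perf.py -c ./config.json -r /tmp/results -s nvmf_results.csv
#
# Add -f to force using all attached NVMe drives when no allowlist or blocklist
# is defined; as warned above, this may result in data loss on those drives.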