1#!/usr/bin/env python3 2# SPDX-License-Identifier: BSD-3-Clause 3# Copyright (C) 2018 Intel Corporation 4# All rights reserved. 5# 6 7import os 8import re 9import sys 10import argparse 11import json 12import logging 13import zipfile 14import threading 15import subprocess 16import itertools 17import configparser 18import time 19import uuid 20 21import paramiko 22import pandas as pd 23from common import * 24from subprocess import CalledProcessError 25 26sys.path.append(os.path.dirname(__file__) + '/../../../python') 27 28import spdk.rpc as rpc # noqa 29import spdk.rpc.client as rpc_client # noqa 30 31 32class Server: 33 def __init__(self, name, general_config, server_config): 34 self.name = name 35 self.username = general_config["username"] 36 self.password = general_config["password"] 37 self.transport = general_config["transport"].lower() 38 self.skip_spdk_install = general_config.get('skip_spdk_install', False) 39 self.nic_ips = server_config["nic_ips"] 40 self.mode = server_config["mode"] 41 self.irdma_roce_enable = False 42 43 self.log = logging.getLogger(self.name) 44 45 self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts" 46 if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]: 47 self.irq_scripts_dir = server_config["irq_scripts_dir"] 48 49 self.local_nic_info = [] 50 self._nics_json_obj = {} 51 self.svc_restore_dict = {} 52 self.sysctl_restore_dict = {} 53 self.tuned_restore_dict = {} 54 self.governor_restore = "" 55 self.tuned_profile = "" 56 57 self.enable_adq = False 58 self.adq_priority = None 59 self.enable_arfs = server_config.get("enable_arfs", False) 60 self.irq_settings = {"mode": "default"} 61 62 if "adq_enable" in server_config and server_config["adq_enable"]: 63 self.enable_adq = server_config["adq_enable"] 64 self.adq_priority = 1 65 if "irq_settings" in server_config: 66 self.irq_settings.update(server_config["irq_settings"]) 67 if "tuned_profile" in server_config: 68 self.tuned_profile = server_config["tuned_profile"] 69 if "irdma_roce_enable" in general_config: 70 self.irdma_roce_enable = general_config["irdma_roce_enable"] 71 72 if not re.match(r'^[A-Za-z0-9\-]+$', name): 73 self.log.info("Please use a name which contains only letters, numbers or dashes") 74 sys.exit(1) 75 76 @staticmethod 77 def get_uncommented_lines(lines): 78 return [line for line in lines if line and not line.startswith('#')] 79 80 def get_nic_name_by_ip(self, ip): 81 if not self._nics_json_obj: 82 nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"]) 83 self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj))) 84 for nic in self._nics_json_obj: 85 for addr in nic["addr_info"]: 86 if ip in addr["local"]: 87 return nic["ifname"] 88 89 def set_local_nic_info_helper(self): 90 pass 91 92 def set_local_nic_info(self, pci_info): 93 def extract_network_elements(json_obj): 94 nic_list = [] 95 if isinstance(json_obj, list): 96 for x in json_obj: 97 nic_list.extend(extract_network_elements(x)) 98 elif isinstance(json_obj, dict): 99 if "children" in json_obj: 100 nic_list.extend(extract_network_elements(json_obj["children"])) 101 if "class" in json_obj.keys() and "network" in json_obj["class"]: 102 nic_list.append(json_obj) 103 return nic_list 104 105 self.local_nic_info = extract_network_elements(pci_info) 106 107 def get_nic_numa_node(self, nic_name): 108 return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name])) 109 110 def get_numa_cpu_map(self): 111 numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"])) 112 numa_cpu_json_map = {} 113 114 for cpu in numa_cpu_json_obj["cpus"]: 115 cpu_num = int(cpu["cpu"]) 116 numa_node = int(cpu["node"]) 117 numa_cpu_json_map.setdefault(numa_node, []) 118 numa_cpu_json_map[numa_node].append(cpu_num) 119 120 return numa_cpu_json_map 121 122 # pylint: disable=R0201 123 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 124 return "" 125 126 def configure_system(self): 127 self.load_drivers() 128 self.configure_services() 129 self.configure_sysctl() 130 self.configure_arfs() 131 self.configure_tuned() 132 self.configure_cpu_governor() 133 self.configure_irq_affinity(**self.irq_settings) 134 135 RDMA_PROTOCOL_IWARP = 0 136 RDMA_PROTOCOL_ROCE = 1 137 RDMA_PROTOCOL_UNKNOWN = -1 138 139 def check_rdma_protocol(self): 140 try: 141 roce_ena = self.exec_cmd(["cat", "/sys/module/irdma/parameters/roce_ena"]) 142 roce_ena = roce_ena.strip() 143 if roce_ena == "0": 144 return self.RDMA_PROTOCOL_IWARP 145 else: 146 return self.RDMA_PROTOCOL_ROCE 147 except CalledProcessError as e: 148 self.log.error("ERROR: failed to check RDMA protocol!") 149 self.log.error("%s resulted in error: %s" % (e.cmd, e.output)) 150 return self.RDMA_PROTOCOL_UNKNOWN 151 152 def load_drivers(self): 153 self.log.info("Loading drivers") 154 self.exec_cmd(["sudo", "modprobe", "-a", 155 "nvme-%s" % self.transport, 156 "nvmet-%s" % self.transport]) 157 current_mode = self.check_rdma_protocol() 158 if current_mode == self.RDMA_PROTOCOL_UNKNOWN: 159 self.log.error("ERROR: failed to check RDMA protocol mode") 160 return 161 if self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_IWARP: 162 self.reload_driver("irdma", "roce_ena=1") 163 self.log.info("Loaded irdma driver with RoCE enabled") 164 elif self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_ROCE: 165 self.log.info("Leaving irdma driver with RoCE enabled") 166 else: 167 self.reload_driver("irdma", "roce_ena=0") 168 self.log.info("Loaded irdma driver with iWARP enabled") 169 170 def configure_arfs(self): 171 rps_flow_cnt = 512 172 if not self.enable_arfs: 173 rps_flow_cnt = 0 174 175 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 176 for nic_name in nic_names: 177 self.exec_cmd(["sudo", "ethtool", "-K", nic_name, "ntuple", "on"]) 178 self.log.info(f"Setting rps_flow_cnt for {nic_name}") 179 queue_files = self.exec_cmd(["ls", f"/sys/class/net/{nic_name}/queues/"]).strip().split("\n") 180 queue_files = filter(lambda x: x.startswith("rx-"), queue_files) 181 182 for qf in queue_files: 183 self.exec_cmd(["sudo", "bash", "-c", f"echo {rps_flow_cnt} > /sys/class/net/{nic_name}/queues/{qf}/rps_flow_cnt"]) 184 185 def configure_adq(self): 186 self.adq_load_modules() 187 self.adq_configure_nic() 188 189 def adq_load_modules(self): 190 self.log.info("Modprobing ADQ-related Linux modules...") 191 adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"] 192 for module in adq_module_deps: 193 try: 194 self.exec_cmd(["sudo", "modprobe", module]) 195 self.log.info("%s loaded!" % module) 196 except CalledProcessError as e: 197 self.log.error("ERROR: failed to load module %s" % module) 198 self.log.error("%s resulted in error: %s" % (e.cmd, e.output)) 199 200 def adq_configure_tc(self): 201 self.log.info("Configuring ADQ Traffic classes and filters...") 202 203 num_queues_tc0 = 2 # 2 is minimum number of queues for TC0 204 num_queues_tc1 = self.num_cores 205 port_param = "dst_port" if isinstance(self, Target) else "src_port" 206 xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs") 207 208 for nic_ip in self.nic_ips: 209 nic_name = self.get_nic_name_by_ip(nic_ip) 210 nic_ports = [x[0] for x in self.subsystem_info_list if nic_ip in x[2]] 211 212 tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, 213 "root", "mqprio", "num_tc", "2", "map", "0", "1", 214 "queues", "%s@0" % num_queues_tc0, 215 "%s@%s" % (num_queues_tc1, num_queues_tc0), 216 "hw", "1", "mode", "channel"] 217 self.log.info(" ".join(tc_qdisc_map_cmd)) 218 self.exec_cmd(tc_qdisc_map_cmd) 219 220 time.sleep(5) 221 tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"] 222 self.log.info(" ".join(tc_qdisc_ingress_cmd)) 223 self.exec_cmd(tc_qdisc_ingress_cmd) 224 225 nic_bdf = os.path.basename(self.exec_cmd(["readlink", "-f", "/sys/class/net/%s/device" % nic_name]).strip()) 226 self.exec_cmd(["sudo", "devlink", "dev", "param", "set", "pci/%s" % nic_bdf, 227 "name", "tc1_inline_fd", "value", "true", "cmode", "runtime"]) 228 229 for port in nic_ports: 230 tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name, 231 "protocol", "ip", "ingress", "prio", "1", "flower", 232 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port, 233 "skip_sw", "hw_tc", "1"] 234 self.log.info(" ".join(tc_filter_cmd)) 235 self.exec_cmd(tc_filter_cmd) 236 237 # show tc configuration 238 self.log.info("Show tc configuration for %s NIC..." % nic_name) 239 tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name]) 240 tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"]) 241 self.log.info("%s" % tc_disk_out) 242 self.log.info("%s" % tc_filter_out) 243 244 # Ethtool coalesce settings must be applied after configuring traffic classes 245 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"]) 246 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"]) 247 248 self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name) 249 xps_cmd = ["sudo", xps_script_path, nic_name] 250 self.log.info(xps_cmd) 251 self.exec_cmd(xps_cmd) 252 253 def reload_driver(self, driver, *modprobe_args): 254 255 try: 256 self.exec_cmd(["sudo", "rmmod", driver]) 257 self.exec_cmd(["sudo", "modprobe", driver, *modprobe_args]) 258 except CalledProcessError as e: 259 self.log.error("ERROR: failed to reload %s module!" % driver) 260 self.log.error("%s resulted in error: %s" % (e.cmd, e.output)) 261 262 def adq_configure_nic(self): 263 self.log.info("Configuring NIC port settings for ADQ testing...") 264 265 # Reload the driver first, to make sure any previous settings are re-set. 266 self.reload_driver("ice") 267 268 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 269 for nic in nic_names: 270 self.log.info(nic) 271 try: 272 self.exec_cmd(["sudo", "ethtool", "-K", nic, 273 "hw-tc-offload", "on"]) # Enable hardware TC offload 274 nic_bdf = self.exec_cmd(["bash", "-c", "source /sys/class/net/%s/device/uevent; echo $PCI_SLOT_NAME" % nic]) 275 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"]) # Disable LLDP 276 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-inspect-optimize", "off"]) 277 # Following are suggested in ADQ Configuration Guide to be enabled for large block sizes, 278 # but can be used with 4k block sizes as well 279 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop", "on"]) 280 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop-cfg", "on"]) 281 except subprocess.CalledProcessError as e: 282 self.log.error("ERROR: failed to configure NIC port using ethtool!") 283 self.log.error("%s resulted in error: %s" % (e.cmd, e.output)) 284 self.log.info("Please update your NIC driver and firmware versions and try again.") 285 self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic])) 286 self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic])) 287 288 def configure_services(self): 289 self.log.info("Configuring active services...") 290 svc_config = configparser.ConfigParser(strict=False) 291 292 # Below list is valid only for RHEL / Fedora systems and might not 293 # contain valid names for other distributions. 294 svc_target_state = { 295 "firewalld": "inactive", 296 "irqbalance": "inactive", 297 "lldpad.service": "inactive", 298 "lldpad.socket": "inactive" 299 } 300 301 for service in svc_target_state: 302 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 303 out = "\n".join(["[%s]" % service, out]) 304 svc_config.read_string(out) 305 306 if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]: 307 continue 308 309 service_state = svc_config[service]["ActiveState"] 310 self.log.info("Current state of %s service is %s" % (service, service_state)) 311 self.svc_restore_dict.update({service: service_state}) 312 if service_state != "inactive": 313 self.log.info("Disabling %s. It will be restored after the test has finished." % service) 314 self.exec_cmd(["sudo", "systemctl", "stop", service]) 315 316 def configure_sysctl(self): 317 self.log.info("Tuning sysctl settings...") 318 319 busy_read = 0 320 busy_poll = 0 321 if self.enable_adq and self.mode == "spdk": 322 busy_read = 1 323 busy_poll = 1 324 325 sysctl_opts = { 326 "net.core.busy_poll": busy_poll, 327 "net.core.busy_read": busy_read, 328 "net.core.somaxconn": 4096, 329 "net.core.netdev_max_backlog": 8192, 330 "net.ipv4.tcp_max_syn_backlog": 16384, 331 "net.core.rmem_max": 268435456, 332 "net.core.wmem_max": 268435456, 333 "net.ipv4.tcp_mem": "268435456 268435456 268435456", 334 "net.ipv4.tcp_rmem": "8192 1048576 33554432", 335 "net.ipv4.tcp_wmem": "8192 1048576 33554432", 336 "net.ipv4.route.flush": 1, 337 "vm.overcommit_memory": 1, 338 "net.core.rps_sock_flow_entries": 0 339 } 340 341 if self.enable_adq: 342 sysctl_opts.update(self.adq_set_busy_read(1)) 343 344 if self.enable_arfs: 345 sysctl_opts.update({"net.core.rps_sock_flow_entries": 32768}) 346 347 for opt, value in sysctl_opts.items(): 348 self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()}) 349 self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 350 351 def configure_tuned(self): 352 if not self.tuned_profile: 353 self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.") 354 return 355 356 self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile) 357 service = "tuned" 358 tuned_config = configparser.ConfigParser(strict=False) 359 360 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 361 out = "\n".join(["[%s]" % service, out]) 362 tuned_config.read_string(out) 363 tuned_state = tuned_config[service]["ActiveState"] 364 self.svc_restore_dict.update({service: tuned_state}) 365 366 if tuned_state != "inactive": 367 profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip() 368 profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip() 369 370 self.tuned_restore_dict = { 371 "profile": profile, 372 "mode": profile_mode 373 } 374 375 self.exec_cmd(["sudo", "systemctl", "start", service]) 376 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile]) 377 self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"])) 378 379 def configure_cpu_governor(self): 380 self.log.info("Setting CPU governor to performance...") 381 382 # This assumes that there is the same CPU scaling governor on each CPU 383 self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip() 384 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"]) 385 386 def get_core_list_from_mask(self, core_mask): 387 # Generate list of individual cores from hex core mask 388 # (e.g. '0xffff') or list containing: 389 # - individual cores (e.g. '1, 2, 3') 390 # - core ranges (e.g. '0-3') 391 # - mix of both (e.g. '0, 1-4, 9, 11-13') 392 393 core_list = [] 394 if "0x" in core_mask: 395 core_mask_int = int(core_mask, 16) 396 for i in range(core_mask_int.bit_length()): 397 if (1 << i) & core_mask_int: 398 core_list.append(i) 399 return core_list 400 else: 401 # Core list can be provided in .json config with square brackets 402 # remove them first 403 core_mask = core_mask.replace("[", "") 404 core_mask = core_mask.replace("]", "") 405 406 for i in core_mask.split(","): 407 if "-" in i: 408 start, end = i.split("-") 409 core_range = range(int(start), int(end) + 1) 410 core_list.extend(core_range) 411 else: 412 core_list.append(int(i)) 413 return core_list 414 415 def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False): 416 self.log.info("Setting NIC irq affinity for NICs. Using %s mode" % mode) 417 418 if mode not in ["default", "bynode", "cpulist"]: 419 raise ValueError("%s irq affinity setting not supported" % mode) 420 421 if mode == "cpulist" and not cpulist: 422 raise ValueError("%s irq affinity setting set, but no cpulist provided" % mode) 423 424 affinity_script = "set_irq_affinity.sh" 425 if "default" not in mode: 426 affinity_script = "set_irq_affinity_cpulist.sh" 427 system_cpu_map = self.get_numa_cpu_map() 428 irq_script_path = os.path.join(self.irq_scripts_dir, affinity_script) 429 430 def cpu_list_to_string(cpulist): 431 return ",".join(map(lambda x: str(x), cpulist)) 432 433 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 434 for nic_name in nic_names: 435 irq_cmd = ["sudo", irq_script_path] 436 437 # Use only CPU cores matching NIC NUMA node. 438 # Remove any CPU cores if they're on exclusion list. 439 if mode == "bynode": 440 irq_cpus = system_cpu_map[self.get_nic_numa_node(nic_name)] 441 if cpulist and exclude_cpulist: 442 disallowed_cpus = self.get_core_list_from_mask(cpulist) 443 irq_cpus = list(set(irq_cpus) - set(disallowed_cpus)) 444 if not irq_cpus: 445 raise Exception("No CPUs left to process IRQs after excluding CPUs!") 446 irq_cmd.append(cpu_list_to_string(irq_cpus)) 447 448 if mode == "cpulist": 449 irq_cpus = self.get_core_list_from_mask(cpulist) 450 if exclude_cpulist: 451 # Flatten system CPU list, we don't need NUMA awareness here 452 system_cpu_list = sorted({x for v in system_cpu_map.values() for x in v}) 453 irq_cpus = list(set(system_cpu_list) - set(irq_cpus)) 454 if not irq_cpus: 455 raise Exception("No CPUs left to process IRQs after excluding CPUs!") 456 irq_cmd.append(cpu_list_to_string(irq_cpus)) 457 458 irq_cmd.append(nic_name) 459 self.log.info(irq_cmd) 460 self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir) 461 462 def restore_services(self): 463 self.log.info("Restoring services...") 464 for service, state in self.svc_restore_dict.items(): 465 cmd = "stop" if state == "inactive" else "start" 466 self.exec_cmd(["sudo", "systemctl", cmd, service]) 467 468 def restore_sysctl(self): 469 self.log.info("Restoring sysctl settings...") 470 for opt, value in self.sysctl_restore_dict.items(): 471 self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 472 473 def restore_tuned(self): 474 self.log.info("Restoring tuned-adm settings...") 475 476 if not self.tuned_restore_dict: 477 return 478 479 if self.tuned_restore_dict["mode"] == "auto": 480 self.exec_cmd(["sudo", "tuned-adm", "auto_profile"]) 481 self.log.info("Reverted tuned-adm to auto_profile.") 482 else: 483 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]]) 484 self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"]) 485 486 def restore_governor(self): 487 self.log.info("Restoring CPU governor setting...") 488 if self.governor_restore: 489 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore]) 490 self.log.info("Reverted CPU governor to %s." % self.governor_restore) 491 492 def restore_settings(self): 493 self.restore_governor() 494 self.restore_tuned() 495 self.restore_services() 496 self.restore_sysctl() 497 if self.enable_adq: 498 self.reload_driver("ice") 499 500 501class Target(Server): 502 def __init__(self, name, general_config, target_config): 503 super().__init__(name, general_config, target_config) 504 505 # Defaults 506 self.enable_zcopy = False 507 self.scheduler_name = "static" 508 self.null_block = 0 509 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 510 self.subsystem_info_list = [] 511 self.initiator_info = [] 512 self.nvme_allowlist = [] 513 self.nvme_blocklist = [] 514 515 # Target-side measurement options 516 self.enable_pm = True 517 self.enable_sar = True 518 self.enable_pcm = True 519 self.enable_bw = True 520 self.enable_dpdk_memory = True 521 522 if "null_block_devices" in target_config: 523 self.null_block = target_config["null_block_devices"] 524 if "scheduler_settings" in target_config: 525 self.scheduler_name = target_config["scheduler_settings"] 526 if "zcopy_settings" in target_config: 527 self.enable_zcopy = target_config["zcopy_settings"] 528 if "results_dir" in target_config: 529 self.results_dir = target_config["results_dir"] 530 if "blocklist" in target_config: 531 self.nvme_blocklist = target_config["blocklist"] 532 if "allowlist" in target_config: 533 self.nvme_allowlist = target_config["allowlist"] 534 # Blocklist takes precedence, remove common elements from allowlist 535 self.nvme_allowlist = list(set(self.nvme_allowlist) - set(self.nvme_blocklist)) 536 if "enable_pm" in target_config: 537 self.enable_pm = target_config["enable_pm"] 538 if "enable_sar" in target_config: 539 self.enable_sar = target_config["enable_sar"] 540 if "enable_pcm" in target_config: 541 self.enable_pcm = target_config["enable_pcm"] 542 if "enable_bandwidth" in target_config: 543 self.enable_bw = target_config["enable_bandwidth"] 544 if "enable_dpdk_memory" in target_config: 545 self.enable_dpdk_memory = target_config["enable_dpdk_memory"] 546 547 self.log.info("Items now on allowlist: %s" % self.nvme_allowlist) 548 self.log.info("Items now on blocklist: %s" % self.nvme_blocklist) 549 550 self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) 551 self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../")) 552 self.set_local_nic_info(self.set_local_nic_info_helper()) 553 554 if self.skip_spdk_install is False: 555 self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip") 556 557 self.configure_system() 558 if self.enable_adq: 559 self.configure_adq() 560 self.configure_irq_affinity() 561 self.sys_config() 562 563 def set_local_nic_info_helper(self): 564 return json.loads(self.exec_cmd(["lshw", "-json"])) 565 566 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 567 stderr_opt = None 568 if stderr_redirect: 569 stderr_opt = subprocess.STDOUT 570 if change_dir: 571 old_cwd = os.getcwd() 572 os.chdir(change_dir) 573 self.log.info("Changing directory to %s" % change_dir) 574 575 out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8") 576 577 if change_dir: 578 os.chdir(old_cwd) 579 self.log.info("Changing directory to %s" % old_cwd) 580 return out 581 582 def zip_spdk_sources(self, spdk_dir, dest_file): 583 self.log.info("Zipping SPDK source directory") 584 fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) 585 for root, _directories, files in os.walk(spdk_dir, followlinks=True): 586 for file in files: 587 fh.write(os.path.relpath(os.path.join(root, file))) 588 fh.close() 589 self.log.info("Done zipping") 590 591 @staticmethod 592 def _chunks(input_list, chunks_no): 593 div, rem = divmod(len(input_list), chunks_no) 594 for i in range(chunks_no): 595 si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem) 596 yield input_list[si:si + (div + 1 if i < rem else div)] 597 598 def spread_bdevs(self, req_disks): 599 # Spread available block devices indexes: 600 # - evenly across available initiator systems 601 # - evenly across available NIC interfaces for 602 # each initiator 603 # Not NUMA aware. 604 ip_bdev_map = [] 605 initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info)) 606 607 for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)): 608 self.initiator_info[i]["bdev_range"] = init_chunk 609 init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"]))) 610 for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list): 611 for c in nic_chunk: 612 ip_bdev_map.append((ip, c)) 613 return ip_bdev_map 614 615 def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time): 616 cpu_number = os.cpu_count() 617 sar_idle_sum = 0 618 sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt") 619 sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"])) 620 621 self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time) 622 time.sleep(ramp_time) 623 self.log.info("Starting SAR measurements") 624 625 out = self.exec_cmd(["sar", "-P", "ALL", "%s" % 1, "%s" % run_time]) 626 with open(os.path.join(results_dir, sar_output_file), "w") as fh: 627 for line in out.split("\n"): 628 if "Average" in line: 629 if "CPU" in line: 630 self.log.info("Summary CPU utilization from SAR:") 631 self.log.info(line) 632 elif "all" in line: 633 self.log.info(line) 634 else: 635 sar_idle_sum += float(line.split()[7]) 636 fh.write(out) 637 sar_cpu_usage = cpu_number * 100 - sar_idle_sum 638 639 with open(os.path.join(results_dir, sar_cpu_util_file), "w") as f: 640 f.write("%0.2f" % sar_cpu_usage) 641 642 def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time): 643 time.sleep(ramp_time) 644 self.log.info("Starting power measurements") 645 self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir, 646 "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix, 647 "-x", "-c", "%s" % run_time, "-t", "%s" % 1, "-r"]) 648 649 def measure_pcm_memory(self, results_dir, pcm_file_name, ramp_time, run_time): 650 time.sleep(ramp_time) 651 cmd = ["pcm-memory", "1", "-csv=%s/%s" % (results_dir, pcm_file_name)] 652 pcm_memory = subprocess.Popen(cmd) 653 time.sleep(run_time) 654 pcm_memory.terminate() 655 656 def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time): 657 time.sleep(ramp_time) 658 cmd = ["pcm", "1", "-i=%s" % run_time, 659 "-csv=%s/%s" % (results_dir, pcm_file_name)] 660 subprocess.run(cmd) 661 df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1]) 662 df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x)) 663 skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})] 664 skt_pcm_file_name = "_".join(["skt", pcm_file_name]) 665 skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False) 666 667 def measure_pcm_power(self, results_dir, pcm_power_file_name, ramp_time, run_time): 668 time.sleep(ramp_time) 669 out = self.exec_cmd(["pcm-power", "1", "-i=%s" % run_time]) 670 with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh: 671 fh.write(out) 672 # TODO: Above command results in a .csv file containing measurements for all gathered samples. 673 # Improve this so that additional file containing measurements average is generated too. 674 675 def measure_network_bandwidth(self, results_dir, bandwidth_file_name, ramp_time, run_time): 676 self.log.info("Waiting %d seconds for ramp-up to finish before measuring bandwidth stats" % ramp_time) 677 time.sleep(ramp_time) 678 self.log.info("INFO: starting network bandwidth measure") 679 self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name), 680 "-a", "1", "-t", "1000", "-c", "%s" % run_time]) 681 # TODO: Above command results in a .csv file containing measurements for all gathered samples. 682 # Improve this so that additional file containing measurements average is generated too. 683 # TODO: Monitor only these interfaces which are currently used to run the workload. 684 685 def measure_dpdk_memory(self, results_dir, dump_file_name, ramp_time): 686 self.log.info("INFO: waiting to generate DPDK memory usage") 687 time.sleep(ramp_time) 688 self.log.info("INFO: generating DPDK memory usage") 689 tmp_dump_file = rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)["filename"] 690 os.rename(tmp_dump_file, "%s/%s" % (results_dir, dump_file_name)) 691 692 def sys_config(self): 693 self.log.info("====Kernel release:====") 694 self.log.info(os.uname().release) 695 self.log.info("====Kernel command line:====") 696 with open('/proc/cmdline') as f: 697 cmdline = f.readlines() 698 self.log.info('\n'.join(self.get_uncommented_lines(cmdline))) 699 self.log.info("====sysctl conf:====") 700 with open('/etc/sysctl.conf') as f: 701 sysctl = f.readlines() 702 self.log.info('\n'.join(self.get_uncommented_lines(sysctl))) 703 self.log.info("====Cpu power info:====") 704 self.log.info(self.exec_cmd(["cpupower", "frequency-info"])) 705 self.log.info("====zcopy settings:====") 706 self.log.info("zcopy enabled: %s" % (self.enable_zcopy)) 707 self.log.info("====Scheduler settings:====") 708 self.log.info("SPDK scheduler: %s" % (self.scheduler_name)) 709 710 711class Initiator(Server): 712 def __init__(self, name, general_config, initiator_config): 713 super().__init__(name, general_config, initiator_config) 714 715 # Required fields 716 self.ip = initiator_config["ip"] 717 self.target_nic_ips = initiator_config["target_nic_ips"] 718 719 # Defaults 720 self.cpus_allowed = None 721 self.cpus_allowed_policy = "shared" 722 self.spdk_dir = "/tmp/spdk" 723 self.fio_bin = "/usr/src/fio/fio" 724 self.nvmecli_bin = "nvme" 725 self.cpu_frequency = None 726 self.subsystem_info_list = [] 727 self.allow_cpu_sharing = True 728 729 if "spdk_dir" in initiator_config: 730 self.spdk_dir = initiator_config["spdk_dir"] 731 if "fio_bin" in initiator_config: 732 self.fio_bin = initiator_config["fio_bin"] 733 if "nvmecli_bin" in initiator_config: 734 self.nvmecli_bin = initiator_config["nvmecli_bin"] 735 if "cpus_allowed" in initiator_config: 736 self.cpus_allowed = initiator_config["cpus_allowed"] 737 if "cpus_allowed_policy" in initiator_config: 738 self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"] 739 if "cpu_frequency" in initiator_config: 740 self.cpu_frequency = initiator_config["cpu_frequency"] 741 if "allow_cpu_sharing" in initiator_config: 742 self.allow_cpu_sharing = initiator_config["allow_cpu_sharing"] 743 if os.getenv('SPDK_WORKSPACE'): 744 self.spdk_dir = os.getenv('SPDK_WORKSPACE') 745 746 self.ssh_connection = paramiko.SSHClient() 747 self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 748 self.ssh_connection.connect(self.ip, username=self.username, password=self.password) 749 self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir]) 750 self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir]) 751 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 752 753 if self.skip_spdk_install is False: 754 self.copy_spdk("/tmp/spdk.zip") 755 756 self.set_local_nic_info(self.set_local_nic_info_helper()) 757 self.set_cpu_frequency() 758 self.configure_system() 759 if self.enable_adq: 760 self.configure_adq() 761 self.sys_config() 762 763 def set_local_nic_info_helper(self): 764 return json.loads(self.exec_cmd(["lshw", "-json"])) 765 766 def stop(self): 767 self.restore_settings() 768 self.ssh_connection.close() 769 770 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 771 if change_dir: 772 cmd = ["cd", change_dir, ";", *cmd] 773 774 # In case one of the command elements contains whitespace and is not 775 # already quoted, # (e.g. when calling sysctl) quote it again to prevent expansion 776 # when sending to remote system. 777 for i, c in enumerate(cmd): 778 if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")): 779 cmd[i] = '"%s"' % c 780 cmd = " ".join(cmd) 781 782 # Redirect stderr to stdout thanks using get_pty option if needed 783 _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect) 784 out = stdout.read().decode(encoding="utf-8") 785 786 # Check the return code 787 rc = stdout.channel.recv_exit_status() 788 if rc: 789 raise CalledProcessError(int(rc), cmd, out) 790 791 return out 792 793 def put_file(self, local, remote_dest): 794 ftp = self.ssh_connection.open_sftp() 795 ftp.put(local, remote_dest) 796 ftp.close() 797 798 def get_file(self, remote, local_dest): 799 ftp = self.ssh_connection.open_sftp() 800 ftp.get(remote, local_dest) 801 ftp.close() 802 803 def copy_spdk(self, local_spdk_zip): 804 self.log.info("Copying SPDK sources to initiator %s" % self.name) 805 self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip") 806 self.log.info("Copied sources zip from target") 807 self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir]) 808 self.log.info("Sources unpacked") 809 810 def copy_result_files(self, dest_dir): 811 self.log.info("Copying results") 812 813 if not os.path.exists(dest_dir): 814 os.mkdir(dest_dir) 815 816 # Get list of result files from initiator and copy them back to target 817 file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n") 818 819 for file in file_list: 820 self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file), 821 os.path.join(dest_dir, file)) 822 self.log.info("Done copying results") 823 824 def match_subsystems(self, target_subsytems): 825 subsystems = [subsystem for subsystem in target_subsytems if subsystem[2] in self.target_nic_ips] 826 subsystems.sort(key=lambda x: x[1]) 827 self.log.info("Found matching subsystems on target side:") 828 for s in subsystems: 829 self.log.info(s) 830 self.subsystem_info_list = subsystems 831 832 def gen_fio_filename_conf(self, *args, **kwargs): 833 # Logic implemented in SPDKInitiator and KernelInitiator classes 834 pass 835 836 def get_route_nic_numa(self, remote_nvme_ip): 837 local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip])) 838 local_nvme_nic = local_nvme_nic[0]["dev"] 839 return self.get_nic_numa_node(local_nvme_nic) 840 841 @staticmethod 842 def gen_fio_offset_section(offset_inc, num_jobs): 843 offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc 844 return "\n".join(["size=%s%%" % offset_inc, 845 "offset=0%", 846 "offset_increment=%s%%" % offset_inc]) 847 848 def gen_fio_numa_section(self, fio_filenames_list, num_jobs): 849 numa_stats = {} 850 allowed_cpus = [] 851 for nvme in fio_filenames_list: 852 nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme)) 853 numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1 854 855 # Use the most common NUMA node for this chunk to allocate memory and CPUs 856 section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0] 857 858 # Check if we have enough free CPUs to pop from the list before assigning them 859 if len(self.available_cpus[section_local_numa]) < num_jobs: 860 if self.allow_cpu_sharing: 861 self.log.info("Regenerating available CPU list %s" % section_local_numa) 862 # Remove still available CPUs from the regenerated list. We don't want to 863 # regenerate it with duplicates. 864 cpus_regen = set(self.get_numa_cpu_map()[section_local_numa]) - set(self.available_cpus[section_local_numa]) 865 self.available_cpus[section_local_numa].extend(cpus_regen) 866 self.log.info(self.log.info(self.available_cpus[section_local_numa])) 867 else: 868 self.log.error("No more free CPU cores to use from allowed_cpus list!") 869 raise IndexError 870 871 for _ in range(num_jobs): 872 try: 873 allowed_cpus.append(str(self.available_cpus[section_local_numa].pop(0))) 874 except IndexError: 875 self.log.error("No more free CPU cores to use from allowed_cpus list!") 876 raise 877 878 return "\n".join(["cpus_allowed=%s" % ",".join(allowed_cpus), 879 "numa_mem_policy=prefer:%s" % section_local_numa]) 880 881 def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, 882 num_jobs=None, ramp_time=0, run_time=10, rate_iops=0, 883 offset=False, offset_inc=0): 884 fio_conf_template = """ 885[global] 886ioengine={ioengine} 887{spdk_conf} 888thread=1 889group_reporting=1 890direct=1 891percentile_list=50:90:99:99.5:99.9:99.99:99.999 892 893norandommap=1 894rw={rw} 895rwmixread={rwmixread} 896bs={block_size} 897time_based=1 898ramp_time={ramp_time} 899runtime={run_time} 900rate_iops={rate_iops} 901""" 902 903 if self.cpus_allowed is not None: 904 self.log.info("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed) 905 cpus_num = 0 906 cpus = self.cpus_allowed.split(",") 907 for cpu in cpus: 908 if "-" in cpu: 909 a, b = cpu.split("-") 910 a = int(a) 911 b = int(b) 912 cpus_num += len(range(a, b)) 913 else: 914 cpus_num += 1 915 self.num_cores = cpus_num 916 threads = range(0, self.num_cores) 917 elif hasattr(self, 'num_cores'): 918 self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores) 919 threads = range(0, int(self.num_cores)) 920 else: 921 self.num_cores = len(self.subsystem_info_list) 922 threads = range(0, len(self.subsystem_info_list)) 923 924 filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs, 925 offset, offset_inc) 926 927 fio_config = fio_conf_template.format(ioengine=self.ioengine, spdk_conf=self.spdk_conf, 928 rw=rw, rwmixread=rwmixread, block_size=block_size, 929 ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops) 930 931 # TODO: hipri disabled for now, as it causes fio errors: 932 # io_u error on file /dev/nvme2n1: Operation not supported 933 # See comment in KernelInitiator class, init_connect() function 934 if "io_uring" in self.ioengine: 935 fio_config = fio_config + """ 936fixedbufs=1 937registerfiles=1 938#hipri=1 939""" 940 if num_jobs: 941 fio_config = fio_config + "numjobs=%s \n" % num_jobs 942 if self.cpus_allowed is not None: 943 fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed 944 fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy 945 fio_config = fio_config + filename_section 946 947 fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread) 948 if hasattr(self, "num_cores"): 949 fio_config_filename += "_%sCPU" % self.num_cores 950 fio_config_filename += ".fio" 951 952 self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir]) 953 self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)]) 954 self.log.info("Created FIO Config:") 955 self.log.info(fio_config) 956 957 return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename) 958 959 def set_cpu_frequency(self): 960 if self.cpu_frequency is not None: 961 try: 962 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True) 963 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True) 964 self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"])) 965 except Exception: 966 self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!") 967 sys.exit() 968 else: 969 self.log.warning("WARNING: you have disabled intel_pstate and using default cpu governance.") 970 971 def run_fio(self, fio_config_file, run_num=1): 972 job_name, _ = os.path.splitext(fio_config_file) 973 self.log.info("Starting FIO run for job: %s" % job_name) 974 self.log.info("Using FIO: %s" % self.fio_bin) 975 976 output_filename = job_name + "_run_" + str(run_num) + "_" + self.name + ".json" 977 try: 978 output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json", 979 "--output=%s" % output_filename, "--eta=never"], True) 980 self.log.info(output) 981 self.log.info("FIO run finished. Results in: %s" % output_filename) 982 except subprocess.CalledProcessError as e: 983 self.log.error("ERROR: Fio process failed!") 984 self.log.error(e.stdout) 985 986 def sys_config(self): 987 self.log.info("====Kernel release:====") 988 self.log.info(self.exec_cmd(["uname", "-r"])) 989 self.log.info("====Kernel command line:====") 990 cmdline = self.exec_cmd(["cat", "/proc/cmdline"]) 991 self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines()))) 992 self.log.info("====sysctl conf:====") 993 sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"]) 994 self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines()))) 995 self.log.info("====Cpu power info:====") 996 self.log.info(self.exec_cmd(["cpupower", "frequency-info"])) 997 998 999class KernelTarget(Target): 1000 def __init__(self, name, general_config, target_config): 1001 super().__init__(name, general_config, target_config) 1002 # Defaults 1003 self.nvmet_bin = "nvmetcli" 1004 1005 if "nvmet_bin" in target_config: 1006 self.nvmet_bin = target_config["nvmet_bin"] 1007 1008 def load_drivers(self): 1009 self.log.info("Loading drivers") 1010 super().load_drivers() 1011 if self.null_block: 1012 self.exec_cmd(["sudo", "modprobe", "null_blk", "nr_devices=%s" % self.null_block]) 1013 1014 def configure_adq(self): 1015 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1016 1017 def adq_configure_tc(self): 1018 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1019 1020 def adq_set_busy_read(self, busy_read_val): 1021 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0") 1022 return {"net.core.busy_read": 0} 1023 1024 def stop(self): 1025 self.nvmet_command(self.nvmet_bin, "clear") 1026 self.restore_settings() 1027 1028 def get_nvme_device_bdf(self, nvme_dev_path): 1029 nvme_name = os.path.basename(nvme_dev_path) 1030 return self.exec_cmd(["cat", "/sys/block/%s/device/address" % nvme_name]).strip() 1031 1032 def get_nvme_devices(self): 1033 dev_list = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]).split("\n") 1034 nvme_list = [] 1035 for dev in dev_list: 1036 if "nvme" not in dev: 1037 continue 1038 if self.get_nvme_device_bdf(dev) in self.nvme_blocklist: 1039 continue 1040 if len(self.nvme_allowlist) == 0: 1041 nvme_list.append(dev) 1042 continue 1043 if self.get_nvme_device_bdf(dev) in self.nvme_allowlist: 1044 nvme_list.append(dev) 1045 return nvme_list 1046 1047 def nvmet_command(self, nvmet_bin, command): 1048 return self.exec_cmd([nvmet_bin, *(command.split(" "))]) 1049 1050 def kernel_tgt_gen_subsystem_conf(self, nvme_list): 1051 1052 nvmet_cfg = { 1053 "ports": [], 1054 "hosts": [], 1055 "subsystems": [], 1056 } 1057 1058 for ip, bdev_num in self.spread_bdevs(len(nvme_list)): 1059 port = str(4420 + bdev_num) 1060 nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num 1061 serial = "SPDK00%s" % bdev_num 1062 bdev_name = nvme_list[bdev_num] 1063 1064 nvmet_cfg["subsystems"].append({ 1065 "allowed_hosts": [], 1066 "attr": { 1067 "allow_any_host": "1", 1068 "serial": serial, 1069 "version": "1.3" 1070 }, 1071 "namespaces": [ 1072 { 1073 "device": { 1074 "path": bdev_name, 1075 "uuid": "%s" % uuid.uuid4() 1076 }, 1077 "enable": 1, 1078 "nsid": port 1079 } 1080 ], 1081 "nqn": nqn 1082 }) 1083 1084 nvmet_cfg["ports"].append({ 1085 "addr": { 1086 "adrfam": "ipv4", 1087 "traddr": ip, 1088 "trsvcid": port, 1089 "trtype": self.transport 1090 }, 1091 "portid": bdev_num, 1092 "referrals": [], 1093 "subsystems": [nqn] 1094 }) 1095 1096 self.subsystem_info_list.append((port, nqn, ip)) 1097 self.subsys_no = len(self.subsystem_info_list) 1098 1099 with open("kernel.conf", "w") as fh: 1100 fh.write(json.dumps(nvmet_cfg, indent=2)) 1101 1102 def tgt_start(self): 1103 self.log.info("Configuring kernel NVMeOF Target") 1104 1105 if self.null_block: 1106 self.log.info("Configuring with null block device.") 1107 nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)] 1108 else: 1109 self.log.info("Configuring with NVMe drives.") 1110 nvme_list = self.get_nvme_devices() 1111 1112 self.kernel_tgt_gen_subsystem_conf(nvme_list) 1113 self.subsys_no = len(nvme_list) 1114 1115 self.nvmet_command(self.nvmet_bin, "clear") 1116 self.nvmet_command(self.nvmet_bin, "restore kernel.conf") 1117 1118 if self.enable_adq: 1119 self.adq_configure_tc() 1120 1121 self.log.info("Done configuring kernel NVMeOF Target") 1122 1123 1124class SPDKTarget(Target): 1125 def __init__(self, name, general_config, target_config): 1126 # Required fields 1127 self.core_mask = target_config["core_mask"] 1128 self.num_cores = len(self.get_core_list_from_mask(self.core_mask)) 1129 1130 super().__init__(name, general_config, target_config) 1131 1132 # Defaults 1133 self.dif_insert_strip = False 1134 self.null_block_dif_type = 0 1135 self.num_shared_buffers = 4096 1136 self.max_queue_depth = 128 1137 self.bpf_proc = None 1138 self.bpf_scripts = [] 1139 self.enable_dsa = False 1140 self.scheduler_core_limit = None 1141 self.iobuf_small_pool_count = 32767 1142 self.iobuf_large_pool_count = 16383 1143 self.num_cqe = 4096 1144 1145 if "num_shared_buffers" in target_config: 1146 self.num_shared_buffers = target_config["num_shared_buffers"] 1147 if "max_queue_depth" in target_config: 1148 self.max_queue_depth = target_config["max_queue_depth"] 1149 if "null_block_dif_type" in target_config: 1150 self.null_block_dif_type = target_config["null_block_dif_type"] 1151 if "dif_insert_strip" in target_config: 1152 self.dif_insert_strip = target_config["dif_insert_strip"] 1153 if "bpf_scripts" in target_config: 1154 self.bpf_scripts = target_config["bpf_scripts"] 1155 if "dsa_settings" in target_config: 1156 self.enable_dsa = target_config["dsa_settings"] 1157 if "scheduler_core_limit" in target_config: 1158 self.scheduler_core_limit = target_config["scheduler_core_limit"] 1159 if "iobuf_small_pool_count" in target_config: 1160 self.iobuf_small_pool_count = target_config["iobuf_small_pool_count"] 1161 if "iobuf_large_pool_count" in target_config: 1162 self.iobuf_large_pool_count = target_config["iobuf_large_pool_count"] 1163 if "num_cqe" in target_config: 1164 self.num_cqe = target_config["num_cqe"] 1165 1166 self.log.info("====DSA settings:====") 1167 self.log.info("DSA enabled: %s" % (self.enable_dsa)) 1168 1169 def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False): 1170 if mode not in ["default", "bynode", "cpulist", 1171 "shared", "split", "split-bynode"]: 1172 self.log.error("%s irq affinity setting not supported" % mode) 1173 raise Exception 1174 1175 # Create core list from SPDK's mask and change it to string. 1176 # This is the type configure_irq_affinity expects for cpulist parameter. 1177 spdk_tgt_core_list = self.get_core_list_from_mask(self.core_mask) 1178 spdk_tgt_core_list = ",".join(map(lambda x: str(x), spdk_tgt_core_list)) 1179 spdk_tgt_core_list = "[" + spdk_tgt_core_list + "]" 1180 1181 if mode == "shared": 1182 super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list) 1183 elif mode == "split": 1184 super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list, exclude_cpulist=True) 1185 elif mode == "split-bynode": 1186 super().configure_irq_affinity(mode="bynode", cpulist=spdk_tgt_core_list, exclude_cpulist=True) 1187 else: 1188 super().configure_irq_affinity(mode=mode, cpulist=cpulist, exclude_cpulist=exclude_cpulist) 1189 1190 def adq_set_busy_read(self, busy_read_val): 1191 return {"net.core.busy_read": busy_read_val} 1192 1193 def get_nvme_devices_count(self): 1194 return len(self.get_nvme_devices()) 1195 1196 def get_nvme_devices(self): 1197 bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")])) 1198 bdev_bdfs = [] 1199 for bdev in bdev_subsys_json_obj["config"]: 1200 bdev_traddr = bdev["params"]["traddr"] 1201 if bdev_traddr in self.nvme_blocklist: 1202 continue 1203 if len(self.nvme_allowlist) == 0: 1204 bdev_bdfs.append(bdev_traddr) 1205 if bdev_traddr in self.nvme_allowlist: 1206 bdev_bdfs.append(bdev_traddr) 1207 return bdev_bdfs 1208 1209 def spdk_tgt_configure(self): 1210 self.log.info("Configuring SPDK NVMeOF target via RPC") 1211 1212 # Create transport layer 1213 nvmf_transport_params = { 1214 "client": self.client, 1215 "trtype": self.transport, 1216 "num_shared_buffers": self.num_shared_buffers, 1217 "max_queue_depth": self.max_queue_depth, 1218 "dif_insert_or_strip": self.dif_insert_strip, 1219 "sock_priority": self.adq_priority, 1220 "num_cqe": self.num_cqe 1221 } 1222 1223 if self.enable_adq: 1224 nvmf_transport_params["acceptor_poll_rate"] = 10000 1225 1226 rpc.nvmf.nvmf_create_transport(**nvmf_transport_params) 1227 self.log.info("SPDK NVMeOF transport layer:") 1228 rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client)) 1229 1230 if self.null_block: 1231 self.spdk_tgt_add_nullblock(self.null_block) 1232 self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block) 1233 else: 1234 self.spdk_tgt_add_nvme_conf() 1235 self.spdk_tgt_add_subsystem_conf(self.nic_ips) 1236 1237 if self.enable_adq: 1238 self.adq_configure_tc() 1239 1240 self.log.info("Done configuring SPDK NVMeOF Target") 1241 1242 def spdk_tgt_add_nullblock(self, null_block_count): 1243 md_size = 0 1244 block_size = 4096 1245 if self.null_block_dif_type != 0: 1246 md_size = 128 1247 1248 self.log.info("Adding null block bdevices to config via RPC") 1249 for i in range(null_block_count): 1250 self.log.info("Setting bdev protection to :%s" % self.null_block_dif_type) 1251 rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i), 1252 dif_type=self.null_block_dif_type, md_size=md_size) 1253 self.log.info("SPDK Bdevs configuration:") 1254 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1255 1256 def spdk_tgt_add_nvme_conf(self, req_num_disks=None): 1257 self.log.info("Adding NVMe bdevs to config via RPC") 1258 1259 bdfs = self.get_nvme_devices() 1260 bdfs = [b.replace(":", ".") for b in bdfs] 1261 1262 if req_num_disks: 1263 if req_num_disks > len(bdfs): 1264 self.log.error("ERROR: Requested number of disks is more than available %s" % len(bdfs)) 1265 sys.exit(1) 1266 else: 1267 bdfs = bdfs[0:req_num_disks] 1268 1269 for i, bdf in enumerate(bdfs): 1270 rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 1271 1272 self.log.info("SPDK Bdevs configuration:") 1273 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1274 1275 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 1276 self.log.info("Adding subsystems to config") 1277 if not req_num_disks: 1278 req_num_disks = self.get_nvme_devices_count() 1279 1280 for ip, bdev_num in self.spread_bdevs(req_num_disks): 1281 port = str(4420 + bdev_num) 1282 nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num 1283 serial = "SPDK00%s" % bdev_num 1284 bdev_name = "Nvme%sn1" % bdev_num 1285 1286 rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial, 1287 allow_any_host=True, max_namespaces=8) 1288 rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name) 1289 for nqn_name in [nqn, "discovery"]: 1290 rpc.nvmf.nvmf_subsystem_add_listener(self.client, 1291 nqn=nqn_name, 1292 trtype=self.transport, 1293 traddr=ip, 1294 trsvcid=port, 1295 adrfam="ipv4") 1296 self.subsystem_info_list.append((port, nqn, ip)) 1297 self.subsys_no = len(self.subsystem_info_list) 1298 1299 self.log.info("SPDK NVMeOF subsystem configuration:") 1300 rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client)) 1301 1302 def bpf_start(self): 1303 self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts) 1304 bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh") 1305 bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts] 1306 results_path = os.path.join(self.results_dir, "bpf_traces.txt") 1307 1308 with open(self.pid, "r") as fh: 1309 nvmf_pid = str(fh.readline()) 1310 1311 cmd = [bpf_script, nvmf_pid, *bpf_traces] 1312 self.log.info(cmd) 1313 self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path}) 1314 1315 def tgt_start(self): 1316 if self.null_block: 1317 self.subsys_no = 1 1318 else: 1319 self.subsys_no = self.get_nvme_devices_count() 1320 self.log.info("Starting SPDK NVMeOF Target process") 1321 nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt") 1322 proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask]) 1323 self.pid = os.path.join(self.spdk_dir, "nvmf.pid") 1324 1325 with open(self.pid, "w") as fh: 1326 fh.write(str(proc.pid)) 1327 self.nvmf_proc = proc 1328 self.log.info("SPDK NVMeOF Target PID=%s" % self.pid) 1329 self.log.info("Waiting for spdk to initialize...") 1330 while True: 1331 if os.path.exists("/var/tmp/spdk.sock"): 1332 break 1333 time.sleep(1) 1334 self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock") 1335 1336 rpc.sock.sock_set_default_impl(self.client, impl_name="posix") 1337 rpc.iobuf.iobuf_set_options(self.client, 1338 small_pool_count=self.iobuf_small_pool_count, 1339 large_pool_count=self.iobuf_large_pool_count, 1340 small_bufsize=None, 1341 large_bufsize=None) 1342 1343 if self.enable_zcopy: 1344 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", 1345 enable_zerocopy_send_server=True) 1346 self.log.info("Target socket options:") 1347 rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix")) 1348 1349 if self.enable_adq: 1350 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1) 1351 rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None, 1352 nvme_adminq_poll_period_us=100000, retry_count=4) 1353 1354 if self.enable_dsa: 1355 rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None) 1356 self.log.info("Target DSA accel module enabled") 1357 1358 rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit) 1359 rpc.framework_start_init(self.client) 1360 1361 if self.bpf_scripts: 1362 self.bpf_start() 1363 1364 self.spdk_tgt_configure() 1365 1366 def stop(self): 1367 if self.bpf_proc: 1368 self.log.info("Stopping BPF Trace script") 1369 self.bpf_proc.terminate() 1370 self.bpf_proc.wait() 1371 1372 if hasattr(self, "nvmf_proc"): 1373 try: 1374 self.nvmf_proc.terminate() 1375 self.nvmf_proc.wait(timeout=30) 1376 except Exception as e: 1377 self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.") 1378 self.log.info(e) 1379 self.nvmf_proc.kill() 1380 self.nvmf_proc.communicate() 1381 # Try to clean up RPC socket files if they were not removed 1382 # because of using 'kill' 1383 try: 1384 os.remove("/var/tmp/spdk.sock") 1385 os.remove("/var/tmp/spdk.sock.lock") 1386 except FileNotFoundError: 1387 pass 1388 self.restore_settings() 1389 1390 1391class KernelInitiator(Initiator): 1392 def __init__(self, name, general_config, initiator_config): 1393 super().__init__(name, general_config, initiator_config) 1394 1395 # Defaults 1396 self.extra_params = "" 1397 self.ioengine = "libaio" 1398 self.spdk_conf = "" 1399 1400 if "num_cores" in initiator_config: 1401 self.num_cores = initiator_config["num_cores"] 1402 1403 if "extra_params" in initiator_config: 1404 self.extra_params = initiator_config["extra_params"] 1405 1406 if "kernel_engine" in initiator_config: 1407 self.ioengine = initiator_config["kernel_engine"] 1408 if "io_uring" in self.ioengine: 1409 self.extra_params += ' --nr-poll-queues=8' 1410 1411 def configure_adq(self): 1412 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1413 1414 def adq_configure_tc(self): 1415 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1416 1417 def adq_set_busy_read(self, busy_read_val): 1418 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0") 1419 return {"net.core.busy_read": 0} 1420 1421 def get_connected_nvme_list(self): 1422 json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"])) 1423 nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"] 1424 if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]] 1425 return nvme_list 1426 1427 def init_connect(self): 1428 self.log.info("Below connection attempts may result in error messages, this is expected!") 1429 for subsystem in self.subsystem_info_list: 1430 self.log.info("Trying to connect %s %s %s" % subsystem) 1431 self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport, 1432 "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params]) 1433 time.sleep(2) 1434 1435 if "io_uring" in self.ioengine: 1436 self.log.info("Setting block layer settings for io_uring.") 1437 1438 # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because 1439 # apparently it's not possible for connected subsystems. 1440 # Results in "error: Invalid argument" 1441 block_sysfs_settings = { 1442 "iostats": "0", 1443 "rq_affinity": "0", 1444 "nomerges": "2" 1445 } 1446 1447 for disk in self.get_connected_nvme_list(): 1448 sysfs = os.path.join("/sys/block", disk, "queue") 1449 for k, v in block_sysfs_settings.items(): 1450 sysfs_opt_path = os.path.join(sysfs, k) 1451 try: 1452 self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True) 1453 except CalledProcessError as e: 1454 self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v)) 1455 finally: 1456 _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)]) 1457 self.log.info("%s=%s" % (sysfs_opt_path, _)) 1458 1459 def init_disconnect(self): 1460 for subsystem in self.subsystem_info_list: 1461 self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]]) 1462 time.sleep(1) 1463 1464 def get_nvme_subsystem_numa(self, dev_name): 1465 # Remove two last characters to get controller name instead of subsystem name 1466 nvme_ctrl = os.path.basename(dev_name)[:-2] 1467 remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', 1468 self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl])) 1469 return self.get_route_nic_numa(remote_nvme_ip.group(0)) 1470 1471 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0): 1472 self.available_cpus = self.get_numa_cpu_map() 1473 if len(threads) >= len(subsystems): 1474 threads = range(0, len(subsystems)) 1475 1476 # Generate connected nvme devices names and sort them by used NIC numa node 1477 # to allow better grouping when splitting into fio sections. 1478 nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()] 1479 nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list] 1480 nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))] 1481 1482 filename_section = "" 1483 nvme_per_split = int(len(nvme_list) / len(threads)) 1484 remainder = len(nvme_list) % len(threads) 1485 iterator = iter(nvme_list) 1486 result = [] 1487 for i in range(len(threads)): 1488 result.append([]) 1489 for _ in range(nvme_per_split): 1490 result[i].append(next(iterator)) 1491 if remainder: 1492 result[i].append(next(iterator)) 1493 remainder -= 1 1494 for i, r in enumerate(result): 1495 header = "[filename%s]" % i 1496 disks = "\n".join(["filename=%s" % x for x in r]) 1497 job_section_qd = round((io_depth * len(r)) / num_jobs) 1498 if job_section_qd == 0: 1499 job_section_qd = 1 1500 iodepth = "iodepth=%s" % job_section_qd 1501 1502 offset_section = "" 1503 if offset: 1504 offset_section = self.gen_fio_offset_section(offset_inc, num_jobs) 1505 1506 numa_opts = self.gen_fio_numa_section(r, num_jobs) 1507 1508 filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""]) 1509 1510 return filename_section 1511 1512 1513class SPDKInitiator(Initiator): 1514 def __init__(self, name, general_config, initiator_config): 1515 super().__init__(name, general_config, initiator_config) 1516 1517 if self.skip_spdk_install is False: 1518 self.install_spdk() 1519 1520 # Optional fields 1521 self.enable_data_digest = False 1522 if "enable_data_digest" in initiator_config: 1523 self.enable_data_digest = initiator_config["enable_data_digest"] 1524 if "num_cores" in initiator_config: 1525 self.num_cores = initiator_config["num_cores"] 1526 1527 self.ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir 1528 self.spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir 1529 1530 def adq_set_busy_read(self, busy_read_val): 1531 return {"net.core.busy_read": busy_read_val} 1532 1533 def install_spdk(self): 1534 self.log.info("Using fio binary %s" % self.fio_bin) 1535 self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"]) 1536 self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"]) 1537 self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", 1538 "--with-fio=%s" % os.path.dirname(self.fio_bin), 1539 "--enable-lto", "--disable-unit-tests"]) 1540 self.exec_cmd(["make", "-C", self.spdk_dir, "clean"]) 1541 self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"]) 1542 1543 self.log.info("SPDK built") 1544 self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir]) 1545 1546 def init_connect(self): 1547 # Not a real "connect" like when doing "nvme connect" because SPDK's fio 1548 # bdev plugin initiates connection just before starting IO traffic. 1549 # This is just to have a "init_connect" equivalent of the same function 1550 # from KernelInitiator class. 1551 # Just prepare bdev.conf JSON file for later use and consider it 1552 # "making a connection". 1553 bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list) 1554 self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir]) 1555 1556 def init_disconnect(self): 1557 # SPDK Initiator does not need to explicity disconnect as this gets done 1558 # after fio bdev plugin finishes IO. 1559 pass 1560 1561 def gen_spdk_bdev_conf(self, remote_subsystem_list): 1562 bdev_cfg_section = { 1563 "subsystems": [ 1564 { 1565 "subsystem": "bdev", 1566 "config": [] 1567 } 1568 ] 1569 } 1570 1571 for i, subsys in enumerate(remote_subsystem_list): 1572 sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys) 1573 nvme_ctrl = { 1574 "method": "bdev_nvme_attach_controller", 1575 "params": { 1576 "name": "Nvme{}".format(i), 1577 "trtype": self.transport, 1578 "traddr": sub_addr, 1579 "trsvcid": sub_port, 1580 "subnqn": sub_nqn, 1581 "adrfam": "IPv4" 1582 } 1583 } 1584 1585 if self.enable_adq: 1586 nvme_ctrl["params"].update({"priority": "1"}) 1587 1588 if self.enable_data_digest: 1589 nvme_ctrl["params"].update({"ddgst": self.enable_data_digest}) 1590 1591 bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl) 1592 1593 return json.dumps(bdev_cfg_section, indent=2) 1594 1595 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0): 1596 self.available_cpus = self.get_numa_cpu_map() 1597 filename_section = "" 1598 if len(threads) >= len(subsystems): 1599 threads = range(0, len(subsystems)) 1600 1601 # Generate expected NVMe Bdev names and sort them by used NIC numa node 1602 # to allow better grouping when splitting into fio sections. 1603 filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))] 1604 filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames] 1605 filenames = [x for _, x in sorted(zip(filename_numas, filenames))] 1606 1607 nvme_per_split = int(len(subsystems) / len(threads)) 1608 remainder = len(subsystems) % len(threads) 1609 iterator = iter(filenames) 1610 result = [] 1611 for i in range(len(threads)): 1612 result.append([]) 1613 for _ in range(nvme_per_split): 1614 result[i].append(next(iterator)) 1615 if remainder: 1616 result[i].append(next(iterator)) 1617 remainder -= 1 1618 for i, r in enumerate(result): 1619 header = "[filename%s]" % i 1620 disks = "\n".join(["filename=%s" % x for x in r]) 1621 job_section_qd = round((io_depth * len(r)) / num_jobs) 1622 if job_section_qd == 0: 1623 job_section_qd = 1 1624 iodepth = "iodepth=%s" % job_section_qd 1625 1626 offset_section = "" 1627 if offset: 1628 offset_section = self.gen_fio_offset_section(offset_inc, num_jobs) 1629 1630 numa_opts = self.gen_fio_numa_section(r, num_jobs) 1631 1632 filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""]) 1633 1634 return filename_section 1635 1636 def get_nvme_subsystem_numa(self, bdev_name): 1637 bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir])) 1638 bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"] 1639 1640 # Remove two last characters to get controller name instead of subsystem name 1641 nvme_ctrl = bdev_name[:-2] 1642 remote_nvme_ip = list(filter(lambda x: x["params"]["name"] == "%s" % nvme_ctrl, bdev_conf_json_obj))[0]["params"]["traddr"] 1643 return self.get_route_nic_numa(remote_nvme_ip) 1644 1645 1646if __name__ == "__main__": 1647 script_full_dir = os.path.dirname(os.path.realpath(__file__)) 1648 default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json")) 1649 1650 parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) 1651 parser.add_argument('-c', '--config', type=str, default=default_config_file_path, 1652 help='Configuration file.') 1653 parser.add_argument('-r', '--results', type=str, default='/tmp/results', 1654 help='Results directory.') 1655 parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv', 1656 help='CSV results filename.') 1657 parser.add_argument('-f', '--force', default=False, action='store_true', 1658 dest='force', help="""Force script to continue and try to use all 1659 available NVMe devices during test. 1660 WARNING: Might result in data loss on used NVMe drives""") 1661 1662 args = parser.parse_args() 1663 1664 logging.basicConfig(level=logging.INFO, 1665 format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s') 1666 1667 logging.info("Using config file: %s" % args.config) 1668 with open(args.config, "r") as config: 1669 data = json.load(config) 1670 1671 initiators = [] 1672 fio_cases = [] 1673 1674 general_config = data["general"] 1675 target_config = data["target"] 1676 initiator_configs = [data[x] for x in data.keys() if "initiator" in x] 1677 1678 if "null_block_devices" not in data["target"] and \ 1679 (args.force is False and 1680 "allowlist" not in data["target"] and 1681 "blocklist" not in data["target"]): 1682 # TODO: Also check if allowlist or blocklist are not empty. 1683 logging.warning("""WARNING: This script requires allowlist and blocklist to be defined. 1684 You can choose to use all available NVMe drives on your system, which may potentially 1685 lead to data loss. If you wish to proceed with all attached NVMes, use "-f" option.""") 1686 exit(1) 1687 1688 for k, v in data.items(): 1689 if "target" in k: 1690 v.update({"results_dir": args.results}) 1691 if data[k]["mode"] == "spdk": 1692 target_obj = SPDKTarget(k, data["general"], v) 1693 elif data[k]["mode"] == "kernel": 1694 target_obj = KernelTarget(k, data["general"], v) 1695 elif "initiator" in k: 1696 if data[k]["mode"] == "spdk": 1697 init_obj = SPDKInitiator(k, data["general"], v) 1698 elif data[k]["mode"] == "kernel": 1699 init_obj = KernelInitiator(k, data["general"], v) 1700 initiators.append(init_obj) 1701 elif "fio" in k: 1702 fio_workloads = itertools.product(data[k]["bs"], 1703 data[k]["qd"], 1704 data[k]["rw"]) 1705 1706 fio_run_time = data[k]["run_time"] 1707 fio_ramp_time = data[k]["ramp_time"] 1708 fio_rw_mix_read = data[k]["rwmixread"] 1709 fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None 1710 fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None 1711 1712 fio_rate_iops = 0 1713 if "rate_iops" in data[k]: 1714 fio_rate_iops = data[k]["rate_iops"] 1715 1716 fio_offset = False 1717 if "offset" in data[k]: 1718 fio_offset = data[k]["offset"] 1719 fio_offset_inc = 0 1720 if "offset_inc" in data[k]: 1721 fio_offset_inc = data[k]["offset_inc"] 1722 else: 1723 continue 1724 1725 try: 1726 os.mkdir(args.results) 1727 except FileExistsError: 1728 pass 1729 1730 for i in initiators: 1731 target_obj.initiator_info.append( 1732 {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips} 1733 ) 1734 1735 # TODO: This try block is definietly too large. Need to break this up into separate 1736 # logical blocks to reduce size. 1737 try: 1738 target_obj.tgt_start() 1739 1740 for i in initiators: 1741 i.match_subsystems(target_obj.subsystem_info_list) 1742 if i.enable_adq: 1743 i.adq_configure_tc() 1744 1745 # Poor mans threading 1746 # Run FIO tests 1747 for block_size, io_depth, rw in fio_workloads: 1748 configs = [] 1749 for i in initiators: 1750 i.init_connect() 1751 cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no, 1752 fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops, 1753 fio_offset, fio_offset_inc) 1754 configs.append(cfg) 1755 1756 for run_no in range(1, fio_run_num+1): 1757 threads = [] 1758 power_daemon = None 1759 measurements_prefix = "%s_%s_%s_m_%s_run_%s" % (block_size, io_depth, rw, fio_rw_mix_read, run_no) 1760 1761 for i, cfg in zip(initiators, configs): 1762 t = threading.Thread(target=i.run_fio, args=(cfg, run_no)) 1763 threads.append(t) 1764 if target_obj.enable_sar: 1765 sar_file_prefix = measurements_prefix + "_sar" 1766 t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix, fio_ramp_time, fio_run_time)) 1767 threads.append(t) 1768 1769 if target_obj.enable_pcm: 1770 pcm_fnames = ["%s_%s.csv" % (measurements_prefix, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]] 1771 1772 pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, 1773 args=(args.results, pcm_fnames[0], fio_ramp_time, fio_run_time)) 1774 pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, 1775 args=(args.results, pcm_fnames[1], fio_ramp_time, fio_run_time)) 1776 pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, 1777 args=(args.results, pcm_fnames[2], fio_ramp_time, fio_run_time)) 1778 1779 threads.append(pcm_cpu_t) 1780 threads.append(pcm_mem_t) 1781 threads.append(pcm_pow_t) 1782 1783 if target_obj.enable_bw: 1784 bandwidth_file_name = measurements_prefix + "_bandwidth.csv" 1785 t = threading.Thread(target=target_obj.measure_network_bandwidth, 1786 args=(args.results, bandwidth_file_name, fio_ramp_time, fio_run_time)) 1787 threads.append(t) 1788 1789 if target_obj.enable_dpdk_memory: 1790 dpdk_mem_file_name = measurements_prefix + "_dpdk_mem.txt" 1791 t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results, dpdk_mem_file_name, fio_ramp_time)) 1792 threads.append(t) 1793 1794 if target_obj.enable_pm: 1795 power_daemon = threading.Thread(target=target_obj.measure_power, 1796 args=(args.results, measurements_prefix, script_full_dir, 1797 fio_ramp_time, fio_run_time)) 1798 threads.append(power_daemon) 1799 1800 for t in threads: 1801 t.start() 1802 for t in threads: 1803 t.join() 1804 1805 for i in initiators: 1806 i.init_disconnect() 1807 i.copy_result_files(args.results) 1808 try: 1809 parse_results(args.results, args.csv_filename) 1810 except Exception as err: 1811 logging.error("There was an error with parsing the results") 1812 logging.error(err) 1813 finally: 1814 for i in initiators: 1815 try: 1816 i.stop() 1817 except Exception as err: 1818 pass 1819 target_obj.stop() 1820