#!/usr/bin/env python3
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (C) 2018 Intel Corporation
# All rights reserved.
#

import os
import re
import sys
import argparse
import json
import logging
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid

import paramiko
import pandas as pd
from common import *
from subprocess import check_output, CalledProcessError

sys.path.append(os.path.dirname(__file__) + '/../../../python')

import spdk.rpc as rpc  # noqa
import spdk.rpc.client as rpc_client  # noqa


class Server:
    def __init__(self, name, general_config, server_config):
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        self.skip_spdk_install = general_config.get('skip_spdk_install', False)
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]
        self.irdma_roce_enable = False

        self.log = logging.getLogger(self.name)

        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        self.local_nic_info = []
        self._nics_json_obj = {}
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        self.enable_adq = False
        self.adq_priority = None
        self.irq_settings = {"mode": "default"}

        if "adq_enable" in server_config and server_config["adq_enable"]:
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1
        if "irq_settings" in server_config:
            self.irq_settings.update(server_config["irq_settings"])
        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]
        if "irdma_roce_enable" in general_config:
            self.irdma_roce_enable = general_config["irdma_roce_enable"]

        if not re.match(r'^[A-Za-z0-9\-]+$', name):
            self.log.info("Please use a name which contains only letters, numbers or dashes")
            sys.exit(1)
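
    # A minimal sketch of the JSON configuration consumed above, assuming a
    # "general" section plus one section per server object; all values below
    # are illustrative placeholders, not defaults shipped with the script:
    #
    #   "general": {"username": "user", "password": "***", "transport": "tcp",
    #               "skip_spdk_install": false, "irdma_roce_enable": false},
    #   "target":  {"nic_ips": ["10.0.0.1"], "mode": "spdk",
    #               "irq_settings": {"mode": "default"},
    #               "tuned_profile": "latency-performance"}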

    @staticmethod
    def get_uncommented_lines(lines):
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                if ip in addr["local"]:
                    return nic["ifname"]

    def set_local_nic_info_helper(self):
        pass

    def set_local_nic_info(self, pci_info):
        def extract_network_elements(json_obj):
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def get_nic_numa_node(self, nic_name):
        return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name]))

    def get_numa_cpu_map(self):
        numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"]))
        numa_cpu_json_map = {}

        for cpu in numa_cpu_json_obj["cpus"]:
            cpu_num = int(cpu["cpu"])
            numa_node = int(cpu["node"])
            numa_cpu_json_map.setdefault(numa_node, [])
            numa_cpu_json_map[numa_node].append(cpu_num)

        return numa_cpu_json_map
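
    # For illustration only: on a hypothetical 2-node, 8-CPU host the map
    # returned above might be {0: [0, 1, 2, 3], 1: [4, 5, 6, 7]}; the actual
    # CPU-to-node assignment comes from the lscpu topology of the machine.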

    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        return ""

    def configure_system(self):
        self.load_drivers()
        self.configure_services()
        self.configure_sysctl()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity(**self.irq_settings)

    RDMA_PROTOCOL_IWARP = 0
    RDMA_PROTOCOL_ROCE = 1
    RDMA_PROTOCOL_UNKNOWN = -1

    def check_rdma_protocol(self):
        try:
            roce_ena = self.exec_cmd(["cat", "/sys/module/irdma/parameters/roce_ena"])
            roce_ena = roce_ena.strip()
            if roce_ena == "0":
                return self.RDMA_PROTOCOL_IWARP
            else:
                return self.RDMA_PROTOCOL_ROCE
        except CalledProcessError as e:
            self.log.error("ERROR: failed to check RDMA protocol!")
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
            return self.RDMA_PROTOCOL_UNKNOWN

    def load_drivers(self):
        self.log.info("Loading drivers")
        self.exec_cmd(["sudo", "modprobe", "-a",
                       "nvme-%s" % self.transport,
                       "nvmet-%s" % self.transport])
        current_mode = self.check_rdma_protocol()
        if current_mode == self.RDMA_PROTOCOL_UNKNOWN:
            self.log.error("ERROR: failed to check RDMA protocol mode")
            return
        if self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_IWARP:
            self.reload_driver("irdma", "roce_ena=1")
            self.log.info("Loaded irdma driver with RoCE enabled")
        elif self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_ROCE:
            self.log.info("Leaving irdma driver with RoCE enabled")
        else:
            self.reload_driver("irdma", "roce_ena=0")
            self.log.info("Loaded irdma driver with iWARP enabled")

    def configure_adq(self):
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        self.log.info("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log.info("%s loaded!" % module)
            except CalledProcessError as e:
                self.log.error("ERROR: failed to load module %s" % module)
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        self.log.info("Configuring ADQ Traffic classes and filters...")

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            nic_ports = [x[0] for x in self.subsystem_info_list if nic_ip in x[2]]

            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log.info(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log.info(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            nic_bdf = os.path.basename(self.exec_cmd(["readlink", "-f", "/sys/class/net/%s/device" % nic_name]).strip())
            self.exec_cmd(["sudo", "devlink", "dev", "param", "set", "pci/%s" % nic_bdf,
                           "name", "tc1_inline_fd", "value", "true", "cmode", "runtime"])

            for port in nic_ports:
                tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                                 "protocol", "ip", "ingress", "prio", "1", "flower",
                                 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                                 "skip_sw", "hw_tc", "1"]
                self.log.info(" ".join(tc_filter_cmd))
                self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log.info("Show tc configuration for %s NIC..." % nic_name)
            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log.info("%s" % tc_disk_out)
            self.log.info("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log.info(xps_cmd)
            self.exec_cmd(xps_cmd)
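
    # The resulting layout, assuming an illustrative num_cores of 8: mqprio
    # maps priority 0 to TC0 (queues "2@0") and priority 1 to TC1 ("8@2"),
    # and one flower filter per subsystem port steers matching TCP traffic
    # into hw_tc 1 on the NIC hardware.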

    def reload_driver(self, driver, *modprobe_args):
        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver, *modprobe_args])
        except CalledProcessError as e:
            self.log.error("ERROR: failed to reload %s module!" % driver)
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_nic(self):
        self.log.info("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                nic_bdf = self.exec_cmd(["bash", "-c", "source /sys/class/net/%s/device/uevent; echo $PCI_SLOT_NAME" % nic])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-inspect-optimize", "off"])
                # Following are suggested in ADQ Configuration Guide to be enabled for large block sizes,
                # but can be used with 4k block sizes as well
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop", "on"])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop-cfg", "on"])
            except subprocess.CalledProcessError as e:
                self.log.error("ERROR: failed to configure NIC port using ethtool!")
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
                self.log.info("Please update your NIC driver and firmware versions and try again.")
            self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))
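
    # nic_bdf read above is the PCI address taken from the uevent file, e.g.
    # "0000:18:00.0" (illustrative); the priv-flags toggled here are specific
    # to the ice driver reloaded at the top of this method.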

    def configure_services(self):
        self.log.info("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log.info("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log.info("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        self.log.info("Tuning sysctl settings...")

        busy_read = 0
        busy_poll = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1
            busy_poll = 1

        sysctl_opts = {
            "net.core.busy_poll": busy_poll,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
        }

        if self.enable_adq:
            sysctl_opts.update(self.adq_set_busy_read(1))

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        if not self.tuned_profile:
            self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        self.log.info("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def get_core_list_from_mask(self, core_mask):
        # Generate list of individual cores from hex core mask
        # (e.g. '0xffff') or list containing:
        # - individual cores (e.g. '1, 2, 3')
        # - core ranges (e.g. '0-3')
        # - mix of both (e.g. '0, 1-4, 9, 11-13')

        core_list = []
        if "0x" in core_mask:
            core_mask_int = int(core_mask, 16)
            for i in range(core_mask_int.bit_length()):
                if (1 << i) & core_mask_int:
                    core_list.append(i)
            return core_list
        else:
            # Core list can be provided in .json config with square brackets
            # remove them first
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")

            for i in core_mask.split(","):
                if "-" in i:
                    start, end = i.split("-")
                    core_range = range(int(start), int(end) + 1)
                    core_list.extend(core_range)
                else:
                    core_list.append(int(i))
            return core_list
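
    # Examples of the inputs handled above:
    #   get_core_list_from_mask("0xF")      -> [0, 1, 2, 3]
    #   get_core_list_from_mask("[0, 2-4]") -> [0, 2, 3, 4]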
profile." % self.tuned_restore_dict["profile"]) 464 465 def restore_governor(self): 466 self.log.info("Restoring CPU governor setting...") 467 if self.governor_restore: 468 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore]) 469 self.log.info("Reverted CPU governor to %s." % self.governor_restore) 470 471 def restore_settings(self): 472 self.restore_governor() 473 self.restore_tuned() 474 self.restore_services() 475 self.restore_sysctl() 476 if self.enable_adq: 477 self.reload_driver("ice") 478 479 480class Target(Server): 481 def __init__(self, name, general_config, target_config): 482 super().__init__(name, general_config, target_config) 483 484 # Defaults 485 self.enable_zcopy = False 486 self.scheduler_name = "static" 487 self.null_block = 0 488 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 489 self.subsystem_info_list = [] 490 self.initiator_info = [] 491 self.nvme_allowlist = [] 492 self.nvme_blocklist = [] 493 494 # Target-side measurement options 495 self.enable_pm = True 496 self.enable_sar = True 497 self.enable_pcm = True 498 self.enable_bw = True 499 self.enable_dpdk_memory = True 500 501 if "null_block_devices" in target_config: 502 self.null_block = target_config["null_block_devices"] 503 if "scheduler_settings" in target_config: 504 self.scheduler_name = target_config["scheduler_settings"] 505 if "zcopy_settings" in target_config: 506 self.enable_zcopy = target_config["zcopy_settings"] 507 if "results_dir" in target_config: 508 self.results_dir = target_config["results_dir"] 509 if "blocklist" in target_config: 510 self.nvme_blocklist = target_config["blocklist"] 511 if "allowlist" in target_config: 512 self.nvme_allowlist = target_config["allowlist"] 513 # Blocklist takes precedence, remove common elements from allowlist 514 self.nvme_allowlist = list(set(self.nvme_allowlist) - set(self.nvme_blocklist)) 515 if "enable_pm" in target_config: 516 self.enable_pm = target_config["enable_pm"] 517 if "enable_sar" in target_config: 518 self.enable_sar = target_config["enable_sar"] 519 if "enable_pcm" in target_config: 520 self.enable_pcm = target_config["enable_pcm"] 521 if "enable_bandwidth" in target_config: 522 self.enable_bw = target_config["enable_bandwidth"] 523 if "enable_dpdk_memory" in target_config: 524 self.enable_dpdk_memory = target_config["enable_dpdk_memory"] 525 526 self.log.info("Items now on allowlist: %s" % self.nvme_allowlist) 527 self.log.info("Items now on blocklist: %s" % self.nvme_blocklist) 528 529 self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) 530 self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../")) 531 self.set_local_nic_info(self.set_local_nic_info_helper()) 532 533 if self.skip_spdk_install is False: 534 self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip") 535 536 self.configure_system() 537 if self.enable_adq: 538 self.configure_adq() 539 self.configure_irq_affinity() 540 self.sys_config() 541 542 def set_local_nic_info_helper(self): 543 return json.loads(self.exec_cmd(["lshw", "-json"])) 544 545 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 546 stderr_opt = None 547 if stderr_redirect: 548 stderr_opt = subprocess.STDOUT 549 if change_dir: 550 old_cwd = os.getcwd() 551 os.chdir(change_dir) 552 self.log.info("Changing directory to %s" % change_dir) 553 554 out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8") 555 556 if change_dir: 557 os.chdir(old_cwd) 558 self.log.info("Changing directory to %s" % old_cwd) 559 
return out 560 561 def zip_spdk_sources(self, spdk_dir, dest_file): 562 self.log.info("Zipping SPDK source directory") 563 fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) 564 for root, _directories, files in os.walk(spdk_dir, followlinks=True): 565 for file in files: 566 fh.write(os.path.relpath(os.path.join(root, file))) 567 fh.close() 568 self.log.info("Done zipping") 569 570 @staticmethod 571 def _chunks(input_list, chunks_no): 572 div, rem = divmod(len(input_list), chunks_no) 573 for i in range(chunks_no): 574 si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem) 575 yield input_list[si:si + (div + 1 if i < rem else div)] 576 577 def spread_bdevs(self, req_disks): 578 # Spread available block devices indexes: 579 # - evenly across available initiator systems 580 # - evenly across available NIC interfaces for 581 # each initiator 582 # Not NUMA aware. 583 ip_bdev_map = [] 584 initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info)) 585 586 for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)): 587 self.initiator_info[i]["bdev_range"] = init_chunk 588 init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"]))) 589 for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list): 590 for c in nic_chunk: 591 ip_bdev_map.append((ip, c)) 592 return ip_bdev_map 593 594 def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time): 595 cpu_number = os.cpu_count() 596 sar_idle_sum = 0 597 sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt") 598 sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"])) 599 600 self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time) 601 time.sleep(ramp_time) 602 self.log.info("Starting SAR measurements") 603 604 out = self.exec_cmd(["sar", "-P", "ALL", "%s" % 1, "%s" % run_time]) 605 with open(os.path.join(results_dir, sar_output_file), "w") as fh: 606 for line in out.split("\n"): 607 if "Average" in line: 608 if "CPU" in line: 609 self.log.info("Summary CPU utilization from SAR:") 610 self.log.info(line) 611 elif "all" in line: 612 self.log.info(line) 613 else: 614 sar_idle_sum += float(line.split()[7]) 615 fh.write(out) 616 sar_cpu_usage = cpu_number * 100 - sar_idle_sum 617 618 with open(os.path.join(results_dir, sar_cpu_util_file), "w") as f: 619 f.write("%0.2f" % sar_cpu_usage) 620 621 def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time): 622 time.sleep(ramp_time) 623 self.log.info("Starting power measurements") 624 self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir, 625 "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix, 626 "-x", "-c", "%s" % run_time, "-t", "%s" % 1, "-r"]) 627 628 def measure_pcm_memory(self, results_dir, pcm_file_name, ramp_time, run_time): 629 time.sleep(ramp_time) 630 cmd = ["pcm-memory", "1", "-csv=%s/%s" % (results_dir, pcm_file_name)] 631 pcm_memory = subprocess.Popen(cmd) 632 time.sleep(run_time) 633 pcm_memory.terminate() 634 635 def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time): 636 time.sleep(ramp_time) 637 cmd = ["pcm", "1", "-i=%s" % run_time, 638 "-csv=%s/%s" % (results_dir, pcm_file_name)] 639 subprocess.run(cmd) 640 df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1]) 641 df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x)) 642 skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 
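
    # A worked example of the two helpers above: _chunks(range(10), 2) yields
    # ranges 0-4 and 5-9, so with 10 requested disks and two initiators that
    # each expose two target NIC IPs, initiator 0 receives bdevs 0-4 and
    # initiator 1 receives bdevs 5-9, each half split again across that
    # initiator's NIC IPs.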

    def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time):
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))

        self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Starting SAR measurements")

        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % 1, "%s" % run_time])
        # Both output paths already include results_dir, so open them directly.
        with open(sar_output_file, "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log.info("Summary CPU utilization from SAR:")
                        self.log.info(line)
                    elif "all" in line:
                        self.log.info(line)
                    else:
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum

        with open(sar_cpu_util_file, "w") as f:
            f.write("%0.2f" % sar_cpu_usage)

    def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time):
        time.sleep(ramp_time)
        self.log.info("Starting power measurements")
        self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir,
                       "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix,
                       "-x", "-c", "%s" % run_time, "-t", "%s" % 1, "-r"])

    def measure_pcm_memory(self, results_dir, pcm_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        cmd = ["pcm-memory", "1", "-csv=%s/%s" % (results_dir, pcm_file_name)]
        pcm_memory = subprocess.Popen(cmd)
        time.sleep(run_time)
        pcm_memory.terminate()

    def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        cmd = ["pcm", "1", "-i=%s" % run_time,
               "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_pcm_power(self, results_dir, pcm_power_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        out = self.exec_cmd(["pcm-power", "1", "-i=%s" % run_time])
        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
            fh.write(out)
        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
        #       Improve this so that additional file containing measurements average is generated too.

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name, ramp_time, run_time):
        self.log.info("Waiting %d seconds for ramp-up to finish before measuring bandwidth stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("INFO: starting network bandwidth measure")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", "%s" % run_time])
        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
        #       Improve this so that additional file containing measurements average is generated too.
        # TODO: Monitor only these interfaces which are currently used to run the workload.

    def measure_dpdk_memory(self, results_dir, dump_file_name, ramp_time):
        self.log.info("INFO: waiting to generate DPDK memory usage")
        time.sleep(ramp_time)
        self.log.info("INFO: generating DPDK memory usage")
        tmp_dump_file = rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)["filename"]
        os.rename(tmp_dump_file, "%s/%s" % (results_dir, dump_file_name))

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(os.uname().release)
        self.log.info("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log.info("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log.info("====zcopy settings:====")
        self.log.info("zcopy enabled: %s" % (self.enable_zcopy))
        self.log.info("====Scheduler settings:====")
        self.log.info("SPDK scheduler: %s" % (self.scheduler_name))


class Initiator(Server):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Required fields
        self.ip = initiator_config["ip"]
        self.target_nic_ips = initiator_config["target_nic_ips"]

        # Defaults
        self.cpus_allowed = None
        self.cpus_allowed_policy = "shared"
        self.spdk_dir = "/tmp/spdk"
        self.fio_bin = "/usr/src/fio/fio"
        self.nvmecli_bin = "nvme"
        self.cpu_frequency = None
        self.subsystem_info_list = []
        self.allow_cpu_sharing = True

        if "spdk_dir" in initiator_config:
            self.spdk_dir = initiator_config["spdk_dir"]
        if "fio_bin" in initiator_config:
            self.fio_bin = initiator_config["fio_bin"]
        if "nvmecli_bin" in initiator_config:
            self.nvmecli_bin = initiator_config["nvmecli_bin"]
        if "cpus_allowed" in initiator_config:
            self.cpus_allowed = initiator_config["cpus_allowed"]
        if "cpus_allowed_policy" in initiator_config:
            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
        if "cpu_frequency" in initiator_config:
            self.cpu_frequency = initiator_config["cpu_frequency"]
        if "allow_cpu_sharing" in initiator_config:
            self.allow_cpu_sharing = initiator_config["allow_cpu_sharing"]
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if self.skip_spdk_install is False:
            self.copy_spdk("/tmp/spdk.zip")

        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        self.restore_settings()
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it again to prevent
        # expansion when sending to remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout using the get_pty option if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        self.log.info("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log.info("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log.info("Sources unpacked")

    def copy_result_files(self, dest_dir):
        self.log.info("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log.info("Done copying results")

    def match_subsystems(self, target_subsystems):
        subsystems = [subsystem for subsystem in target_subsystems if subsystem[2] in self.target_nic_ips]
        subsystems.sort(key=lambda x: x[1])
        self.log.info("Found matching subsystems on target side:")
        for s in subsystems:
            self.log.info(s)
        self.subsystem_info_list = subsystems
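
    # Each subsystem_info_list entry is a (port, nqn, ip) tuple as produced by
    # the Target classes, e.g. ("4420", "nqn.2018-09.io.spdk:cnode0", "10.0.0.1");
    # sorting by NQN keeps the ordering deterministic between runs.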

    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def get_route_nic_numa(self, remote_nvme_ip):
        local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip]))
        local_nvme_nic = local_nvme_nic[0]["dev"]
        return self.get_nic_numa_node(local_nvme_nic)

    @staticmethod
    def gen_fio_offset_section(offset_inc, num_jobs):
        offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc
        return "\n".join(["size=%s%%" % offset_inc,
                          "offset=0%",
                          "offset_increment=%s%%" % offset_inc])
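
    # Example: with offset_inc=0 and num_jobs=4 the section above expands to
    # size=25%, offset=0%, offset_increment=25%, so four jobs cover four
    # equal, non-overlapping quarters of each device.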

    def gen_fio_numa_section(self, fio_filenames_list, num_jobs):
        numa_stats = {}
        allowed_cpus = []
        for nvme in fio_filenames_list:
            nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme))
            numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1

        # Use the most common NUMA node for this chunk to allocate memory and CPUs
        section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0]

        # Check if we have enough free CPUs to pop from the list before assigning them
        if len(self.available_cpus[section_local_numa]) < num_jobs:
            if self.allow_cpu_sharing:
                self.log.info("Regenerating available CPU list %s" % section_local_numa)
                # Remove still available CPUs from the regenerated list. We don't want to
                # regenerate it with duplicates.
                cpus_regen = set(self.get_numa_cpu_map()[section_local_numa]) - set(self.available_cpus[section_local_numa])
                self.available_cpus[section_local_numa].extend(cpus_regen)
                self.log.info(self.available_cpus[section_local_numa])
            else:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise IndexError

        for _ in range(num_jobs):
            try:
                allowed_cpus.append(str(self.available_cpus[section_local_numa].pop(0)))
            except IndexError:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise

        return "\n".join(["cpus_allowed=%s" % ",".join(allowed_cpus),
                          "numa_mem_policy=prefer:%s" % section_local_numa])

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no,
                       num_jobs=None, ramp_time=0, run_time=10, rate_iops=0,
                       offset=False, offset_inc=0):
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""

        if self.cpus_allowed is not None:
            self.log.info("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    cpus_num += len(range(a, b + 1))  # "a-b" ranges are inclusive of b
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            self.num_cores = len(self.subsystem_info_list)
            threads = range(0, len(self.subsystem_info_list))

        filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs,
                                                      offset, offset_inc)

        fio_config = fio_conf_template.format(ioengine=self.ioengine, spdk_conf=self.spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, init_connect() function
        if "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log.info("Created FIO Config:")
        self.log.info(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)
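
    # Example of the file name generated above: block_size=4k, io_depth=128,
    # rw=randrw and rwmixread=70 on a 16-core setup yield
    # "4k_128_randrw_m_70_16CPU.fio".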

    def set_cpu_frequency(self):
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit()
        else:
            self.log.warning("WARNING: you have disabled intel_pstate and are using default cpu governance.")

    def run_fio(self, fio_config_file, run_num=1):
        job_name, _ = os.path.splitext(fio_config_file)
        self.log.info("Starting FIO run for job: %s" % job_name)
        self.log.info("Using FIO: %s" % self.fio_bin)

        output_filename = job_name + "_run_" + str(run_num) + "_" + self.name + ".json"
        try:
            output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename, "--eta=never"], True)
            self.log.info(output)
            self.log.info("FIO run finished. Results in: %s" % output_filename)
        except subprocess.CalledProcessError as e:
            self.log.error("ERROR: Fio process failed!")
            self.log.error(e.stdout)

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(self.exec_cmd(["uname", "-r"]))
        self.log.info("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log.info("====sysctl conf:====")
        sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"])
        self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))


class KernelTarget(Target):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = "nvmetcli"

        if "nvmet_bin" in target_config:
            self.nvmet_bin = target_config["nvmet_bin"]

    def load_drivers(self):
        self.log.info("Loading drivers")
        super().load_drivers()
        if self.null_block:
            self.exec_cmd(["sudo", "modprobe", "null_blk", "nr_devices=%s" % self.null_block])

    def configure_adq(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_configure_tc(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_set_busy_read(self, busy_read_val):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
        return {"net.core.busy_read": 0}

    def stop(self):
        self.nvmet_command(self.nvmet_bin, "clear")
        self.restore_settings()

    def get_nvme_device_bdf(self, nvme_dev_path):
        nvme_name = os.path.basename(nvme_dev_path)
        return self.exec_cmd(["cat", "/sys/block/%s/device/address" % nvme_name]).strip()

    def get_nvme_devices(self):
        dev_list = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]).split("\n")
        nvme_list = []
        for dev in dev_list:
            if "nvme" not in dev:
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                nvme_list.append(dev)
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_allowlist:
                nvme_list.append(dev)
        return nvme_list

    def nvmet_command(self, nvmet_bin, command):
        return self.exec_cmd([nvmet_bin, *(command.split(" "))])
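
    # nvmet_command just shells out to the configured binary, so with the
    # default nvmet_bin the calls made in tgt_start below are equivalent to:
    #   nvmetcli clear
    #   nvmetcli restore kernel.conf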

    def kernel_tgt_gen_subsystem_conf(self, nvme_list):
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        for ip, bdev_num in self.spread_bdevs(len(nvme_list)):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = nvme_list[bdev_num]

            nvmet_cfg["subsystems"].append({
                "allowed_hosts": [],
                "attr": {
                    "allow_any_host": "1",
                    "serial": serial,
                    "version": "1.3"
                },
                "namespaces": [
                    {
                        "device": {
                            "path": bdev_name,
                            "uuid": "%s" % uuid.uuid4()
                        },
                        "enable": 1,
                        "nsid": port
                    }
                ],
                "nqn": nqn
            })

            nvmet_cfg["ports"].append({
                "addr": {
                    "adrfam": "ipv4",
                    "traddr": ip,
                    "trsvcid": port,
                    "trtype": self.transport
                },
                "portid": bdev_num,
                "referrals": [],
                "subsystems": [nqn]
            })

            self.subsystem_info_list.append((port, nqn, ip))
        self.subsys_no = len(self.subsystem_info_list)

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        self.log.info("Configuring kernel NVMeOF Target")

        if self.null_block:
            self.log.info("Configuring with null block device.")
            nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
        else:
            self.log.info("Configuring with NVMe drives.")
            nvme_list = self.get_nvme_devices()

        self.kernel_tgt_gen_subsystem_conf(nvme_list)
        self.subsys_no = len(nvme_list)

        self.nvmet_command(self.nvmet_bin, "clear")
        self.nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log.info("Done configuring kernel NVMeOF Target")
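
    # kernel.conf written above is a standard nvmetcli JSON tree: one
    # subsystem per bdev ("nqn.2018-09.io.spdk:cnodeN"), each exposed on its
    # own port (4420 + N) over the configured transport.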


class SPDKTarget(Target):
    def __init__(self, name, general_config, target_config):
        # Required fields
        self.core_mask = target_config["core_mask"]
        self.num_cores = len(self.get_core_list_from_mask(self.core_mask))

        super().__init__(name, general_config, target_config)

        # Defaults
        self.dif_insert_strip = False
        self.null_block_dif_type = 0
        self.num_shared_buffers = 4096
        self.max_queue_depth = 128
        self.bpf_proc = None
        self.bpf_scripts = []
        self.enable_dsa = False
        self.scheduler_core_limit = None
        self.iobuf_small_pool_count = 16383
        self.iobuf_large_pool_count = 2047
        self.num_cqe = 4096

        if "num_shared_buffers" in target_config:
            self.num_shared_buffers = target_config["num_shared_buffers"]
        if "max_queue_depth" in target_config:
            self.max_queue_depth = target_config["max_queue_depth"]
        if "null_block_dif_type" in target_config:
            self.null_block_dif_type = target_config["null_block_dif_type"]
        if "dif_insert_strip" in target_config:
            self.dif_insert_strip = target_config["dif_insert_strip"]
        if "bpf_scripts" in target_config:
            self.bpf_scripts = target_config["bpf_scripts"]
        if "dsa_settings" in target_config:
            self.enable_dsa = target_config["dsa_settings"]
        if "scheduler_core_limit" in target_config:
            self.scheduler_core_limit = target_config["scheduler_core_limit"]
        if "iobuf_small_pool_count" in target_config:
            self.iobuf_small_pool_count = target_config["iobuf_small_pool_count"]
        if "iobuf_large_pool_count" in target_config:
            self.iobuf_large_pool_count = target_config["iobuf_large_pool_count"]
        if "num_cqe" in target_config:
            self.num_cqe = target_config["num_cqe"]

        self.log.info("====DSA settings:====")
        self.log.info("DSA enabled: %s" % (self.enable_dsa))

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
        if mode not in ["default", "bynode", "cpulist",
                        "shared", "split", "split-bynode"]:
            self.log.error("%s irq affinity setting not supported" % mode)
            raise Exception

        # Create core list from SPDK's mask and change it to string.
        # This is the type configure_irq_affinity expects for cpulist parameter.
        spdk_tgt_core_list = self.get_core_list_from_mask(self.core_mask)
        spdk_tgt_core_list = ",".join(map(lambda x: str(x), spdk_tgt_core_list))
        spdk_tgt_core_list = "[" + spdk_tgt_core_list + "]"

        if mode == "shared":
            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list)
        elif mode == "split":
            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
        elif mode == "split-bynode":
            super().configure_irq_affinity(mode="bynode", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
        else:
            super().configure_irq_affinity(mode=mode, cpulist=cpulist, exclude_cpulist=exclude_cpulist)
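
    # In effect, assuming an illustrative core_mask of 0xF: "shared" pins NIC
    # IRQs to the same cores SPDK runs on (0-3), "split" pins them to every
    # other system core, and "split-bynode" does the same but limited to
    # cores on the NIC's NUMA node.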
1249 rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 1250 1251 self.log.info("SPDK Bdevs configuration:") 1252 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1253 1254 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 1255 self.log.info("Adding subsystems to config") 1256 if not req_num_disks: 1257 req_num_disks = self.get_nvme_devices_count() 1258 1259 for ip, bdev_num in self.spread_bdevs(req_num_disks): 1260 port = str(4420 + bdev_num) 1261 nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num 1262 serial = "SPDK00%s" % bdev_num 1263 bdev_name = "Nvme%sn1" % bdev_num 1264 1265 rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial, 1266 allow_any_host=True, max_namespaces=8) 1267 rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name) 1268 for nqn_name in [nqn, "discovery"]: 1269 rpc.nvmf.nvmf_subsystem_add_listener(self.client, 1270 nqn=nqn_name, 1271 trtype=self.transport, 1272 traddr=ip, 1273 trsvcid=port, 1274 adrfam="ipv4") 1275 self.subsystem_info_list.append((port, nqn, ip)) 1276 self.subsys_no = len(self.subsystem_info_list) 1277 1278 self.log.info("SPDK NVMeOF subsystem configuration:") 1279 rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client)) 1280 1281 def bpf_start(self): 1282 self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts) 1283 bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh") 1284 bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts] 1285 results_path = os.path.join(self.results_dir, "bpf_traces.txt") 1286 1287 with open(self.pid, "r") as fh: 1288 nvmf_pid = str(fh.readline()) 1289 1290 cmd = [bpf_script, nvmf_pid, *bpf_traces] 1291 self.log.info(cmd) 1292 self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path}) 1293 1294 def tgt_start(self): 1295 if self.null_block: 1296 self.subsys_no = 1 1297 else: 1298 self.subsys_no = self.get_nvme_devices_count() 1299 self.log.info("Starting SPDK NVMeOF Target process") 1300 nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt") 1301 proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask]) 1302 self.pid = os.path.join(self.spdk_dir, "nvmf.pid") 1303 1304 with open(self.pid, "w") as fh: 1305 fh.write(str(proc.pid)) 1306 self.nvmf_proc = proc 1307 self.log.info("SPDK NVMeOF Target PID=%s" % self.pid) 1308 self.log.info("Waiting for spdk to initialize...") 1309 while True: 1310 if os.path.exists("/var/tmp/spdk.sock"): 1311 break 1312 time.sleep(1) 1313 self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock") 1314 1315 rpc.sock.sock_set_default_impl(self.client, impl_name="posix") 1316 rpc.iobuf.iobuf_set_options(self.client, 1317 small_pool_count=self.iobuf_small_pool_count, 1318 large_pool_count=self.iobuf_large_pool_count, 1319 small_bufsize=None, 1320 large_bufsize=None) 1321 1322 if self.enable_zcopy: 1323 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", 1324 enable_zerocopy_send_server=True) 1325 self.log.info("Target socket options:") 1326 rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix")) 1327 1328 if self.enable_adq: 1329 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1) 1330 rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None, 1331 nvme_adminq_poll_period_us=100000, retry_count=4) 1332 1333 if self.enable_dsa: 1334 rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None) 1335 
self.log.info("Target DSA accel module enabled") 1336 1337 rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit) 1338 rpc.framework_start_init(self.client) 1339 1340 if self.bpf_scripts: 1341 self.bpf_start() 1342 1343 self.spdk_tgt_configure() 1344 1345 def stop(self): 1346 if self.bpf_proc: 1347 self.log.info("Stopping BPF Trace script") 1348 self.bpf_proc.terminate() 1349 self.bpf_proc.wait() 1350 1351 if hasattr(self, "nvmf_proc"): 1352 try: 1353 self.nvmf_proc.terminate() 1354 self.nvmf_proc.wait(timeout=30) 1355 except Exception as e: 1356 self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.") 1357 self.log.info(e) 1358 self.nvmf_proc.kill() 1359 self.nvmf_proc.communicate() 1360 # Try to clean up RPC socket files if they were not removed 1361 # because of using 'kill' 1362 try: 1363 os.remove("/var/tmp/spdk.sock") 1364 os.remove("/var/tmp/spdk.sock.lock") 1365 except FileNotFoundError: 1366 pass 1367 self.restore_settings() 1368 1369 1370class KernelInitiator(Initiator): 1371 def __init__(self, name, general_config, initiator_config): 1372 super().__init__(name, general_config, initiator_config) 1373 1374 # Defaults 1375 self.extra_params = "" 1376 self.ioengine = "libaio" 1377 self.spdk_conf = "" 1378 1379 if "num_cores" in initiator_config: 1380 self.num_cores = initiator_config["num_cores"] 1381 1382 if "extra_params" in initiator_config: 1383 self.extra_params = initiator_config["extra_params"] 1384 1385 if "kernel_engine" in initiator_config: 1386 self.ioengine = initiator_config["kernel_engine"] 1387 if "io_uring" in self.ioengine: 1388 self.extra_params += ' --nr-poll-queues=8' 1389 1390 def configure_adq(self): 1391 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1392 1393 def adq_configure_tc(self): 1394 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1395 1396 def adq_set_busy_read(self, busy_read_val): 1397 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0") 1398 return {"net.core.busy_read": 0} 1399 1400 def get_connected_nvme_list(self): 1401 json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"])) 1402 nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"] 1403 if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]] 1404 return nvme_list 1405 1406 def init_connect(self): 1407 self.log.info("Below connection attempts may result in error messages, this is expected!") 1408 for subsystem in self.subsystem_info_list: 1409 self.log.info("Trying to connect %s %s %s" % subsystem) 1410 self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport, 1411 "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params]) 1412 time.sleep(2) 1413 1414 if "io_uring" in self.ioengine: 1415 self.log.info("Setting block layer settings for io_uring.") 1416 1417 # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because 1418 # apparently it's not possible for connected subsystems. 
1419 # Results in "error: Invalid argument" 1420 block_sysfs_settings = { 1421 "iostats": "0", 1422 "rq_affinity": "0", 1423 "nomerges": "2" 1424 } 1425 1426 for disk in self.get_connected_nvme_list(): 1427 sysfs = os.path.join("/sys/block", disk, "queue") 1428 for k, v in block_sysfs_settings.items(): 1429 sysfs_opt_path = os.path.join(sysfs, k) 1430 try: 1431 self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True) 1432 except CalledProcessError as e: 1433 self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v)) 1434 finally: 1435 _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)]) 1436 self.log.info("%s=%s" % (sysfs_opt_path, _)) 1437 1438 def init_disconnect(self): 1439 for subsystem in self.subsystem_info_list: 1440 self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]]) 1441 time.sleep(1) 1442 1443 def get_nvme_subsystem_numa(self, dev_name): 1444 # Remove two last characters to get controller name instead of subsystem name 1445 nvme_ctrl = os.path.basename(dev_name)[:-2] 1446 remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', 1447 self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl])) 1448 return self.get_route_nic_numa(remote_nvme_ip.group(0)) 1449 1450 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0): 1451 self.available_cpus = self.get_numa_cpu_map() 1452 if len(threads) >= len(subsystems): 1453 threads = range(0, len(subsystems)) 1454 1455 # Generate connected nvme devices names and sort them by used NIC numa node 1456 # to allow better grouping when splitting into fio sections. 1457 nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()] 1458 nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list] 1459 nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))] 1460 1461 filename_section = "" 1462 nvme_per_split = int(len(nvme_list) / len(threads)) 1463 remainder = len(nvme_list) % len(threads) 1464 iterator = iter(nvme_list) 1465 result = [] 1466 for i in range(len(threads)): 1467 result.append([]) 1468 for _ in range(nvme_per_split): 1469 result[i].append(next(iterator)) 1470 if remainder: 1471 result[i].append(next(iterator)) 1472 remainder -= 1 1473 for i, r in enumerate(result): 1474 header = "[filename%s]" % i 1475 disks = "\n".join(["filename=%s" % x for x in r]) 1476 job_section_qd = round((io_depth * len(r)) / num_jobs) 1477 if job_section_qd == 0: 1478 job_section_qd = 1 1479 iodepth = "iodepth=%s" % job_section_qd 1480 1481 offset_section = "" 1482 if offset: 1483 offset_section = self.gen_fio_offset_section(offset_inc, num_jobs) 1484 1485 numa_opts = self.gen_fio_numa_section(r, num_jobs) 1486 1487 filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""]) 1488 1489 return filename_section 1490 1491 1492class SPDKInitiator(Initiator): 1493 def __init__(self, name, general_config, initiator_config): 1494 super().__init__(name, general_config, initiator_config) 1495 1496 if self.skip_spdk_install is False: 1497 self.install_spdk() 1498 1499 # Optional fields 1500 self.enable_data_digest = False 1501 if "enable_data_digest" in initiator_config: 1502 self.enable_data_digest = initiator_config["enable_data_digest"] 1503 if "num_cores" in initiator_config: 1504 self.num_cores = initiator_config["num_cores"] 1505 1506 self.ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir 1507 self.spdk_conf = 
"spdk_json_conf=%s/bdev.conf" % self.spdk_dir 1508 1509 def adq_set_busy_read(self, busy_read_val): 1510 return {"net.core.busy_read": busy_read_val} 1511 1512 def install_spdk(self): 1513 self.log.info("Using fio binary %s" % self.fio_bin) 1514 self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"]) 1515 self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"]) 1516 self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", 1517 "--with-fio=%s" % os.path.dirname(self.fio_bin), 1518 "--enable-lto", "--disable-unit-tests"]) 1519 self.exec_cmd(["make", "-C", self.spdk_dir, "clean"]) 1520 self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"]) 1521 1522 self.log.info("SPDK built") 1523 self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir]) 1524 1525 def init_connect(self): 1526 # Not a real "connect" like when doing "nvme connect" because SPDK's fio 1527 # bdev plugin initiates connection just before starting IO traffic. 1528 # This is just to have a "init_connect" equivalent of the same function 1529 # from KernelInitiator class. 1530 # Just prepare bdev.conf JSON file for later use and consider it 1531 # "making a connection". 1532 bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list) 1533 self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir]) 1534 1535 def init_disconnect(self): 1536 # SPDK Initiator does not need to explicity disconnect as this gets done 1537 # after fio bdev plugin finishes IO. 1538 pass 1539 1540 def gen_spdk_bdev_conf(self, remote_subsystem_list): 1541 bdev_cfg_section = { 1542 "subsystems": [ 1543 { 1544 "subsystem": "bdev", 1545 "config": [] 1546 } 1547 ] 1548 } 1549 1550 for i, subsys in enumerate(remote_subsystem_list): 1551 sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys) 1552 nvme_ctrl = { 1553 "method": "bdev_nvme_attach_controller", 1554 "params": { 1555 "name": "Nvme{}".format(i), 1556 "trtype": self.transport, 1557 "traddr": sub_addr, 1558 "trsvcid": sub_port, 1559 "subnqn": sub_nqn, 1560 "adrfam": "IPv4" 1561 } 1562 } 1563 1564 if self.enable_adq: 1565 nvme_ctrl["params"].update({"priority": "1"}) 1566 1567 if self.enable_data_digest: 1568 nvme_ctrl["params"].update({"ddgst": self.enable_data_digest}) 1569 1570 bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl) 1571 1572 return json.dumps(bdev_cfg_section, indent=2) 1573 1574 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0): 1575 self.available_cpus = self.get_numa_cpu_map() 1576 filename_section = "" 1577 if len(threads) >= len(subsystems): 1578 threads = range(0, len(subsystems)) 1579 1580 # Generate expected NVMe Bdev names and sort them by used NIC numa node 1581 # to allow better grouping when splitting into fio sections. 

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
        self.available_cpus = self.get_numa_cpu_map()
        filename_section = ""
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        # Generate the expected NVMe bdev names and sort them by the NUMA node
        # of the NIC they are reached through, to allow better grouping when
        # splitting into fio sections.
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames]
        filenames = [x for _, x in sorted(zip(filename_numas, filenames))]

        nvme_per_split = len(subsystems) // len(threads)
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        # Distribute bdevs evenly across the sections, spreading any remainder
        # one bdev at a time over the first sections.
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = self.gen_fio_numa_section(r, num_jobs)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section

    def get_nvme_subsystem_numa(self, bdev_name):
        bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir]))
        bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"]

        # Remove the last two characters ("n1" namespace suffix) to get the
        # controller name instead of the bdev name
        nvme_ctrl = bdev_name[:-2]
        remote_nvme_ip = list(filter(lambda x: x["params"]["name"] == nvme_ctrl, bdev_conf_json_obj))[0]["params"]["traddr"]
        return self.get_route_nic_numa(remote_nvme_ip)
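
# Illustrative example with assumed numbers: SPDKInitiator.gen_fio_filename_conf()
# called with 2 subsystems, 1 thread, io_depth=128 and num_jobs=1 yields a
# section similar to:
#   [filename0]
#   filename=Nvme0n1
#   filename=Nvme1n1
#   iodepth=256
# i.e. the section queue depth scales with the number of bdevs in the section
# and is divided across the cloned fio jobs.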


if __name__ == "__main__":
    script_full_dir = os.path.dirname(os.path.realpath(__file__))
    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
                        help='Configuration file.')
    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
                        help='Results directory.')
    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
                        help='CSV results filename.')
    parser.add_argument('-f', '--force', default=False, action='store_true',
                        dest='force', help="""Force script to continue and try to use all
                        available NVMe devices during test.
                        WARNING: Might result in data loss on used NVMe drives""")

    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO,
                        format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s')

    logging.info("Using config file: %s" % args.config)
    with open(args.config, "r") as config:
        data = json.load(config)

    initiators = []
    fio_cases = []

    general_config = data["general"]
    target_config = data["target"]
    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]

    if "null_block_devices" not in data["target"] and \
            (args.force is False and
             "allowlist" not in data["target"] and
             "blocklist" not in data["target"]):
        # TODO: Also check if allowlist or blocklist are not empty.
        logging.warning("""WARNING: This script requires an allowlist or a blocklist to be defined.
        You can choose to use all available NVMe drives on your system, which may potentially
        lead to data loss. If you wish to proceed with all attached NVMes, use the "-f" option.""")
        exit(1)

    for k, v in data.items():
        if "target" in k:
            v.update({"results_dir": args.results})
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(k, data["general"], v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(k, data["general"], v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            # Default to a single run; fio_run_num is used as a range() bound
            # later, so it must not be None.
            fio_run_num = data[k].get("run_num", 1)
            fio_num_jobs = data[k].get("num_jobs")

            fio_rate_iops = 0
            if "rate_iops" in data[k]:
                fio_rate_iops = data[k]["rate_iops"]

            fio_offset = False
            if "offset" in data[k]:
                fio_offset = data[k]["offset"]
            fio_offset_inc = 0
            if "offset_inc" in data[k]:
                fio_offset_inc = data[k]["offset_inc"]
        else:
            continue

    try:
        os.mkdir(args.results)
    except FileExistsError:
        pass

    for i in initiators:
        target_obj.initiator_info.append(
            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
        )
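
    # For reference, a minimal "fio" section of the config.json consumed by
    # the parsing loop above might look like this (all values hypothetical):
    #   "fio": {
    #       "bs": ["4k"],
    #       "qd": [128],
    #       "rw": ["randread"],
    #       "rwmixread": 100,
    #       "ramp_time": 30,
    #       "run_time": 600,
    #       "run_num": 3,
    #       "num_jobs": 1
    #   }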
    # TODO: This try block is definitely too large. Need to break it up into
    # separate logical blocks to reduce size.
    try:
        target_obj.tgt_start()

        for i in initiators:
            i.match_subsystems(target_obj.subsystem_info_list)
            if i.enable_adq:
                i.adq_configure_tc()

        # Poor man's threading
        # Run FIO tests
        for block_size, io_depth, rw in fio_workloads:
            configs = []
            for i in initiators:
                i.init_connect()
                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops,
                                       fio_offset, fio_offset_inc)
                configs.append(cfg)

            for run_no in range(1, fio_run_num+1):
                threads = []
                power_daemon = None
                measurements_prefix = "%s_%s_%s_m_%s_run_%s" % (block_size, io_depth, rw, fio_rw_mix_read, run_no)

                for i, cfg in zip(initiators, configs):
                    t = threading.Thread(target=i.run_fio, args=(cfg, run_no))
                    threads.append(t)
                if target_obj.enable_sar:
                    sar_file_prefix = measurements_prefix + "_sar"
                    t = threading.Thread(target=target_obj.measure_sar,
                                         args=(args.results, sar_file_prefix, fio_ramp_time, fio_run_time))
                    threads.append(t)

                if target_obj.enable_pcm:
                    pcm_fnames = ["%s_%s.csv" % (measurements_prefix, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]

                    pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm,
                                                 args=(args.results, pcm_fnames[0], fio_ramp_time, fio_run_time))
                    pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory,
                                                 args=(args.results, pcm_fnames[1], fio_ramp_time, fio_run_time))
                    pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power,
                                                 args=(args.results, pcm_fnames[2], fio_ramp_time, fio_run_time))

                    threads.append(pcm_cpu_t)
                    threads.append(pcm_mem_t)
                    threads.append(pcm_pow_t)

                if target_obj.enable_bw:
                    bandwidth_file_name = measurements_prefix + "_bandwidth.csv"
                    t = threading.Thread(target=target_obj.measure_network_bandwidth,
                                         args=(args.results, bandwidth_file_name, fio_ramp_time, fio_run_time))
                    threads.append(t)

                if target_obj.enable_dpdk_memory:
                    dpdk_mem_file_name = measurements_prefix + "_dpdk_mem.txt"
                    t = threading.Thread(target=target_obj.measure_dpdk_memory,
                                         args=(args.results, dpdk_mem_file_name, fio_ramp_time))
                    threads.append(t)

                if target_obj.enable_pm:
                    power_daemon = threading.Thread(target=target_obj.measure_power,
                                                    args=(args.results, measurements_prefix, script_full_dir,
                                                          fio_ramp_time, fio_run_time))
                    threads.append(power_daemon)

                for t in threads:
                    t.start()
                for t in threads:
                    t.join()

            for i in initiators:
                i.init_disconnect()
                i.copy_result_files(args.results)
        try:
            parse_results(args.results, args.csv_filename)
        except Exception:
            logging.error("There was an error while parsing the results")
    finally:
        for i in initiators:
            try:
                i.stop()
            except Exception as err:
                logging.error("Failed to stop initiator %s: %s" % (i.name, err))
        target_obj.stop()
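
# Example invocation (the script and file names below are illustrative, not
# taken from this repository):
#   python3 nvmf_perf.py -c ./config.json -r /tmp/results -s nvmf_results.csv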