1#!/usr/bin/env python3 2# SPDX-License-Identifier: BSD-3-Clause 3# Copyright (C) 2018 Intel Corporation 4# All rights reserved. 5# 6 7import os 8import re 9import sys 10import argparse 11import json 12import logging 13import zipfile 14import threading 15import subprocess 16import itertools 17import configparser 18import time 19import uuid 20 21import paramiko 22import pandas as pd 23from common import * 24from subprocess import CalledProcessError 25 26sys.path.append(os.path.dirname(__file__) + '/../../../python') 27 28import spdk.rpc as rpc # noqa 29import spdk.rpc.client as rpc_client # noqa 30 31 32class Server: 33 def __init__(self, name, general_config, server_config): 34 self.name = name 35 self.username = general_config["username"] 36 self.password = general_config["password"] 37 self.transport = general_config["transport"].lower() 38 self.nic_ips = server_config["nic_ips"] 39 self.mode = server_config["mode"] 40 41 self.log = logging.getLogger(self.name) 42 43 self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts" 44 if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]: 45 self.irq_scripts_dir = server_config["irq_scripts_dir"] 46 47 self.local_nic_info = [] 48 self._nics_json_obj = {} 49 self.svc_restore_dict = {} 50 self.sysctl_restore_dict = {} 51 self.tuned_restore_dict = {} 52 self.governor_restore = "" 53 self.tuned_profile = "" 54 55 self.enable_adq = False 56 self.adq_priority = None 57 self.irq_settings = {"mode": "default"} 58 59 if "adq_enable" in server_config and server_config["adq_enable"]: 60 self.enable_adq = server_config["adq_enable"] 61 self.adq_priority = 1 62 if "irq_settings" in server_config: 63 self.irq_settings.update(server_config["irq_settings"]) 64 if "tuned_profile" in server_config: 65 self.tuned_profile = server_config["tuned_profile"] 66 67 if not re.match(r'^[A-Za-z0-9\-]+$', name): 68 self.log.info("Please use a name which contains only letters, numbers or dashes") 69 sys.exit(1) 70 71 @staticmethod 72 def get_uncommented_lines(lines): 73 return [line for line in lines if line and not line.startswith('#')] 74 75 def get_nic_name_by_ip(self, ip): 76 if not self._nics_json_obj: 77 nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"]) 78 self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj))) 79 for nic in self._nics_json_obj: 80 for addr in nic["addr_info"]: 81 if ip in addr["local"]: 82 return nic["ifname"] 83 84 def set_local_nic_info_helper(self): 85 pass 86 87 def set_local_nic_info(self, pci_info): 88 def extract_network_elements(json_obj): 89 nic_list = [] 90 if isinstance(json_obj, list): 91 for x in json_obj: 92 nic_list.extend(extract_network_elements(x)) 93 elif isinstance(json_obj, dict): 94 if "children" in json_obj: 95 nic_list.extend(extract_network_elements(json_obj["children"])) 96 if "class" in json_obj.keys() and "network" in json_obj["class"]: 97 nic_list.append(json_obj) 98 return nic_list 99 100 self.local_nic_info = extract_network_elements(pci_info) 101 102 def get_nic_numa_node(self, nic_name): 103 return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name])) 104 105 def get_numa_cpu_map(self): 106 numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"])) 107 numa_cpu_json_map = {} 108 109 for cpu in numa_cpu_json_obj["cpus"]: 110 cpu_num = int(cpu["cpu"]) 111 numa_node = int(cpu["node"]) 112 numa_cpu_json_map.setdefault(numa_node, []) 113 numa_cpu_json_map[numa_node].append(cpu_num) 114 115 return numa_cpu_json_map 116 117 # pylint: disable=R0201 118 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 119 return "" 120 121 def configure_system(self): 122 self.load_drivers() 123 self.configure_services() 124 self.configure_sysctl() 125 self.configure_tuned() 126 self.configure_cpu_governor() 127 self.configure_irq_affinity(**self.irq_settings) 128 129 def load_drivers(self): 130 self.log.info("Loading drivers") 131 self.exec_cmd(["sudo", "modprobe", "-a", 132 "nvme-%s" % self.transport, 133 "nvmet-%s" % self.transport]) 134 135 def configure_adq(self): 136 self.adq_load_modules() 137 self.adq_configure_nic() 138 139 def adq_load_modules(self): 140 self.log.info("Modprobing ADQ-related Linux modules...") 141 adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"] 142 for module in adq_module_deps: 143 try: 144 self.exec_cmd(["sudo", "modprobe", module]) 145 self.log.info("%s loaded!" % module) 146 except CalledProcessError as e: 147 self.log.error("ERROR: failed to load module %s" % module) 148 self.log.error("%s resulted in error: %s" % (e.cmd, e.output)) 149 150 def adq_configure_tc(self): 151 self.log.info("Configuring ADQ Traffic classes and filters...") 152 153 num_queues_tc0 = 2 # 2 is minimum number of queues for TC0 154 num_queues_tc1 = self.num_cores 155 port_param = "dst_port" if isinstance(self, Target) else "src_port" 156 xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs") 157 158 for nic_ip in self.nic_ips: 159 nic_name = self.get_nic_name_by_ip(nic_ip) 160 nic_ports = [x[0] for x in self.subsystem_info_list if nic_ip in x[2]] 161 162 tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, 163 "root", "mqprio", "num_tc", "2", "map", "0", "1", 164 "queues", "%s@0" % num_queues_tc0, 165 "%s@%s" % (num_queues_tc1, num_queues_tc0), 166 "hw", "1", "mode", "channel"] 167 self.log.info(" ".join(tc_qdisc_map_cmd)) 168 self.exec_cmd(tc_qdisc_map_cmd) 169 170 time.sleep(5) 171 tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"] 172 self.log.info(" ".join(tc_qdisc_ingress_cmd)) 173 self.exec_cmd(tc_qdisc_ingress_cmd) 174 175 nic_bdf = os.path.basename(self.exec_cmd(["readlink", "-f", "/sys/class/net/%s/device" % nic_name]).strip()) 176 self.exec_cmd(["sudo", "devlink", "dev", "param", "set", "pci/%s" % nic_bdf, 177 "name", "tc1_inline_fd", "value", "true", "cmode", "runtime"]) 178 179 for port in nic_ports: 180 tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name, 181 "protocol", "ip", "ingress", "prio", "1", "flower", 182 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port, 183 "skip_sw", "hw_tc", "1"] 184 self.log.info(" ".join(tc_filter_cmd)) 185 self.exec_cmd(tc_filter_cmd) 186 187 # show tc configuration 188 self.log.info("Show tc configuration for %s NIC..." % nic_name) 189 tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name]) 190 tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"]) 191 self.log.info("%s" % tc_disk_out) 192 self.log.info("%s" % tc_filter_out) 193 194 # Ethtool coalesce settings must be applied after configuring traffic classes 195 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"]) 196 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"]) 197 198 self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name) 199 xps_cmd = ["sudo", xps_script_path, nic_name] 200 self.log.info(xps_cmd) 201 self.exec_cmd(xps_cmd) 202 203 def reload_driver(self, driver): 204 try: 205 self.exec_cmd(["sudo", "rmmod", driver]) 206 self.exec_cmd(["sudo", "modprobe", driver]) 207 except CalledProcessError as e: 208 self.log.error("ERROR: failed to reload %s module!" % driver) 209 self.log.error("%s resulted in error: %s" % (e.cmd, e.output)) 210 211 def adq_configure_nic(self): 212 self.log.info("Configuring NIC port settings for ADQ testing...") 213 214 # Reload the driver first, to make sure any previous settings are re-set. 215 self.reload_driver("ice") 216 217 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 218 for nic in nic_names: 219 self.log.info(nic) 220 try: 221 self.exec_cmd(["sudo", "ethtool", "-K", nic, 222 "hw-tc-offload", "on"]) # Enable hardware TC offload 223 nic_bdf = self.exec_cmd(["bash", "-c", "source /sys/class/net/%s/device/uevent; echo $PCI_SLOT_NAME" % nic]) 224 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"]) # Disable LLDP 225 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-inspect-optimize", "off"]) 226 # Following are suggested in ADQ Configuration Guide to be enabled for large block sizes, 227 # but can be used with 4k block sizes as well 228 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop", "on"]) 229 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop-cfg", "on"]) 230 except subprocess.CalledProcessError as e: 231 self.log.error("ERROR: failed to configure NIC port using ethtool!") 232 self.log.error("%s resulted in error: %s" % (e.cmd, e.output)) 233 self.log.info("Please update your NIC driver and firmware versions and try again.") 234 self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic])) 235 self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic])) 236 237 def configure_services(self): 238 self.log.info("Configuring active services...") 239 svc_config = configparser.ConfigParser(strict=False) 240 241 # Below list is valid only for RHEL / Fedora systems and might not 242 # contain valid names for other distributions. 243 svc_target_state = { 244 "firewalld": "inactive", 245 "irqbalance": "inactive", 246 "lldpad.service": "inactive", 247 "lldpad.socket": "inactive" 248 } 249 250 for service in svc_target_state: 251 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 252 out = "\n".join(["[%s]" % service, out]) 253 svc_config.read_string(out) 254 255 if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]: 256 continue 257 258 service_state = svc_config[service]["ActiveState"] 259 self.log.info("Current state of %s service is %s" % (service, service_state)) 260 self.svc_restore_dict.update({service: service_state}) 261 if service_state != "inactive": 262 self.log.info("Disabling %s. It will be restored after the test has finished." % service) 263 self.exec_cmd(["sudo", "systemctl", "stop", service]) 264 265 def configure_sysctl(self): 266 self.log.info("Tuning sysctl settings...") 267 268 busy_read = 0 269 busy_poll = 0 270 if self.enable_adq and self.mode == "spdk": 271 busy_read = 1 272 busy_poll = 1 273 274 sysctl_opts = { 275 "net.core.busy_poll": busy_poll, 276 "net.core.busy_read": busy_read, 277 "net.core.somaxconn": 4096, 278 "net.core.netdev_max_backlog": 8192, 279 "net.ipv4.tcp_max_syn_backlog": 16384, 280 "net.core.rmem_max": 268435456, 281 "net.core.wmem_max": 268435456, 282 "net.ipv4.tcp_mem": "268435456 268435456 268435456", 283 "net.ipv4.tcp_rmem": "8192 1048576 33554432", 284 "net.ipv4.tcp_wmem": "8192 1048576 33554432", 285 "net.ipv4.route.flush": 1, 286 "vm.overcommit_memory": 1, 287 } 288 289 if self.enable_adq: 290 sysctl_opts.update(self.adq_set_busy_read(1)) 291 292 for opt, value in sysctl_opts.items(): 293 self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()}) 294 self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 295 296 def configure_tuned(self): 297 if not self.tuned_profile: 298 self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.") 299 return 300 301 self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile) 302 service = "tuned" 303 tuned_config = configparser.ConfigParser(strict=False) 304 305 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 306 out = "\n".join(["[%s]" % service, out]) 307 tuned_config.read_string(out) 308 tuned_state = tuned_config[service]["ActiveState"] 309 self.svc_restore_dict.update({service: tuned_state}) 310 311 if tuned_state != "inactive": 312 profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip() 313 profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip() 314 315 self.tuned_restore_dict = { 316 "profile": profile, 317 "mode": profile_mode 318 } 319 320 self.exec_cmd(["sudo", "systemctl", "start", service]) 321 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile]) 322 self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"])) 323 324 def configure_cpu_governor(self): 325 self.log.info("Setting CPU governor to performance...") 326 327 # This assumes that there is the same CPU scaling governor on each CPU 328 self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip() 329 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"]) 330 331 def get_core_list_from_mask(self, core_mask): 332 # Generate list of individual cores from hex core mask 333 # (e.g. '0xffff') or list containing: 334 # - individual cores (e.g. '1, 2, 3') 335 # - core ranges (e.g. '0-3') 336 # - mix of both (e.g. '0, 1-4, 9, 11-13') 337 338 core_list = [] 339 if "0x" in core_mask: 340 core_mask_int = int(core_mask, 16) 341 for i in range(core_mask_int.bit_length()): 342 if (1 << i) & core_mask_int: 343 core_list.append(i) 344 return core_list 345 else: 346 # Core list can be provided in .json config with square brackets 347 # remove them first 348 core_mask = core_mask.replace("[", "") 349 core_mask = core_mask.replace("]", "") 350 351 for i in core_mask.split(","): 352 if "-" in i: 353 start, end = i.split("-") 354 core_range = range(int(start), int(end) + 1) 355 core_list.extend(core_range) 356 else: 357 core_list.append(int(i)) 358 return core_list 359 360 def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False): 361 self.log.info("Setting NIC irq affinity for NICs. Using %s mode" % mode) 362 363 if mode not in ["default", "bynode", "cpulist"]: 364 raise ValueError("%s irq affinity setting not supported" % mode) 365 366 if mode == "cpulist" and not cpulist: 367 raise ValueError("%s irq affinity setting set, but no cpulist provided" % mode) 368 369 affinity_script = "set_irq_affinity.sh" 370 if "default" not in mode: 371 affinity_script = "set_irq_affinity_cpulist.sh" 372 system_cpu_map = self.get_numa_cpu_map() 373 irq_script_path = os.path.join(self.irq_scripts_dir, affinity_script) 374 375 def cpu_list_to_string(cpulist): 376 return ",".join(map(lambda x: str(x), cpulist)) 377 378 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 379 for nic_name in nic_names: 380 irq_cmd = ["sudo", irq_script_path] 381 382 # Use only CPU cores matching NIC NUMA node. 383 # Remove any CPU cores if they're on exclusion list. 384 if mode == "bynode": 385 irq_cpus = system_cpu_map[self.get_nic_numa_node(nic_name)] 386 if cpulist and exclude_cpulist: 387 disallowed_cpus = self.get_core_list_from_mask(cpulist) 388 irq_cpus = list(set(irq_cpus) - set(disallowed_cpus)) 389 if not irq_cpus: 390 raise Exception("No CPUs left to process IRQs after excluding CPUs!") 391 irq_cmd.append(cpu_list_to_string(irq_cpus)) 392 393 if mode == "cpulist": 394 irq_cpus = self.get_core_list_from_mask(cpulist) 395 if exclude_cpulist: 396 # Flatten system CPU list, we don't need NUMA awareness here 397 system_cpu_list = sorted({x for v in system_cpu_map.values() for x in v}) 398 irq_cpus = list(set(system_cpu_list) - set(irq_cpus)) 399 if not irq_cpus: 400 raise Exception("No CPUs left to process IRQs after excluding CPUs!") 401 irq_cmd.append(cpu_list_to_string(irq_cpus)) 402 403 irq_cmd.append(nic_name) 404 self.log.info(irq_cmd) 405 self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir) 406 407 def restore_services(self): 408 self.log.info("Restoring services...") 409 for service, state in self.svc_restore_dict.items(): 410 cmd = "stop" if state == "inactive" else "start" 411 self.exec_cmd(["sudo", "systemctl", cmd, service]) 412 413 def restore_sysctl(self): 414 self.log.info("Restoring sysctl settings...") 415 for opt, value in self.sysctl_restore_dict.items(): 416 self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 417 418 def restore_tuned(self): 419 self.log.info("Restoring tuned-adm settings...") 420 421 if not self.tuned_restore_dict: 422 return 423 424 if self.tuned_restore_dict["mode"] == "auto": 425 self.exec_cmd(["sudo", "tuned-adm", "auto_profile"]) 426 self.log.info("Reverted tuned-adm to auto_profile.") 427 else: 428 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]]) 429 self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"]) 430 431 def restore_governor(self): 432 self.log.info("Restoring CPU governor setting...") 433 if self.governor_restore: 434 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore]) 435 self.log.info("Reverted CPU governor to %s." % self.governor_restore) 436 437 def restore_settings(self): 438 self.restore_governor() 439 self.restore_tuned() 440 self.restore_services() 441 self.restore_sysctl() 442 if self.enable_adq: 443 self.reload_driver("ice") 444 445 446class Target(Server): 447 def __init__(self, name, general_config, target_config): 448 super().__init__(name, general_config, target_config) 449 450 # Defaults 451 self.enable_zcopy = False 452 self.scheduler_name = "static" 453 self.null_block = 0 454 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 455 self.subsystem_info_list = [] 456 self.initiator_info = [] 457 self.nvme_allowlist = [] 458 self.nvme_blocklist = [] 459 460 # Target-side measurement options 461 self.enable_pm = True 462 self.enable_sar = True 463 self.enable_pcm = True 464 self.enable_bw = True 465 self.enable_dpdk_memory = True 466 467 if "null_block_devices" in target_config: 468 self.null_block = target_config["null_block_devices"] 469 if "scheduler_settings" in target_config: 470 self.scheduler_name = target_config["scheduler_settings"] 471 if "zcopy_settings" in target_config: 472 self.enable_zcopy = target_config["zcopy_settings"] 473 if "results_dir" in target_config: 474 self.results_dir = target_config["results_dir"] 475 if "blocklist" in target_config: 476 self.nvme_blocklist = target_config["blocklist"] 477 if "allowlist" in target_config: 478 self.nvme_allowlist = target_config["allowlist"] 479 # Blocklist takes precedence, remove common elements from allowlist 480 self.nvme_allowlist = list(set(self.nvme_allowlist) - set(self.nvme_blocklist)) 481 if "enable_pm" in target_config: 482 self.enable_pm = target_config["enable_pm"] 483 if "enable_sar" in target_config: 484 self.enable_sar = target_config["enable_sar"] 485 if "enable_pcm" in target_config: 486 self.enable_pcm = target_config["enable_pcm"] 487 if "enable_bandwidth" in target_config: 488 self.enable_bw = target_config["enable_bandwidth"] 489 if "enable_dpdk_memory" in target_config: 490 self.enable_dpdk_memory = target_config["enable_dpdk_memory"] 491 492 self.log.info("Items now on allowlist: %s" % self.nvme_allowlist) 493 self.log.info("Items now on blocklist: %s" % self.nvme_blocklist) 494 495 self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) 496 self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../")) 497 self.set_local_nic_info(self.set_local_nic_info_helper()) 498 499 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 500 self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip") 501 502 self.configure_system() 503 if self.enable_adq: 504 self.configure_adq() 505 self.configure_irq_affinity() 506 self.sys_config() 507 508 def set_local_nic_info_helper(self): 509 return json.loads(self.exec_cmd(["lshw", "-json"])) 510 511 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 512 stderr_opt = None 513 if stderr_redirect: 514 stderr_opt = subprocess.STDOUT 515 if change_dir: 516 old_cwd = os.getcwd() 517 os.chdir(change_dir) 518 self.log.info("Changing directory to %s" % change_dir) 519 520 out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8") 521 522 if change_dir: 523 os.chdir(old_cwd) 524 self.log.info("Changing directory to %s" % old_cwd) 525 return out 526 527 def zip_spdk_sources(self, spdk_dir, dest_file): 528 self.log.info("Zipping SPDK source directory") 529 fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) 530 for root, _directories, files in os.walk(spdk_dir, followlinks=True): 531 for file in files: 532 fh.write(os.path.relpath(os.path.join(root, file))) 533 fh.close() 534 self.log.info("Done zipping") 535 536 @staticmethod 537 def _chunks(input_list, chunks_no): 538 div, rem = divmod(len(input_list), chunks_no) 539 for i in range(chunks_no): 540 si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem) 541 yield input_list[si:si + (div + 1 if i < rem else div)] 542 543 def spread_bdevs(self, req_disks): 544 # Spread available block devices indexes: 545 # - evenly across available initiator systems 546 # - evenly across available NIC interfaces for 547 # each initiator 548 # Not NUMA aware. 549 ip_bdev_map = [] 550 initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info)) 551 552 for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)): 553 self.initiator_info[i]["bdev_range"] = init_chunk 554 init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"]))) 555 for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list): 556 for c in nic_chunk: 557 ip_bdev_map.append((ip, c)) 558 return ip_bdev_map 559 560 def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time): 561 cpu_number = os.cpu_count() 562 sar_idle_sum = 0 563 sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt") 564 sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"])) 565 566 self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time) 567 time.sleep(ramp_time) 568 self.log.info("Starting SAR measurements") 569 570 out = self.exec_cmd(["sar", "-P", "ALL", "%s" % 1, "%s" % run_time]) 571 with open(os.path.join(results_dir, sar_output_file), "w") as fh: 572 for line in out.split("\n"): 573 if "Average" in line: 574 if "CPU" in line: 575 self.log.info("Summary CPU utilization from SAR:") 576 self.log.info(line) 577 elif "all" in line: 578 self.log.info(line) 579 else: 580 sar_idle_sum += float(line.split()[7]) 581 fh.write(out) 582 sar_cpu_usage = cpu_number * 100 - sar_idle_sum 583 584 with open(os.path.join(results_dir, sar_cpu_util_file), "w") as f: 585 f.write("%0.2f" % sar_cpu_usage) 586 587 def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time): 588 time.sleep(ramp_time) 589 self.log.info("Starting power measurements") 590 self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir, 591 "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix, 592 "-x", "-c", "%s" % run_time, "-t", "%s" % 1, "-r"]) 593 594 def measure_pcm_memory(self, results_dir, pcm_file_name, ramp_time, run_time): 595 time.sleep(ramp_time) 596 cmd = ["pcm-memory", "1", "-csv=%s/%s" % (results_dir, pcm_file_name)] 597 pcm_memory = subprocess.Popen(cmd) 598 time.sleep(run_time) 599 pcm_memory.terminate() 600 601 def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time): 602 time.sleep(ramp_time) 603 cmd = ["pcm", "1", "-i=%s" % run_time, 604 "-csv=%s/%s" % (results_dir, pcm_file_name)] 605 subprocess.run(cmd) 606 df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1]) 607 df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x)) 608 skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})] 609 skt_pcm_file_name = "_".join(["skt", pcm_file_name]) 610 skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False) 611 612 def measure_pcm_power(self, results_dir, pcm_power_file_name, ramp_time, run_time): 613 time.sleep(ramp_time) 614 out = self.exec_cmd(["pcm-power", "1", "-i=%s" % run_time]) 615 with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh: 616 fh.write(out) 617 # TODO: Above command results in a .csv file containing measurements for all gathered samples. 618 # Improve this so that additional file containing measurements average is generated too. 619 620 def measure_network_bandwidth(self, results_dir, bandwidth_file_name, ramp_time, run_time): 621 self.log.info("Waiting %d seconds for ramp-up to finish before measuring bandwidth stats" % ramp_time) 622 time.sleep(ramp_time) 623 self.log.info("INFO: starting network bandwidth measure") 624 self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name), 625 "-a", "1", "-t", "1000", "-c", "%s" % run_time]) 626 # TODO: Above command results in a .csv file containing measurements for all gathered samples. 627 # Improve this so that additional file containing measurements average is generated too. 628 # TODO: Monitor only these interfaces which are currently used to run the workload. 629 630 def measure_dpdk_memory(self, results_dir, dump_file_name, ramp_time): 631 self.log.info("INFO: waiting to generate DPDK memory usage") 632 time.sleep(ramp_time) 633 self.log.info("INFO: generating DPDK memory usage") 634 tmp_dump_file = rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)["filename"] 635 os.rename(tmp_dump_file, "%s/%s" % (results_dir, dump_file_name)) 636 637 def sys_config(self): 638 self.log.info("====Kernel release:====") 639 self.log.info(os.uname().release) 640 self.log.info("====Kernel command line:====") 641 with open('/proc/cmdline') as f: 642 cmdline = f.readlines() 643 self.log.info('\n'.join(self.get_uncommented_lines(cmdline))) 644 self.log.info("====sysctl conf:====") 645 with open('/etc/sysctl.conf') as f: 646 sysctl = f.readlines() 647 self.log.info('\n'.join(self.get_uncommented_lines(sysctl))) 648 self.log.info("====Cpu power info:====") 649 self.log.info(self.exec_cmd(["cpupower", "frequency-info"])) 650 self.log.info("====zcopy settings:====") 651 self.log.info("zcopy enabled: %s" % (self.enable_zcopy)) 652 self.log.info("====Scheduler settings:====") 653 self.log.info("SPDK scheduler: %s" % (self.scheduler_name)) 654 655 656class Initiator(Server): 657 def __init__(self, name, general_config, initiator_config): 658 super().__init__(name, general_config, initiator_config) 659 660 # Required fields 661 self.ip = initiator_config["ip"] 662 self.target_nic_ips = initiator_config["target_nic_ips"] 663 664 # Defaults 665 self.cpus_allowed = None 666 self.cpus_allowed_policy = "shared" 667 self.spdk_dir = "/tmp/spdk" 668 self.fio_bin = "/usr/src/fio/fio" 669 self.nvmecli_bin = "nvme" 670 self.cpu_frequency = None 671 self.subsystem_info_list = [] 672 self.allow_cpu_sharing = True 673 674 if "spdk_dir" in initiator_config: 675 self.spdk_dir = initiator_config["spdk_dir"] 676 if "fio_bin" in initiator_config: 677 self.fio_bin = initiator_config["fio_bin"] 678 if "nvmecli_bin" in initiator_config: 679 self.nvmecli_bin = initiator_config["nvmecli_bin"] 680 if "cpus_allowed" in initiator_config: 681 self.cpus_allowed = initiator_config["cpus_allowed"] 682 if "cpus_allowed_policy" in initiator_config: 683 self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"] 684 if "cpu_frequency" in initiator_config: 685 self.cpu_frequency = initiator_config["cpu_frequency"] 686 if "allow_cpu_sharing" in initiator_config: 687 self.allow_cpu_sharing = initiator_config["allow_cpu_sharing"] 688 if os.getenv('SPDK_WORKSPACE'): 689 self.spdk_dir = os.getenv('SPDK_WORKSPACE') 690 691 self.ssh_connection = paramiko.SSHClient() 692 self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 693 self.ssh_connection.connect(self.ip, username=self.username, password=self.password) 694 self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir]) 695 self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir]) 696 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 697 698 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 699 self.copy_spdk("/tmp/spdk.zip") 700 self.set_local_nic_info(self.set_local_nic_info_helper()) 701 self.set_cpu_frequency() 702 self.configure_system() 703 if self.enable_adq: 704 self.configure_adq() 705 self.sys_config() 706 707 def set_local_nic_info_helper(self): 708 return json.loads(self.exec_cmd(["lshw", "-json"])) 709 710 def stop(self): 711 self.restore_settings() 712 self.ssh_connection.close() 713 714 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 715 if change_dir: 716 cmd = ["cd", change_dir, ";", *cmd] 717 718 # In case one of the command elements contains whitespace and is not 719 # already quoted, # (e.g. when calling sysctl) quote it again to prevent expansion 720 # when sending to remote system. 721 for i, c in enumerate(cmd): 722 if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")): 723 cmd[i] = '"%s"' % c 724 cmd = " ".join(cmd) 725 726 # Redirect stderr to stdout thanks using get_pty option if needed 727 _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect) 728 out = stdout.read().decode(encoding="utf-8") 729 730 # Check the return code 731 rc = stdout.channel.recv_exit_status() 732 if rc: 733 raise CalledProcessError(int(rc), cmd, out) 734 735 return out 736 737 def put_file(self, local, remote_dest): 738 ftp = self.ssh_connection.open_sftp() 739 ftp.put(local, remote_dest) 740 ftp.close() 741 742 def get_file(self, remote, local_dest): 743 ftp = self.ssh_connection.open_sftp() 744 ftp.get(remote, local_dest) 745 ftp.close() 746 747 def copy_spdk(self, local_spdk_zip): 748 self.log.info("Copying SPDK sources to initiator %s" % self.name) 749 self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip") 750 self.log.info("Copied sources zip from target") 751 self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir]) 752 self.log.info("Sources unpacked") 753 754 def copy_result_files(self, dest_dir): 755 self.log.info("Copying results") 756 757 if not os.path.exists(dest_dir): 758 os.mkdir(dest_dir) 759 760 # Get list of result files from initiator and copy them back to target 761 file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n") 762 763 for file in file_list: 764 self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file), 765 os.path.join(dest_dir, file)) 766 self.log.info("Done copying results") 767 768 def match_subsystems(self, target_subsytems): 769 subsystems = [subsystem for subsystem in target_subsytems if subsystem[2] in self.target_nic_ips] 770 subsystems.sort(key=lambda x: x[1]) 771 self.log.info("Found matching subsystems on target side:") 772 for s in subsystems: 773 self.log.info(s) 774 self.subsystem_info_list = subsystems 775 776 def gen_fio_filename_conf(self, *args, **kwargs): 777 # Logic implemented in SPDKInitiator and KernelInitiator classes 778 pass 779 780 def get_route_nic_numa(self, remote_nvme_ip): 781 local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip])) 782 local_nvme_nic = local_nvme_nic[0]["dev"] 783 return self.get_nic_numa_node(local_nvme_nic) 784 785 @staticmethod 786 def gen_fio_offset_section(offset_inc, num_jobs): 787 offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc 788 return "\n".join(["size=%s%%" % offset_inc, 789 "offset=0%", 790 "offset_increment=%s%%" % offset_inc]) 791 792 def gen_fio_numa_section(self, fio_filenames_list, num_jobs): 793 numa_stats = {} 794 allowed_cpus = [] 795 for nvme in fio_filenames_list: 796 nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme)) 797 numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1 798 799 # Use the most common NUMA node for this chunk to allocate memory and CPUs 800 section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0] 801 802 # Check if we have enough free CPUs to pop from the list before assigning them 803 if len(self.available_cpus[section_local_numa]) < num_jobs: 804 if self.allow_cpu_sharing: 805 self.log.info("Regenerating available CPU list %s" % section_local_numa) 806 # Remove still available CPUs from the regenerated list. We don't want to 807 # regenerate it with duplicates. 808 cpus_regen = set(self.get_numa_cpu_map()[section_local_numa]) - set(self.available_cpus[section_local_numa]) 809 self.available_cpus[section_local_numa].extend(cpus_regen) 810 self.log.info(self.log.info(self.available_cpus[section_local_numa])) 811 else: 812 self.log.error("No more free CPU cores to use from allowed_cpus list!") 813 raise IndexError 814 815 for _ in range(num_jobs): 816 try: 817 allowed_cpus.append(str(self.available_cpus[section_local_numa].pop(0))) 818 except IndexError: 819 self.log.error("No more free CPU cores to use from allowed_cpus list!") 820 raise 821 822 return "\n".join(["cpus_allowed=%s" % ",".join(allowed_cpus), 823 "numa_mem_policy=prefer:%s" % section_local_numa]) 824 825 def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, 826 num_jobs=None, ramp_time=0, run_time=10, rate_iops=0, 827 offset=False, offset_inc=0): 828 fio_conf_template = """ 829[global] 830ioengine={ioengine} 831{spdk_conf} 832thread=1 833group_reporting=1 834direct=1 835percentile_list=50:90:99:99.5:99.9:99.99:99.999 836 837norandommap=1 838rw={rw} 839rwmixread={rwmixread} 840bs={block_size} 841time_based=1 842ramp_time={ramp_time} 843runtime={run_time} 844rate_iops={rate_iops} 845""" 846 847 if self.cpus_allowed is not None: 848 self.log.info("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed) 849 cpus_num = 0 850 cpus = self.cpus_allowed.split(",") 851 for cpu in cpus: 852 if "-" in cpu: 853 a, b = cpu.split("-") 854 a = int(a) 855 b = int(b) 856 cpus_num += len(range(a, b)) 857 else: 858 cpus_num += 1 859 self.num_cores = cpus_num 860 threads = range(0, self.num_cores) 861 elif hasattr(self, 'num_cores'): 862 self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores) 863 threads = range(0, int(self.num_cores)) 864 else: 865 self.num_cores = len(self.subsystem_info_list) 866 threads = range(0, len(self.subsystem_info_list)) 867 868 filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs, 869 offset, offset_inc) 870 871 fio_config = fio_conf_template.format(ioengine=self.ioengine, spdk_conf=self.spdk_conf, 872 rw=rw, rwmixread=rwmixread, block_size=block_size, 873 ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops) 874 875 # TODO: hipri disabled for now, as it causes fio errors: 876 # io_u error on file /dev/nvme2n1: Operation not supported 877 # See comment in KernelInitiator class, init_connect() function 878 if "io_uring" in self.ioengine: 879 fio_config = fio_config + """ 880fixedbufs=1 881registerfiles=1 882#hipri=1 883""" 884 if num_jobs: 885 fio_config = fio_config + "numjobs=%s \n" % num_jobs 886 if self.cpus_allowed is not None: 887 fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed 888 fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy 889 fio_config = fio_config + filename_section 890 891 fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread) 892 if hasattr(self, "num_cores"): 893 fio_config_filename += "_%sCPU" % self.num_cores 894 fio_config_filename += ".fio" 895 896 self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir]) 897 self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)]) 898 self.log.info("Created FIO Config:") 899 self.log.info(fio_config) 900 901 return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename) 902 903 def set_cpu_frequency(self): 904 if self.cpu_frequency is not None: 905 try: 906 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True) 907 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True) 908 self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"])) 909 except Exception: 910 self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!") 911 sys.exit() 912 else: 913 self.log.warning("WARNING: you have disabled intel_pstate and using default cpu governance.") 914 915 def run_fio(self, fio_config_file, run_num=1): 916 job_name, _ = os.path.splitext(fio_config_file) 917 self.log.info("Starting FIO run for job: %s" % job_name) 918 self.log.info("Using FIO: %s" % self.fio_bin) 919 920 output_filename = job_name + "_run_" + str(run_num) + "_" + self.name + ".json" 921 try: 922 output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json", 923 "--output=%s" % output_filename, "--eta=never"], True) 924 self.log.info(output) 925 self.log.info("FIO run finished. Results in: %s" % output_filename) 926 except subprocess.CalledProcessError as e: 927 self.log.error("ERROR: Fio process failed!") 928 self.log.error(e.stdout) 929 930 def sys_config(self): 931 self.log.info("====Kernel release:====") 932 self.log.info(self.exec_cmd(["uname", "-r"])) 933 self.log.info("====Kernel command line:====") 934 cmdline = self.exec_cmd(["cat", "/proc/cmdline"]) 935 self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines()))) 936 self.log.info("====sysctl conf:====") 937 sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"]) 938 self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines()))) 939 self.log.info("====Cpu power info:====") 940 self.log.info(self.exec_cmd(["cpupower", "frequency-info"])) 941 942 943class KernelTarget(Target): 944 def __init__(self, name, general_config, target_config): 945 super().__init__(name, general_config, target_config) 946 # Defaults 947 self.nvmet_bin = "nvmetcli" 948 949 if "nvmet_bin" in target_config: 950 self.nvmet_bin = target_config["nvmet_bin"] 951 952 def load_drivers(self): 953 self.log.info("Loading drivers") 954 super().load_drivers() 955 if self.null_block: 956 self.exec_cmd(["sudo", "modprobe", "null_blk", "nr_devices=%s" % self.null_block]) 957 958 def configure_adq(self): 959 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 960 961 def adq_configure_tc(self): 962 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 963 964 def adq_set_busy_read(self, busy_read_val): 965 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0") 966 return {"net.core.busy_read": 0} 967 968 def stop(self): 969 self.nvmet_command(self.nvmet_bin, "clear") 970 self.restore_settings() 971 972 def get_nvme_device_bdf(self, nvme_dev_path): 973 nvme_name = os.path.basename(nvme_dev_path) 974 return self.exec_cmd(["cat", "/sys/block/%s/device/address" % nvme_name]).strip() 975 976 def get_nvme_devices(self): 977 dev_list = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]).split("\n") 978 nvme_list = [] 979 for dev in dev_list: 980 if "nvme" not in dev: 981 continue 982 if self.get_nvme_device_bdf(dev) in self.nvme_blocklist: 983 continue 984 if len(self.nvme_allowlist) == 0: 985 nvme_list.append(dev) 986 continue 987 if self.get_nvme_device_bdf(dev) in self.nvme_allowlist: 988 nvme_list.append(dev) 989 return nvme_list 990 991 def nvmet_command(self, nvmet_bin, command): 992 return self.exec_cmd([nvmet_bin, *(command.split(" "))]) 993 994 def kernel_tgt_gen_subsystem_conf(self, nvme_list): 995 996 nvmet_cfg = { 997 "ports": [], 998 "hosts": [], 999 "subsystems": [], 1000 } 1001 1002 for ip, bdev_num in self.spread_bdevs(len(nvme_list)): 1003 port = str(4420 + bdev_num) 1004 nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num 1005 serial = "SPDK00%s" % bdev_num 1006 bdev_name = nvme_list[bdev_num] 1007 1008 nvmet_cfg["subsystems"].append({ 1009 "allowed_hosts": [], 1010 "attr": { 1011 "allow_any_host": "1", 1012 "serial": serial, 1013 "version": "1.3" 1014 }, 1015 "namespaces": [ 1016 { 1017 "device": { 1018 "path": bdev_name, 1019 "uuid": "%s" % uuid.uuid4() 1020 }, 1021 "enable": 1, 1022 "nsid": port 1023 } 1024 ], 1025 "nqn": nqn 1026 }) 1027 1028 nvmet_cfg["ports"].append({ 1029 "addr": { 1030 "adrfam": "ipv4", 1031 "traddr": ip, 1032 "trsvcid": port, 1033 "trtype": self.transport 1034 }, 1035 "portid": bdev_num, 1036 "referrals": [], 1037 "subsystems": [nqn] 1038 }) 1039 1040 self.subsystem_info_list.append((port, nqn, ip)) 1041 self.subsys_no = len(self.subsystem_info_list) 1042 1043 with open("kernel.conf", "w") as fh: 1044 fh.write(json.dumps(nvmet_cfg, indent=2)) 1045 1046 def tgt_start(self): 1047 self.log.info("Configuring kernel NVMeOF Target") 1048 1049 if self.null_block: 1050 self.log.info("Configuring with null block device.") 1051 nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)] 1052 else: 1053 self.log.info("Configuring with NVMe drives.") 1054 nvme_list = self.get_nvme_devices() 1055 1056 self.kernel_tgt_gen_subsystem_conf(nvme_list) 1057 self.subsys_no = len(nvme_list) 1058 1059 self.nvmet_command(self.nvmet_bin, "clear") 1060 self.nvmet_command(self.nvmet_bin, "restore kernel.conf") 1061 1062 if self.enable_adq: 1063 self.adq_configure_tc() 1064 1065 self.log.info("Done configuring kernel NVMeOF Target") 1066 1067 1068class SPDKTarget(Target): 1069 def __init__(self, name, general_config, target_config): 1070 # Required fields 1071 self.core_mask = target_config["core_mask"] 1072 self.num_cores = len(self.get_core_list_from_mask(self.core_mask)) 1073 1074 super().__init__(name, general_config, target_config) 1075 1076 # Defaults 1077 self.dif_insert_strip = False 1078 self.null_block_dif_type = 0 1079 self.num_shared_buffers = 4096 1080 self.max_queue_depth = 128 1081 self.bpf_proc = None 1082 self.bpf_scripts = [] 1083 self.enable_dsa = False 1084 self.scheduler_core_limit = None 1085 self.iobuf_small_pool_count = 16383 1086 self.iobuf_large_pool_count = 2047 1087 self.num_cqe = 4096 1088 1089 if "num_shared_buffers" in target_config: 1090 self.num_shared_buffers = target_config["num_shared_buffers"] 1091 if "max_queue_depth" in target_config: 1092 self.max_queue_depth = target_config["max_queue_depth"] 1093 if "null_block_dif_type" in target_config: 1094 self.null_block_dif_type = target_config["null_block_dif_type"] 1095 if "dif_insert_strip" in target_config: 1096 self.dif_insert_strip = target_config["dif_insert_strip"] 1097 if "bpf_scripts" in target_config: 1098 self.bpf_scripts = target_config["bpf_scripts"] 1099 if "dsa_settings" in target_config: 1100 self.enable_dsa = target_config["dsa_settings"] 1101 if "scheduler_core_limit" in target_config: 1102 self.scheduler_core_limit = target_config["scheduler_core_limit"] 1103 if "iobuf_small_pool_count" in target_config: 1104 self.iobuf_small_pool_count = target_config["iobuf_small_pool_count"] 1105 if "iobuf_large_pool_count" in target_config: 1106 self.iobuf_large_pool_count = target_config["iobuf_large_pool_count"] 1107 if "num_cqe" in target_config: 1108 self.num_cqe = target_config["num_cqe"] 1109 1110 self.log.info("====DSA settings:====") 1111 self.log.info("DSA enabled: %s" % (self.enable_dsa)) 1112 1113 def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False): 1114 if mode not in ["default", "bynode", "cpulist", 1115 "shared", "split", "split-bynode"]: 1116 self.log.error("%s irq affinity setting not supported" % mode) 1117 raise Exception 1118 1119 # Create core list from SPDK's mask and change it to string. 1120 # This is the type configure_irq_affinity expects for cpulist parameter. 1121 spdk_tgt_core_list = self.get_core_list_from_mask(self.core_mask) 1122 spdk_tgt_core_list = ",".join(map(lambda x: str(x), spdk_tgt_core_list)) 1123 spdk_tgt_core_list = "[" + spdk_tgt_core_list + "]" 1124 1125 if mode == "shared": 1126 super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list) 1127 elif mode == "split": 1128 super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list, exclude_cpulist=True) 1129 elif mode == "split-bynode": 1130 super().configure_irq_affinity(mode="bynode", cpulist=spdk_tgt_core_list, exclude_cpulist=True) 1131 else: 1132 super().configure_irq_affinity(mode=mode, cpulist=cpulist, exclude_cpulist=exclude_cpulist) 1133 1134 def adq_set_busy_read(self, busy_read_val): 1135 return {"net.core.busy_read": busy_read_val} 1136 1137 def get_nvme_devices_count(self): 1138 return len(self.get_nvme_devices()) 1139 1140 def get_nvme_devices(self): 1141 bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")])) 1142 bdev_bdfs = [] 1143 for bdev in bdev_subsys_json_obj["config"]: 1144 bdev_traddr = bdev["params"]["traddr"] 1145 if bdev_traddr in self.nvme_blocklist: 1146 continue 1147 if len(self.nvme_allowlist) == 0: 1148 bdev_bdfs.append(bdev_traddr) 1149 if bdev_traddr in self.nvme_allowlist: 1150 bdev_bdfs.append(bdev_traddr) 1151 return bdev_bdfs 1152 1153 def spdk_tgt_configure(self): 1154 self.log.info("Configuring SPDK NVMeOF target via RPC") 1155 1156 # Create transport layer 1157 nvmf_transport_params = { 1158 "client": self.client, 1159 "trtype": self.transport, 1160 "num_shared_buffers": self.num_shared_buffers, 1161 "max_queue_depth": self.max_queue_depth, 1162 "dif_insert_or_strip": self.dif_insert_strip, 1163 "sock_priority": self.adq_priority, 1164 "num_cqe": self.num_cqe 1165 } 1166 1167 if self.enable_adq: 1168 nvmf_transport_params["acceptor_poll_rate"] = 10000 1169 1170 rpc.nvmf.nvmf_create_transport(**nvmf_transport_params) 1171 self.log.info("SPDK NVMeOF transport layer:") 1172 rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client)) 1173 1174 if self.null_block: 1175 self.spdk_tgt_add_nullblock(self.null_block) 1176 self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block) 1177 else: 1178 self.spdk_tgt_add_nvme_conf() 1179 self.spdk_tgt_add_subsystem_conf(self.nic_ips) 1180 1181 if self.enable_adq: 1182 self.adq_configure_tc() 1183 1184 self.log.info("Done configuring SPDK NVMeOF Target") 1185 1186 def spdk_tgt_add_nullblock(self, null_block_count): 1187 md_size = 0 1188 block_size = 4096 1189 if self.null_block_dif_type != 0: 1190 md_size = 128 1191 1192 self.log.info("Adding null block bdevices to config via RPC") 1193 for i in range(null_block_count): 1194 self.log.info("Setting bdev protection to :%s" % self.null_block_dif_type) 1195 rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i), 1196 dif_type=self.null_block_dif_type, md_size=md_size) 1197 self.log.info("SPDK Bdevs configuration:") 1198 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1199 1200 def spdk_tgt_add_nvme_conf(self, req_num_disks=None): 1201 self.log.info("Adding NVMe bdevs to config via RPC") 1202 1203 bdfs = self.get_nvme_devices() 1204 bdfs = [b.replace(":", ".") for b in bdfs] 1205 1206 if req_num_disks: 1207 if req_num_disks > len(bdfs): 1208 self.log.error("ERROR: Requested number of disks is more than available %s" % len(bdfs)) 1209 sys.exit(1) 1210 else: 1211 bdfs = bdfs[0:req_num_disks] 1212 1213 for i, bdf in enumerate(bdfs): 1214 rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 1215 1216 self.log.info("SPDK Bdevs configuration:") 1217 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1218 1219 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 1220 self.log.info("Adding subsystems to config") 1221 if not req_num_disks: 1222 req_num_disks = self.get_nvme_devices_count() 1223 1224 for ip, bdev_num in self.spread_bdevs(req_num_disks): 1225 port = str(4420 + bdev_num) 1226 nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num 1227 serial = "SPDK00%s" % bdev_num 1228 bdev_name = "Nvme%sn1" % bdev_num 1229 1230 rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial, 1231 allow_any_host=True, max_namespaces=8) 1232 rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name) 1233 for nqn_name in [nqn, "discovery"]: 1234 rpc.nvmf.nvmf_subsystem_add_listener(self.client, 1235 nqn=nqn_name, 1236 trtype=self.transport, 1237 traddr=ip, 1238 trsvcid=port, 1239 adrfam="ipv4") 1240 self.subsystem_info_list.append((port, nqn, ip)) 1241 self.subsys_no = len(self.subsystem_info_list) 1242 1243 self.log.info("SPDK NVMeOF subsystem configuration:") 1244 rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client)) 1245 1246 def bpf_start(self): 1247 self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts) 1248 bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh") 1249 bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts] 1250 results_path = os.path.join(self.results_dir, "bpf_traces.txt") 1251 1252 with open(self.pid, "r") as fh: 1253 nvmf_pid = str(fh.readline()) 1254 1255 cmd = [bpf_script, nvmf_pid, *bpf_traces] 1256 self.log.info(cmd) 1257 self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path}) 1258 1259 def tgt_start(self): 1260 if self.null_block: 1261 self.subsys_no = 1 1262 else: 1263 self.subsys_no = self.get_nvme_devices_count() 1264 self.log.info("Starting SPDK NVMeOF Target process") 1265 nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt") 1266 proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask]) 1267 self.pid = os.path.join(self.spdk_dir, "nvmf.pid") 1268 1269 with open(self.pid, "w") as fh: 1270 fh.write(str(proc.pid)) 1271 self.nvmf_proc = proc 1272 self.log.info("SPDK NVMeOF Target PID=%s" % self.pid) 1273 self.log.info("Waiting for spdk to initialize...") 1274 while True: 1275 if os.path.exists("/var/tmp/spdk.sock"): 1276 break 1277 time.sleep(1) 1278 self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock") 1279 1280 rpc.sock.sock_set_default_impl(self.client, impl_name="posix") 1281 rpc.iobuf.iobuf_set_options(self.client, 1282 small_pool_count=self.iobuf_small_pool_count, 1283 large_pool_count=self.iobuf_large_pool_count, 1284 small_bufsize=None, 1285 large_bufsize=None) 1286 1287 if self.enable_zcopy: 1288 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", 1289 enable_zerocopy_send_server=True) 1290 self.log.info("Target socket options:") 1291 rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix")) 1292 1293 if self.enable_adq: 1294 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1) 1295 rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None, 1296 nvme_adminq_poll_period_us=100000, retry_count=4) 1297 1298 if self.enable_dsa: 1299 rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None) 1300 self.log.info("Target DSA accel module enabled") 1301 1302 rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit) 1303 rpc.framework_start_init(self.client) 1304 1305 if self.bpf_scripts: 1306 self.bpf_start() 1307 1308 self.spdk_tgt_configure() 1309 1310 def stop(self): 1311 if self.bpf_proc: 1312 self.log.info("Stopping BPF Trace script") 1313 self.bpf_proc.terminate() 1314 self.bpf_proc.wait() 1315 1316 if hasattr(self, "nvmf_proc"): 1317 try: 1318 self.nvmf_proc.terminate() 1319 self.nvmf_proc.wait(timeout=30) 1320 except Exception as e: 1321 self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.") 1322 self.log.info(e) 1323 self.nvmf_proc.kill() 1324 self.nvmf_proc.communicate() 1325 # Try to clean up RPC socket files if they were not removed 1326 # because of using 'kill' 1327 try: 1328 os.remove("/var/tmp/spdk.sock") 1329 os.remove("/var/tmp/spdk.sock.lock") 1330 except FileNotFoundError: 1331 pass 1332 self.restore_settings() 1333 1334 1335class KernelInitiator(Initiator): 1336 def __init__(self, name, general_config, initiator_config): 1337 super().__init__(name, general_config, initiator_config) 1338 1339 # Defaults 1340 self.extra_params = "" 1341 self.ioengine = "libaio" 1342 self.spdk_conf = "" 1343 1344 if "num_cores" in initiator_config: 1345 self.num_cores = initiator_config["num_cores"] 1346 1347 if "extra_params" in initiator_config: 1348 self.extra_params = initiator_config["extra_params"] 1349 1350 if "kernel_engine" in initiator_config: 1351 self.ioengine = initiator_config["kernel_engine"] 1352 if "io_uring" in self.ioengine: 1353 self.extra_params += ' --nr-poll-queues=8' 1354 1355 def configure_adq(self): 1356 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1357 1358 def adq_configure_tc(self): 1359 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1360 1361 def adq_set_busy_read(self, busy_read_val): 1362 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0") 1363 return {"net.core.busy_read": 0} 1364 1365 def get_connected_nvme_list(self): 1366 json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"])) 1367 nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"] 1368 if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]] 1369 return nvme_list 1370 1371 def init_connect(self): 1372 self.log.info("Below connection attempts may result in error messages, this is expected!") 1373 for subsystem in self.subsystem_info_list: 1374 self.log.info("Trying to connect %s %s %s" % subsystem) 1375 self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport, 1376 "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params]) 1377 time.sleep(2) 1378 1379 if "io_uring" in self.ioengine: 1380 self.log.info("Setting block layer settings for io_uring.") 1381 1382 # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because 1383 # apparently it's not possible for connected subsystems. 1384 # Results in "error: Invalid argument" 1385 block_sysfs_settings = { 1386 "iostats": "0", 1387 "rq_affinity": "0", 1388 "nomerges": "2" 1389 } 1390 1391 for disk in self.get_connected_nvme_list(): 1392 sysfs = os.path.join("/sys/block", disk, "queue") 1393 for k, v in block_sysfs_settings.items(): 1394 sysfs_opt_path = os.path.join(sysfs, k) 1395 try: 1396 self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True) 1397 except CalledProcessError as e: 1398 self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v)) 1399 finally: 1400 _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)]) 1401 self.log.info("%s=%s" % (sysfs_opt_path, _)) 1402 1403 def init_disconnect(self): 1404 for subsystem in self.subsystem_info_list: 1405 self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]]) 1406 time.sleep(1) 1407 1408 def get_nvme_subsystem_numa(self, dev_name): 1409 # Remove two last characters to get controller name instead of subsystem name 1410 nvme_ctrl = os.path.basename(dev_name)[:-2] 1411 remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', 1412 self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl])) 1413 return self.get_route_nic_numa(remote_nvme_ip.group(0)) 1414 1415 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0): 1416 self.available_cpus = self.get_numa_cpu_map() 1417 if len(threads) >= len(subsystems): 1418 threads = range(0, len(subsystems)) 1419 1420 # Generate connected nvme devices names and sort them by used NIC numa node 1421 # to allow better grouping when splitting into fio sections. 1422 nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()] 1423 nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list] 1424 nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))] 1425 1426 filename_section = "" 1427 nvme_per_split = int(len(nvme_list) / len(threads)) 1428 remainder = len(nvme_list) % len(threads) 1429 iterator = iter(nvme_list) 1430 result = [] 1431 for i in range(len(threads)): 1432 result.append([]) 1433 for _ in range(nvme_per_split): 1434 result[i].append(next(iterator)) 1435 if remainder: 1436 result[i].append(next(iterator)) 1437 remainder -= 1 1438 for i, r in enumerate(result): 1439 header = "[filename%s]" % i 1440 disks = "\n".join(["filename=%s" % x for x in r]) 1441 job_section_qd = round((io_depth * len(r)) / num_jobs) 1442 if job_section_qd == 0: 1443 job_section_qd = 1 1444 iodepth = "iodepth=%s" % job_section_qd 1445 1446 offset_section = "" 1447 if offset: 1448 offset_section = self.gen_fio_offset_section(offset_inc, num_jobs) 1449 1450 numa_opts = self.gen_fio_numa_section(r, num_jobs) 1451 1452 filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""]) 1453 1454 return filename_section 1455 1456 1457class SPDKInitiator(Initiator): 1458 def __init__(self, name, general_config, initiator_config): 1459 super().__init__(name, general_config, initiator_config) 1460 1461 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 1462 self.install_spdk() 1463 1464 # Optional fields 1465 self.enable_data_digest = False 1466 if "enable_data_digest" in initiator_config: 1467 self.enable_data_digest = initiator_config["enable_data_digest"] 1468 if "num_cores" in initiator_config: 1469 self.num_cores = initiator_config["num_cores"] 1470 1471 self.ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir 1472 self.spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir 1473 1474 def adq_set_busy_read(self, busy_read_val): 1475 return {"net.core.busy_read": busy_read_val} 1476 1477 def install_spdk(self): 1478 self.log.info("Using fio binary %s" % self.fio_bin) 1479 self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"]) 1480 self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"]) 1481 self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", 1482 "--with-fio=%s" % os.path.dirname(self.fio_bin), 1483 "--enable-lto", "--disable-unit-tests"]) 1484 self.exec_cmd(["make", "-C", self.spdk_dir, "clean"]) 1485 self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"]) 1486 1487 self.log.info("SPDK built") 1488 self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir]) 1489 1490 def init_connect(self): 1491 # Not a real "connect" like when doing "nvme connect" because SPDK's fio 1492 # bdev plugin initiates connection just before starting IO traffic. 1493 # This is just to have a "init_connect" equivalent of the same function 1494 # from KernelInitiator class. 1495 # Just prepare bdev.conf JSON file for later use and consider it 1496 # "making a connection". 1497 bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list) 1498 self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir]) 1499 1500 def init_disconnect(self): 1501 # SPDK Initiator does not need to explicity disconnect as this gets done 1502 # after fio bdev plugin finishes IO. 1503 pass 1504 1505 def gen_spdk_bdev_conf(self, remote_subsystem_list): 1506 bdev_cfg_section = { 1507 "subsystems": [ 1508 { 1509 "subsystem": "bdev", 1510 "config": [] 1511 } 1512 ] 1513 } 1514 1515 for i, subsys in enumerate(remote_subsystem_list): 1516 sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys) 1517 nvme_ctrl = { 1518 "method": "bdev_nvme_attach_controller", 1519 "params": { 1520 "name": "Nvme{}".format(i), 1521 "trtype": self.transport, 1522 "traddr": sub_addr, 1523 "trsvcid": sub_port, 1524 "subnqn": sub_nqn, 1525 "adrfam": "IPv4" 1526 } 1527 } 1528 1529 if self.enable_adq: 1530 nvme_ctrl["params"].update({"priority": "1"}) 1531 1532 if self.enable_data_digest: 1533 nvme_ctrl["params"].update({"ddgst": self.enable_data_digest}) 1534 1535 bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl) 1536 1537 return json.dumps(bdev_cfg_section, indent=2) 1538 1539 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0): 1540 self.available_cpus = self.get_numa_cpu_map() 1541 filename_section = "" 1542 if len(threads) >= len(subsystems): 1543 threads = range(0, len(subsystems)) 1544 1545 # Generate expected NVMe Bdev names and sort them by used NIC numa node 1546 # to allow better grouping when splitting into fio sections. 1547 filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))] 1548 filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames] 1549 filenames = [x for _, x in sorted(zip(filename_numas, filenames))] 1550 1551 nvme_per_split = int(len(subsystems) / len(threads)) 1552 remainder = len(subsystems) % len(threads) 1553 iterator = iter(filenames) 1554 result = [] 1555 for i in range(len(threads)): 1556 result.append([]) 1557 for _ in range(nvme_per_split): 1558 result[i].append(next(iterator)) 1559 if remainder: 1560 result[i].append(next(iterator)) 1561 remainder -= 1 1562 for i, r in enumerate(result): 1563 header = "[filename%s]" % i 1564 disks = "\n".join(["filename=%s" % x for x in r]) 1565 job_section_qd = round((io_depth * len(r)) / num_jobs) 1566 if job_section_qd == 0: 1567 job_section_qd = 1 1568 iodepth = "iodepth=%s" % job_section_qd 1569 1570 offset_section = "" 1571 if offset: 1572 offset_section = self.gen_fio_offset_section(offset_inc, num_jobs) 1573 1574 numa_opts = self.gen_fio_numa_section(r, num_jobs) 1575 1576 filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""]) 1577 1578 return filename_section 1579 1580 def get_nvme_subsystem_numa(self, bdev_name): 1581 bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir])) 1582 bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"] 1583 1584 # Remove two last characters to get controller name instead of subsystem name 1585 nvme_ctrl = bdev_name[:-2] 1586 remote_nvme_ip = list(filter(lambda x: x["params"]["name"] == "%s" % nvme_ctrl, bdev_conf_json_obj))[0]["params"]["traddr"] 1587 return self.get_route_nic_numa(remote_nvme_ip) 1588 1589 1590if __name__ == "__main__": 1591 script_full_dir = os.path.dirname(os.path.realpath(__file__)) 1592 default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json")) 1593 1594 parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) 1595 parser.add_argument('-c', '--config', type=str, default=default_config_file_path, 1596 help='Configuration file.') 1597 parser.add_argument('-r', '--results', type=str, default='/tmp/results', 1598 help='Results directory.') 1599 parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv', 1600 help='CSV results filename.') 1601 parser.add_argument('-f', '--force', default=False, action='store_true', 1602 dest='force', help="""Force script to continue and try to use all 1603 available NVMe devices during test. 1604 WARNING: Might result in data loss on used NVMe drives""") 1605 1606 args = parser.parse_args() 1607 1608 logging.basicConfig(level=logging.INFO, 1609 format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s') 1610 1611 logging.info("Using config file: %s" % args.config) 1612 with open(args.config, "r") as config: 1613 data = json.load(config) 1614 1615 initiators = [] 1616 fio_cases = [] 1617 1618 general_config = data["general"] 1619 target_config = data["target"] 1620 initiator_configs = [data[x] for x in data.keys() if "initiator" in x] 1621 1622 if "null_block_devices" not in data["target"] and \ 1623 (args.force is False and 1624 "allowlist" not in data["target"] and 1625 "blocklist" not in data["target"]): 1626 # TODO: Also check if allowlist or blocklist are not empty. 1627 logging.warning("""WARNING: This script requires allowlist and blocklist to be defined. 1628 You can choose to use all available NVMe drives on your system, which may potentially 1629 lead to data loss. If you wish to proceed with all attached NVMes, use "-f" option.""") 1630 exit(1) 1631 1632 for k, v in data.items(): 1633 if "target" in k: 1634 v.update({"results_dir": args.results}) 1635 if data[k]["mode"] == "spdk": 1636 target_obj = SPDKTarget(k, data["general"], v) 1637 elif data[k]["mode"] == "kernel": 1638 target_obj = KernelTarget(k, data["general"], v) 1639 elif "initiator" in k: 1640 if data[k]["mode"] == "spdk": 1641 init_obj = SPDKInitiator(k, data["general"], v) 1642 elif data[k]["mode"] == "kernel": 1643 init_obj = KernelInitiator(k, data["general"], v) 1644 initiators.append(init_obj) 1645 elif "fio" in k: 1646 fio_workloads = itertools.product(data[k]["bs"], 1647 data[k]["qd"], 1648 data[k]["rw"]) 1649 1650 fio_run_time = data[k]["run_time"] 1651 fio_ramp_time = data[k]["ramp_time"] 1652 fio_rw_mix_read = data[k]["rwmixread"] 1653 fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None 1654 fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None 1655 1656 fio_rate_iops = 0 1657 if "rate_iops" in data[k]: 1658 fio_rate_iops = data[k]["rate_iops"] 1659 1660 fio_offset = False 1661 if "offset" in data[k]: 1662 fio_offset = data[k]["offset"] 1663 fio_offset_inc = 0 1664 if "offset_inc" in data[k]: 1665 fio_offset_inc = data[k]["offset_inc"] 1666 else: 1667 continue 1668 1669 try: 1670 os.mkdir(args.results) 1671 except FileExistsError: 1672 pass 1673 1674 for i in initiators: 1675 target_obj.initiator_info.append( 1676 {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips} 1677 ) 1678 1679 # TODO: This try block is definietly too large. Need to break this up into separate 1680 # logical blocks to reduce size. 1681 try: 1682 target_obj.tgt_start() 1683 1684 for i in initiators: 1685 i.match_subsystems(target_obj.subsystem_info_list) 1686 if i.enable_adq: 1687 i.adq_configure_tc() 1688 1689 # Poor mans threading 1690 # Run FIO tests 1691 for block_size, io_depth, rw in fio_workloads: 1692 configs = [] 1693 for i in initiators: 1694 i.init_connect() 1695 cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no, 1696 fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops, 1697 fio_offset, fio_offset_inc) 1698 configs.append(cfg) 1699 1700 for run_no in range(1, fio_run_num+1): 1701 threads = [] 1702 power_daemon = None 1703 measurements_prefix = "%s_%s_%s_m_%s_run_%s" % (block_size, io_depth, rw, fio_rw_mix_read, run_no) 1704 1705 for i, cfg in zip(initiators, configs): 1706 t = threading.Thread(target=i.run_fio, args=(cfg, run_no)) 1707 threads.append(t) 1708 if target_obj.enable_sar: 1709 sar_file_prefix = measurements_prefix + "_sar" 1710 t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix, fio_ramp_time, fio_run_time)) 1711 threads.append(t) 1712 1713 if target_obj.enable_pcm: 1714 pcm_fnames = ["%s_%s.csv" % (measurements_prefix, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]] 1715 1716 pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, 1717 args=(args.results, pcm_fnames[0], fio_ramp_time, fio_run_time)) 1718 pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, 1719 args=(args.results, pcm_fnames[1], fio_ramp_time, fio_run_time)) 1720 pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, 1721 args=(args.results, pcm_fnames[2], fio_ramp_time, fio_run_time)) 1722 1723 threads.append(pcm_cpu_t) 1724 threads.append(pcm_mem_t) 1725 threads.append(pcm_pow_t) 1726 1727 if target_obj.enable_bw: 1728 bandwidth_file_name = measurements_prefix + "_bandwidth.csv" 1729 t = threading.Thread(target=target_obj.measure_network_bandwidth, 1730 args=(args.results, bandwidth_file_name, fio_ramp_time, fio_run_time)) 1731 threads.append(t) 1732 1733 if target_obj.enable_dpdk_memory: 1734 dpdk_mem_file_name = measurements_prefix + "_dpdk_mem.txt" 1735 t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results, dpdk_mem_file_name, fio_ramp_time)) 1736 threads.append(t) 1737 1738 if target_obj.enable_pm: 1739 power_daemon = threading.Thread(target=target_obj.measure_power, 1740 args=(args.results, measurements_prefix, script_full_dir, 1741 fio_ramp_time, fio_run_time)) 1742 threads.append(power_daemon) 1743 1744 for t in threads: 1745 t.start() 1746 for t in threads: 1747 t.join() 1748 1749 for i in initiators: 1750 i.init_disconnect() 1751 i.copy_result_files(args.results) 1752 try: 1753 parse_results(args.results, args.csv_filename) 1754 except Exception: 1755 logging.error("There was an error with parsing the results") 1756 finally: 1757 for i in initiators: 1758 try: 1759 i.stop() 1760 except Exception as err: 1761 pass 1762 target_obj.stop() 1763