1#!/usr/bin/env python3 2# SPDX-License-Identifier: BSD-3-Clause 3# Copyright (C) 2018 Intel Corporation 4# All rights reserved. 5# 6 7import os 8import re 9import sys 10import argparse 11import json 12import logging 13import zipfile 14import threading 15import subprocess 16import itertools 17import configparser 18import time 19import uuid 20 21import paramiko 22import pandas as pd 23from common import * 24from subprocess import CalledProcessError 25 26sys.path.append(os.path.dirname(__file__) + '/../../../python') 27 28import spdk.rpc as rpc # noqa 29import spdk.rpc.client as rpc_client # noqa 30 31 32class Server: 33 def __init__(self, name, general_config, server_config): 34 self.name = name 35 self.username = general_config["username"] 36 self.password = general_config["password"] 37 self.transport = general_config["transport"].lower() 38 self.nic_ips = server_config["nic_ips"] 39 self.mode = server_config["mode"] 40 41 self.log = logging.getLogger(self.name) 42 43 self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts" 44 if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]: 45 self.irq_scripts_dir = server_config["irq_scripts_dir"] 46 47 self.local_nic_info = [] 48 self._nics_json_obj = {} 49 self.svc_restore_dict = {} 50 self.sysctl_restore_dict = {} 51 self.tuned_restore_dict = {} 52 self.governor_restore = "" 53 self.tuned_profile = "" 54 55 self.enable_adq = False 56 self.adq_priority = None 57 self.irq_settings = {"mode": "default"} 58 59 if "adq_enable" in server_config and server_config["adq_enable"]: 60 self.enable_adq = server_config["adq_enable"] 61 self.adq_priority = 1 62 if "irq_settings" in server_config: 63 self.irq_settings.update(server_config["irq_settings"]) 64 if "tuned_profile" in server_config: 65 self.tuned_profile = server_config["tuned_profile"] 66 67 if not re.match(r'^[A-Za-z0-9\-]+$', name): 68 self.log.info("Please use a name which contains only letters, numbers or dashes") 69 sys.exit(1) 70 71 @staticmethod 72 def get_uncommented_lines(lines): 73 return [line for line in lines if line and not line.startswith('#')] 74 75 def get_nic_name_by_ip(self, ip): 76 if not self._nics_json_obj: 77 nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"]) 78 self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj))) 79 for nic in self._nics_json_obj: 80 for addr in nic["addr_info"]: 81 if ip in addr["local"]: 82 return nic["ifname"] 83 84 def set_local_nic_info_helper(self): 85 pass 86 87 def set_local_nic_info(self, pci_info): 88 def extract_network_elements(json_obj): 89 nic_list = [] 90 if isinstance(json_obj, list): 91 for x in json_obj: 92 nic_list.extend(extract_network_elements(x)) 93 elif isinstance(json_obj, dict): 94 if "children" in json_obj: 95 nic_list.extend(extract_network_elements(json_obj["children"])) 96 if "class" in json_obj.keys() and "network" in json_obj["class"]: 97 nic_list.append(json_obj) 98 return nic_list 99 100 self.local_nic_info = extract_network_elements(pci_info) 101 102 def get_nic_numa_node(self, nic_name): 103 return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name])) 104 105 def get_numa_cpu_map(self): 106 numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"])) 107 numa_cpu_json_map = {} 108 109 for cpu in numa_cpu_json_obj["cpus"]: 110 cpu_num = int(cpu["cpu"]) 111 numa_node = int(cpu["node"]) 112 numa_cpu_json_map.setdefault(numa_node, []) 113 numa_cpu_json_map[numa_node].append(cpu_num) 114 115 return numa_cpu_json_map 116 117 # pylint: disable=R0201 118 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 119 return "" 120 121 def configure_system(self): 122 self.load_drivers() 123 self.configure_services() 124 self.configure_sysctl() 125 self.configure_tuned() 126 self.configure_cpu_governor() 127 self.configure_irq_affinity(**self.irq_settings) 128 129 def load_drivers(self): 130 self.log.info("Loading drivers") 131 self.exec_cmd(["sudo", "modprobe", "-a", 132 "nvme-%s" % self.transport, 133 "nvmet-%s" % self.transport]) 134 135 def configure_adq(self): 136 self.adq_load_modules() 137 self.adq_configure_nic() 138 139 def adq_load_modules(self): 140 self.log.info("Modprobing ADQ-related Linux modules...") 141 adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"] 142 for module in adq_module_deps: 143 try: 144 self.exec_cmd(["sudo", "modprobe", module]) 145 self.log.info("%s loaded!" % module) 146 except CalledProcessError as e: 147 self.log.error("ERROR: failed to load module %s" % module) 148 self.log.error("%s resulted in error: %s" % (e.cmd, e.output)) 149 150 def adq_configure_tc(self): 151 self.log.info("Configuring ADQ Traffic classes and filters...") 152 153 num_queues_tc0 = 2 # 2 is minimum number of queues for TC0 154 num_queues_tc1 = self.num_cores 155 port_param = "dst_port" if isinstance(self, Target) else "src_port" 156 port = "4420" 157 xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs") 158 159 for nic_ip in self.nic_ips: 160 nic_name = self.get_nic_name_by_ip(nic_ip) 161 tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, 162 "root", "mqprio", "num_tc", "2", "map", "0", "1", 163 "queues", "%s@0" % num_queues_tc0, 164 "%s@%s" % (num_queues_tc1, num_queues_tc0), 165 "hw", "1", "mode", "channel"] 166 self.log.info(" ".join(tc_qdisc_map_cmd)) 167 self.exec_cmd(tc_qdisc_map_cmd) 168 169 time.sleep(5) 170 tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"] 171 self.log.info(" ".join(tc_qdisc_ingress_cmd)) 172 self.exec_cmd(tc_qdisc_ingress_cmd) 173 174 tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name, 175 "protocol", "ip", "ingress", "prio", "1", "flower", 176 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port, 177 "skip_sw", "hw_tc", "1"] 178 self.log.info(" ".join(tc_filter_cmd)) 179 self.exec_cmd(tc_filter_cmd) 180 181 # show tc configuration 182 self.log.info("Show tc configuration for %s NIC..." % nic_name) 183 tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name]) 184 tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"]) 185 self.log.info("%s" % tc_disk_out) 186 self.log.info("%s" % tc_filter_out) 187 188 # Ethtool coalesce settings must be applied after configuring traffic classes 189 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"]) 190 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"]) 191 192 self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name) 193 xps_cmd = ["sudo", xps_script_path, nic_name] 194 self.log.info(xps_cmd) 195 self.exec_cmd(xps_cmd) 196 197 def reload_driver(self, driver): 198 try: 199 self.exec_cmd(["sudo", "rmmod", driver]) 200 self.exec_cmd(["sudo", "modprobe", driver]) 201 except CalledProcessError as e: 202 self.log.error("ERROR: failed to reload %s module!" % driver) 203 self.log.error("%s resulted in error: %s" % (e.cmd, e.output)) 204 205 def adq_configure_nic(self): 206 self.log.info("Configuring NIC port settings for ADQ testing...") 207 208 # Reload the driver first, to make sure any previous settings are re-set. 209 self.reload_driver("ice") 210 211 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 212 for nic in nic_names: 213 self.log.info(nic) 214 try: 215 self.exec_cmd(["sudo", "ethtool", "-K", nic, 216 "hw-tc-offload", "on"]) # Enable hardware TC offload 217 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 218 "channel-inline-flow-director", "on"]) # Enable Intel Flow Director 219 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"]) # Disable LLDP 220 # As temporary workaround for ADQ, channel packet inspection optimization is turned on during connection establishment. 221 # Then turned off before fio ramp_up expires in ethtool_after_fio_ramp(). 222 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 223 "channel-pkt-inspect-optimize", "on"]) 224 except CalledProcessError as e: 225 self.log.error("ERROR: failed to configure NIC port using ethtool!") 226 self.log.error("%s resulted in error: %s" % (e.cmd, e.output)) 227 self.log.info("Please update your NIC driver and firmware versions and try again.") 228 self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic])) 229 self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic])) 230 231 def configure_services(self): 232 self.log.info("Configuring active services...") 233 svc_config = configparser.ConfigParser(strict=False) 234 235 # Below list is valid only for RHEL / Fedora systems and might not 236 # contain valid names for other distributions. 237 svc_target_state = { 238 "firewalld": "inactive", 239 "irqbalance": "inactive", 240 "lldpad.service": "inactive", 241 "lldpad.socket": "inactive" 242 } 243 244 for service in svc_target_state: 245 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 246 out = "\n".join(["[%s]" % service, out]) 247 svc_config.read_string(out) 248 249 if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]: 250 continue 251 252 service_state = svc_config[service]["ActiveState"] 253 self.log.info("Current state of %s service is %s" % (service, service_state)) 254 self.svc_restore_dict.update({service: service_state}) 255 if service_state != "inactive": 256 self.log.info("Disabling %s. It will be restored after the test has finished." % service) 257 self.exec_cmd(["sudo", "systemctl", "stop", service]) 258 259 def configure_sysctl(self): 260 self.log.info("Tuning sysctl settings...") 261 262 sysctl_opts = { 263 "net.core.busy_poll": 0, 264 "net.core.busy_read": 0, 265 "net.core.somaxconn": 4096, 266 "net.core.netdev_max_backlog": 8192, 267 "net.ipv4.tcp_max_syn_backlog": 16384, 268 "net.core.rmem_max": 268435456, 269 "net.core.wmem_max": 268435456, 270 "net.ipv4.tcp_mem": "268435456 268435456 268435456", 271 "net.ipv4.tcp_rmem": "8192 1048576 33554432", 272 "net.ipv4.tcp_wmem": "8192 1048576 33554432", 273 "net.ipv4.route.flush": 1, 274 "vm.overcommit_memory": 1, 275 } 276 277 if self.enable_adq: 278 sysctl_opts.update(self.adq_set_busy_read(1)) 279 280 for opt, value in sysctl_opts.items(): 281 self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()}) 282 self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 283 284 def configure_tuned(self): 285 if not self.tuned_profile: 286 self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.") 287 return 288 289 self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile) 290 service = "tuned" 291 tuned_config = configparser.ConfigParser(strict=False) 292 293 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 294 out = "\n".join(["[%s]" % service, out]) 295 tuned_config.read_string(out) 296 tuned_state = tuned_config[service]["ActiveState"] 297 self.svc_restore_dict.update({service: tuned_state}) 298 299 if tuned_state != "inactive": 300 profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip() 301 profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip() 302 303 self.tuned_restore_dict = { 304 "profile": profile, 305 "mode": profile_mode 306 } 307 308 self.exec_cmd(["sudo", "systemctl", "start", service]) 309 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile]) 310 self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"])) 311 312 def configure_cpu_governor(self): 313 self.log.info("Setting CPU governor to performance...") 314 315 # This assumes that there is the same CPU scaling governor on each CPU 316 self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip() 317 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"]) 318 319 def get_core_list_from_mask(self, core_mask): 320 # Generate list of individual cores from hex core mask 321 # (e.g. '0xffff') or list containing: 322 # - individual cores (e.g. '1, 2, 3') 323 # - core ranges (e.g. '0-3') 324 # - mix of both (e.g. '0, 1-4, 9, 11-13') 325 326 core_list = [] 327 if "0x" in core_mask: 328 core_mask_int = int(core_mask, 16) 329 for i in range(core_mask_int.bit_length()): 330 if (1 << i) & core_mask_int: 331 core_list.append(i) 332 return core_list 333 else: 334 # Core list can be provided in .json config with square brackets 335 # remove them first 336 core_mask = core_mask.replace("[", "") 337 core_mask = core_mask.replace("]", "") 338 339 for i in core_mask.split(","): 340 if "-" in i: 341 start, end = i.split("-") 342 core_range = range(int(start), int(end) + 1) 343 core_list.extend(core_range) 344 else: 345 core_list.append(int(i)) 346 return core_list 347 348 def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False): 349 self.log.info("Setting NIC irq affinity for NICs. Using %s mode" % mode) 350 351 if mode not in ["default", "bynode", "cpulist"]: 352 raise ValueError("%s irq affinity setting not supported" % mode) 353 354 if mode == "cpulist" and not cpulist: 355 raise ValueError("%s irq affinity setting set, but no cpulist provided" % mode) 356 357 affinity_script = "set_irq_affinity.sh" 358 if "default" not in mode: 359 affinity_script = "set_irq_affinity_cpulist.sh" 360 system_cpu_map = self.get_numa_cpu_map() 361 irq_script_path = os.path.join(self.irq_scripts_dir, affinity_script) 362 363 def cpu_list_to_string(cpulist): 364 return ",".join(map(lambda x: str(x), cpulist)) 365 366 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 367 for nic_name in nic_names: 368 irq_cmd = ["sudo", irq_script_path] 369 370 # Use only CPU cores matching NIC NUMA node. 371 # Remove any CPU cores if they're on exclusion list. 372 if mode == "bynode": 373 irq_cpus = system_cpu_map[self.get_nic_numa_node(nic_name)] 374 if cpulist and exclude_cpulist: 375 disallowed_cpus = self.get_core_list_from_mask(cpulist) 376 irq_cpus = list(set(irq_cpus) - set(disallowed_cpus)) 377 if not irq_cpus: 378 raise Exception("No CPUs left to process IRQs after excluding CPUs!") 379 irq_cmd.append(cpu_list_to_string(irq_cpus)) 380 381 if mode == "cpulist": 382 irq_cpus = self.get_core_list_from_mask(cpulist) 383 if exclude_cpulist: 384 # Flatten system CPU list, we don't need NUMA awareness here 385 system_cpu_list = sorted({x for v in system_cpu_map.values() for x in v}) 386 irq_cpus = list(set(system_cpu_list) - set(irq_cpus)) 387 if not irq_cpus: 388 raise Exception("No CPUs left to process IRQs after excluding CPUs!") 389 irq_cmd.append(cpu_list_to_string(irq_cpus)) 390 391 irq_cmd.append(nic_name) 392 self.log.info(irq_cmd) 393 self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir) 394 395 def restore_services(self): 396 self.log.info("Restoring services...") 397 for service, state in self.svc_restore_dict.items(): 398 cmd = "stop" if state == "inactive" else "start" 399 self.exec_cmd(["sudo", "systemctl", cmd, service]) 400 401 def restore_sysctl(self): 402 self.log.info("Restoring sysctl settings...") 403 for opt, value in self.sysctl_restore_dict.items(): 404 self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 405 406 def restore_tuned(self): 407 self.log.info("Restoring tuned-adm settings...") 408 409 if not self.tuned_restore_dict: 410 return 411 412 if self.tuned_restore_dict["mode"] == "auto": 413 self.exec_cmd(["sudo", "tuned-adm", "auto_profile"]) 414 self.log.info("Reverted tuned-adm to auto_profile.") 415 else: 416 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]]) 417 self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"]) 418 419 def restore_governor(self): 420 self.log.info("Restoring CPU governor setting...") 421 if self.governor_restore: 422 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore]) 423 self.log.info("Reverted CPU governor to %s." % self.governor_restore) 424 425 def restore_settings(self): 426 self.restore_governor() 427 self.restore_tuned() 428 self.restore_services() 429 self.restore_sysctl() 430 if self.enable_adq: 431 self.reload_driver("ice") 432 433 434class Target(Server): 435 def __init__(self, name, general_config, target_config): 436 super().__init__(name, general_config, target_config) 437 438 # Defaults 439 self.enable_zcopy = False 440 self.scheduler_name = "static" 441 self.null_block = 0 442 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 443 self.subsystem_info_list = [] 444 self.initiator_info = [] 445 self.nvme_allowlist = [] 446 self.nvme_blocklist = [] 447 448 # Target-side measurement options 449 self.enable_pm = True 450 self.enable_sar = True 451 self.enable_pcm = True 452 self.enable_bw = True 453 self.enable_dpdk_memory = True 454 455 if "null_block_devices" in target_config: 456 self.null_block = target_config["null_block_devices"] 457 if "scheduler_settings" in target_config: 458 self.scheduler_name = target_config["scheduler_settings"] 459 if "zcopy_settings" in target_config: 460 self.enable_zcopy = target_config["zcopy_settings"] 461 if "results_dir" in target_config: 462 self.results_dir = target_config["results_dir"] 463 if "blocklist" in target_config: 464 self.nvme_blocklist = target_config["blocklist"] 465 if "allowlist" in target_config: 466 self.nvme_allowlist = target_config["allowlist"] 467 # Blocklist takes precedence, remove common elements from allowlist 468 self.nvme_allowlist = list(set(self.nvme_allowlist) - set(self.nvme_blocklist)) 469 if "enable_pm" in target_config: 470 self.enable_pm = target_config["enable_pm"] 471 if "enable_sar" in target_config: 472 self.enable_sar = target_config["enable_sar"] 473 if "enable_pcm" in target_config: 474 self.enable_pcm = target_config["enable_pcm"] 475 if "enable_bandwidth" in target_config: 476 self.enable_bw = target_config["enable_bandwidth"] 477 if "enable_dpdk_memory" in target_config: 478 self.enable_dpdk_memory = target_config["enable_dpdk_memory"] 479 480 self.log.info("Items now on allowlist: %s" % self.nvme_allowlist) 481 self.log.info("Items now on blocklist: %s" % self.nvme_blocklist) 482 483 self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) 484 self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../")) 485 self.set_local_nic_info(self.set_local_nic_info_helper()) 486 487 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 488 self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip") 489 490 self.configure_system() 491 if self.enable_adq: 492 self.configure_adq() 493 self.sys_config() 494 495 def set_local_nic_info_helper(self): 496 return json.loads(self.exec_cmd(["lshw", "-json"])) 497 498 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 499 stderr_opt = None 500 if stderr_redirect: 501 stderr_opt = subprocess.STDOUT 502 if change_dir: 503 old_cwd = os.getcwd() 504 os.chdir(change_dir) 505 self.log.info("Changing directory to %s" % change_dir) 506 507 out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8") 508 509 if change_dir: 510 os.chdir(old_cwd) 511 self.log.info("Changing directory to %s" % old_cwd) 512 return out 513 514 def zip_spdk_sources(self, spdk_dir, dest_file): 515 self.log.info("Zipping SPDK source directory") 516 fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) 517 for root, _directories, files in os.walk(spdk_dir, followlinks=True): 518 for file in files: 519 fh.write(os.path.relpath(os.path.join(root, file))) 520 fh.close() 521 self.log.info("Done zipping") 522 523 @staticmethod 524 def _chunks(input_list, chunks_no): 525 div, rem = divmod(len(input_list), chunks_no) 526 for i in range(chunks_no): 527 si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem) 528 yield input_list[si:si + (div + 1 if i < rem else div)] 529 530 def spread_bdevs(self, req_disks): 531 # Spread available block devices indexes: 532 # - evenly across available initiator systems 533 # - evenly across available NIC interfaces for 534 # each initiator 535 # Not NUMA aware. 536 ip_bdev_map = [] 537 initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info)) 538 539 for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)): 540 self.initiator_info[i]["bdev_range"] = init_chunk 541 init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"]))) 542 for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list): 543 for c in nic_chunk: 544 ip_bdev_map.append((ip, c)) 545 return ip_bdev_map 546 547 def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time): 548 cpu_number = os.cpu_count() 549 sar_idle_sum = 0 550 sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt") 551 sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"])) 552 553 self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time) 554 time.sleep(ramp_time) 555 self.log.info("Starting SAR measurements") 556 557 out = self.exec_cmd(["sar", "-P", "ALL", "%s" % 1, "%s" % run_time]) 558 with open(os.path.join(results_dir, sar_output_file), "w") as fh: 559 for line in out.split("\n"): 560 if "Average" in line: 561 if "CPU" in line: 562 self.log.info("Summary CPU utilization from SAR:") 563 self.log.info(line) 564 elif "all" in line: 565 self.log.info(line) 566 else: 567 sar_idle_sum += float(line.split()[7]) 568 fh.write(out) 569 sar_cpu_usage = cpu_number * 100 - sar_idle_sum 570 571 with open(os.path.join(results_dir, sar_cpu_util_file), "w") as f: 572 f.write("%0.2f" % sar_cpu_usage) 573 574 def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time): 575 time.sleep(ramp_time) 576 self.log.info("Starting power measurements") 577 self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir, 578 "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix, 579 "-x", "-c", "%s" % run_time, "-t", "%s" % 1, "-r"]) 580 581 def ethtool_after_fio_ramp(self, fio_ramp_time): 582 time.sleep(fio_ramp_time//2) 583 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 584 for nic in nic_names: 585 self.log.info(nic) 586 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 587 "channel-pkt-inspect-optimize", "off"]) # Disable channel packet inspection optimization 588 589 def measure_pcm_memory(self, results_dir, pcm_file_name, ramp_time, run_time): 590 time.sleep(ramp_time) 591 cmd = ["pcm-memory", "1", "-csv=%s/%s" % (results_dir, pcm_file_name)] 592 pcm_memory = subprocess.Popen(cmd) 593 time.sleep(run_time) 594 pcm_memory.terminate() 595 596 def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time): 597 time.sleep(ramp_time) 598 cmd = ["pcm", "1", "-i=%s" % run_time, 599 "-csv=%s/%s" % (results_dir, pcm_file_name)] 600 subprocess.run(cmd) 601 df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1]) 602 df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x)) 603 skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})] 604 skt_pcm_file_name = "_".join(["skt", pcm_file_name]) 605 skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False) 606 607 def measure_pcm_power(self, results_dir, pcm_power_file_name, ramp_time, run_time): 608 time.sleep(ramp_time) 609 out = self.exec_cmd(["pcm-power", "1", "-i=%s" % run_time]) 610 with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh: 611 fh.write(out) 612 # TODO: Above command results in a .csv file containing measurements for all gathered samples. 613 # Improve this so that additional file containing measurements average is generated too. 614 615 def measure_network_bandwidth(self, results_dir, bandwidth_file_name, ramp_time, run_time): 616 self.log.info("Waiting %d seconds for ramp-up to finish before measuring bandwidth stats" % ramp_time) 617 time.sleep(ramp_time) 618 self.log.info("INFO: starting network bandwidth measure") 619 self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name), 620 "-a", "1", "-t", "1000", "-c", "%s" % run_time]) 621 # TODO: Above command results in a .csv file containing measurements for all gathered samples. 622 # Improve this so that additional file containing measurements average is generated too. 623 # TODO: Monitor only these interfaces which are currently used to run the workload. 624 625 def measure_dpdk_memory(self, results_dir, dump_file_name, ramp_time): 626 self.log.info("INFO: waiting to generate DPDK memory usage") 627 time.sleep(ramp_time) 628 self.log.info("INFO: generating DPDK memory usage") 629 tmp_dump_file = rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)["filename"] 630 os.rename(tmp_dump_file, "%s/%s" % (results_dir, dump_file_name)) 631 632 def sys_config(self): 633 self.log.info("====Kernel release:====") 634 self.log.info(os.uname().release) 635 self.log.info("====Kernel command line:====") 636 with open('/proc/cmdline') as f: 637 cmdline = f.readlines() 638 self.log.info('\n'.join(self.get_uncommented_lines(cmdline))) 639 self.log.info("====sysctl conf:====") 640 with open('/etc/sysctl.conf') as f: 641 sysctl = f.readlines() 642 self.log.info('\n'.join(self.get_uncommented_lines(sysctl))) 643 self.log.info("====Cpu power info:====") 644 self.log.info(self.exec_cmd(["cpupower", "frequency-info"])) 645 self.log.info("====zcopy settings:====") 646 self.log.info("zcopy enabled: %s" % (self.enable_zcopy)) 647 self.log.info("====Scheduler settings:====") 648 self.log.info("SPDK scheduler: %s" % (self.scheduler_name)) 649 650 651class Initiator(Server): 652 def __init__(self, name, general_config, initiator_config): 653 super().__init__(name, general_config, initiator_config) 654 655 # Required fields 656 self.ip = initiator_config["ip"] 657 self.target_nic_ips = initiator_config["target_nic_ips"] 658 659 # Defaults 660 self.cpus_allowed = None 661 self.cpus_allowed_policy = "shared" 662 self.spdk_dir = "/tmp/spdk" 663 self.fio_bin = "/usr/src/fio/fio" 664 self.nvmecli_bin = "nvme" 665 self.cpu_frequency = None 666 self.subsystem_info_list = [] 667 self.allow_cpu_sharing = True 668 669 if "spdk_dir" in initiator_config: 670 self.spdk_dir = initiator_config["spdk_dir"] 671 if "fio_bin" in initiator_config: 672 self.fio_bin = initiator_config["fio_bin"] 673 if "nvmecli_bin" in initiator_config: 674 self.nvmecli_bin = initiator_config["nvmecli_bin"] 675 if "cpus_allowed" in initiator_config: 676 self.cpus_allowed = initiator_config["cpus_allowed"] 677 if "cpus_allowed_policy" in initiator_config: 678 self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"] 679 if "cpu_frequency" in initiator_config: 680 self.cpu_frequency = initiator_config["cpu_frequency"] 681 if "allow_cpu_sharing" in initiator_config: 682 self.allow_cpu_sharing = initiator_config["allow_cpu_sharing"] 683 if os.getenv('SPDK_WORKSPACE'): 684 self.spdk_dir = os.getenv('SPDK_WORKSPACE') 685 686 self.ssh_connection = paramiko.SSHClient() 687 self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 688 self.ssh_connection.connect(self.ip, username=self.username, password=self.password) 689 self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir]) 690 self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir]) 691 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 692 693 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 694 self.copy_spdk("/tmp/spdk.zip") 695 self.set_local_nic_info(self.set_local_nic_info_helper()) 696 self.set_cpu_frequency() 697 self.configure_system() 698 if self.enable_adq: 699 self.configure_adq() 700 self.sys_config() 701 702 def set_local_nic_info_helper(self): 703 return json.loads(self.exec_cmd(["lshw", "-json"])) 704 705 def stop(self): 706 self.restore_settings() 707 self.ssh_connection.close() 708 709 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 710 if change_dir: 711 cmd = ["cd", change_dir, ";", *cmd] 712 713 # In case one of the command elements contains whitespace and is not 714 # already quoted, # (e.g. when calling sysctl) quote it again to prevent expansion 715 # when sending to remote system. 716 for i, c in enumerate(cmd): 717 if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")): 718 cmd[i] = '"%s"' % c 719 cmd = " ".join(cmd) 720 721 # Redirect stderr to stdout thanks using get_pty option if needed 722 _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect) 723 out = stdout.read().decode(encoding="utf-8") 724 725 # Check the return code 726 rc = stdout.channel.recv_exit_status() 727 if rc: 728 raise CalledProcessError(int(rc), cmd, out) 729 730 return out 731 732 def put_file(self, local, remote_dest): 733 ftp = self.ssh_connection.open_sftp() 734 ftp.put(local, remote_dest) 735 ftp.close() 736 737 def get_file(self, remote, local_dest): 738 ftp = self.ssh_connection.open_sftp() 739 ftp.get(remote, local_dest) 740 ftp.close() 741 742 def copy_spdk(self, local_spdk_zip): 743 self.log.info("Copying SPDK sources to initiator %s" % self.name) 744 self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip") 745 self.log.info("Copied sources zip from target") 746 self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir]) 747 self.log.info("Sources unpacked") 748 749 def copy_result_files(self, dest_dir): 750 self.log.info("Copying results") 751 752 if not os.path.exists(dest_dir): 753 os.mkdir(dest_dir) 754 755 # Get list of result files from initiator and copy them back to target 756 file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n") 757 758 for file in file_list: 759 self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file), 760 os.path.join(dest_dir, file)) 761 self.log.info("Done copying results") 762 763 def match_subsystems(self, target_subsytems): 764 subsystems = [subsystem for subsystem in target_subsytems if subsystem[2] in self.target_nic_ips] 765 subsystems.sort(key=lambda x: x[1]) 766 self.log.info("Found matching subsystems on target side:") 767 for s in subsystems: 768 self.log.info(s) 769 self.subsystem_info_list = subsystems 770 771 def gen_fio_filename_conf(self, *args, **kwargs): 772 # Logic implemented in SPDKInitiator and KernelInitiator classes 773 pass 774 775 def get_route_nic_numa(self, remote_nvme_ip): 776 local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip])) 777 local_nvme_nic = local_nvme_nic[0]["dev"] 778 return self.get_nic_numa_node(local_nvme_nic) 779 780 @staticmethod 781 def gen_fio_offset_section(offset_inc, num_jobs): 782 offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc 783 return "\n".join(["size=%s%%" % offset_inc, 784 "offset=0%", 785 "offset_increment=%s%%" % offset_inc]) 786 787 def gen_fio_numa_section(self, fio_filenames_list, num_jobs): 788 numa_stats = {} 789 allowed_cpus = [] 790 for nvme in fio_filenames_list: 791 nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme)) 792 numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1 793 794 # Use the most common NUMA node for this chunk to allocate memory and CPUs 795 section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0] 796 797 # Check if we have enough free CPUs to pop from the list before assigning them 798 if len(self.available_cpus[section_local_numa]) < num_jobs: 799 if self.allow_cpu_sharing: 800 self.log.info("Regenerating available CPU list %s" % section_local_numa) 801 # Remove still available CPUs from the regenerated list. We don't want to 802 # regenerate it with duplicates. 803 cpus_regen = set(self.get_numa_cpu_map()[section_local_numa]) - set(self.available_cpus[section_local_numa]) 804 self.available_cpus[section_local_numa].extend(cpus_regen) 805 self.log.info(self.log.info(self.available_cpus[section_local_numa])) 806 else: 807 self.log.error("No more free CPU cores to use from allowed_cpus list!") 808 raise IndexError 809 810 for _ in range(num_jobs): 811 try: 812 allowed_cpus.append(str(self.available_cpus[section_local_numa].pop(0))) 813 except IndexError: 814 self.log.error("No more free CPU cores to use from allowed_cpus list!") 815 raise 816 817 return "\n".join(["cpus_allowed=%s" % ",".join(allowed_cpus), 818 "numa_mem_policy=prefer:%s" % section_local_numa]) 819 820 def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, 821 num_jobs=None, ramp_time=0, run_time=10, rate_iops=0, 822 offset=False, offset_inc=0): 823 fio_conf_template = """ 824[global] 825ioengine={ioengine} 826{spdk_conf} 827thread=1 828group_reporting=1 829direct=1 830percentile_list=50:90:99:99.5:99.9:99.99:99.999 831 832norandommap=1 833rw={rw} 834rwmixread={rwmixread} 835bs={block_size} 836time_based=1 837ramp_time={ramp_time} 838runtime={run_time} 839rate_iops={rate_iops} 840""" 841 842 if self.cpus_allowed is not None: 843 self.log.info("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed) 844 cpus_num = 0 845 cpus = self.cpus_allowed.split(",") 846 for cpu in cpus: 847 if "-" in cpu: 848 a, b = cpu.split("-") 849 a = int(a) 850 b = int(b) 851 cpus_num += len(range(a, b)) 852 else: 853 cpus_num += 1 854 self.num_cores = cpus_num 855 threads = range(0, self.num_cores) 856 elif hasattr(self, 'num_cores'): 857 self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores) 858 threads = range(0, int(self.num_cores)) 859 else: 860 self.num_cores = len(self.subsystem_info_list) 861 threads = range(0, len(self.subsystem_info_list)) 862 863 filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs, 864 offset, offset_inc) 865 866 fio_config = fio_conf_template.format(ioengine=self.ioengine, spdk_conf=self.spdk_conf, 867 rw=rw, rwmixread=rwmixread, block_size=block_size, 868 ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops) 869 870 # TODO: hipri disabled for now, as it causes fio errors: 871 # io_u error on file /dev/nvme2n1: Operation not supported 872 # See comment in KernelInitiator class, init_connect() function 873 if "io_uring" in self.ioengine: 874 fio_config = fio_config + """ 875fixedbufs=1 876registerfiles=1 877#hipri=1 878""" 879 if num_jobs: 880 fio_config = fio_config + "numjobs=%s \n" % num_jobs 881 if self.cpus_allowed is not None: 882 fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed 883 fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy 884 fio_config = fio_config + filename_section 885 886 fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread) 887 if hasattr(self, "num_cores"): 888 fio_config_filename += "_%sCPU" % self.num_cores 889 fio_config_filename += ".fio" 890 891 self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir]) 892 self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)]) 893 self.log.info("Created FIO Config:") 894 self.log.info(fio_config) 895 896 return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename) 897 898 def set_cpu_frequency(self): 899 if self.cpu_frequency is not None: 900 try: 901 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True) 902 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True) 903 self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"])) 904 except Exception: 905 self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!") 906 sys.exit() 907 else: 908 self.log.warning("WARNING: you have disabled intel_pstate and using default cpu governance.") 909 910 def run_fio(self, fio_config_file, run_num=1): 911 job_name, _ = os.path.splitext(fio_config_file) 912 self.log.info("Starting FIO run for job: %s" % job_name) 913 self.log.info("Using FIO: %s" % self.fio_bin) 914 915 output_filename = job_name + "_run_" + str(run_num) + "_" + self.name + ".json" 916 try: 917 output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json", 918 "--output=%s" % output_filename, "--eta=never"], True) 919 self.log.info(output) 920 self.log.info("FIO run finished. Results in: %s" % output_filename) 921 except subprocess.CalledProcessError as e: 922 self.log.error("ERROR: Fio process failed!") 923 self.log.error(e.stdout) 924 925 def sys_config(self): 926 self.log.info("====Kernel release:====") 927 self.log.info(self.exec_cmd(["uname", "-r"])) 928 self.log.info("====Kernel command line:====") 929 cmdline = self.exec_cmd(["cat", "/proc/cmdline"]) 930 self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines()))) 931 self.log.info("====sysctl conf:====") 932 sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"]) 933 self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines()))) 934 self.log.info("====Cpu power info:====") 935 self.log.info(self.exec_cmd(["cpupower", "frequency-info"])) 936 937 938class KernelTarget(Target): 939 def __init__(self, name, general_config, target_config): 940 super().__init__(name, general_config, target_config) 941 # Defaults 942 self.nvmet_bin = "nvmetcli" 943 944 if "nvmet_bin" in target_config: 945 self.nvmet_bin = target_config["nvmet_bin"] 946 947 def load_drivers(self): 948 self.log.info("Loading drivers") 949 super().load_drivers() 950 if self.null_block: 951 self.exec_cmd(["sudo", "modprobe", "null_blk", "nr_devices=%s" % self.null_block]) 952 953 def configure_adq(self): 954 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 955 956 def adq_configure_tc(self): 957 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 958 959 def adq_set_busy_read(self, busy_read_val): 960 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0") 961 return {"net.core.busy_read": 0} 962 963 def stop(self): 964 self.nvmet_command(self.nvmet_bin, "clear") 965 self.restore_settings() 966 967 def get_nvme_device_bdf(self, nvme_dev_path): 968 nvme_name = os.path.basename(nvme_dev_path) 969 return self.exec_cmd(["cat", "/sys/block/%s/device/address" % nvme_name]).strip() 970 971 def get_nvme_devices(self): 972 dev_list = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]).split("\n") 973 nvme_list = [] 974 for dev in dev_list: 975 if "nvme" not in dev: 976 continue 977 if self.get_nvme_device_bdf(dev) in self.nvme_blocklist: 978 continue 979 if len(self.nvme_allowlist) == 0: 980 nvme_list.append(dev) 981 continue 982 if self.get_nvme_device_bdf(dev) in self.nvme_allowlist: 983 nvme_list.append(dev) 984 return dev_list 985 986 def nvmet_command(self, nvmet_bin, command): 987 return self.exec_cmd([nvmet_bin, *(command.split(" "))]) 988 989 def kernel_tgt_gen_subsystem_conf(self, nvme_list): 990 991 nvmet_cfg = { 992 "ports": [], 993 "hosts": [], 994 "subsystems": [], 995 } 996 997 for ip, bdev_num in self.spread_bdevs(len(nvme_list)): 998 port = str(4420 + bdev_num) 999 nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num 1000 serial = "SPDK00%s" % bdev_num 1001 bdev_name = nvme_list[bdev_num] 1002 1003 nvmet_cfg["subsystems"].append({ 1004 "allowed_hosts": [], 1005 "attr": { 1006 "allow_any_host": "1", 1007 "serial": serial, 1008 "version": "1.3" 1009 }, 1010 "namespaces": [ 1011 { 1012 "device": { 1013 "path": bdev_name, 1014 "uuid": "%s" % uuid.uuid4() 1015 }, 1016 "enable": 1, 1017 "nsid": port 1018 } 1019 ], 1020 "nqn": nqn 1021 }) 1022 1023 nvmet_cfg["ports"].append({ 1024 "addr": { 1025 "adrfam": "ipv4", 1026 "traddr": ip, 1027 "trsvcid": port, 1028 "trtype": self.transport 1029 }, 1030 "portid": bdev_num, 1031 "referrals": [], 1032 "subsystems": [nqn] 1033 }) 1034 1035 self.subsystem_info_list.append((port, nqn, ip)) 1036 self.subsys_no = len(self.subsystem_info_list) 1037 1038 with open("kernel.conf", "w") as fh: 1039 fh.write(json.dumps(nvmet_cfg, indent=2)) 1040 1041 def tgt_start(self): 1042 self.log.info("Configuring kernel NVMeOF Target") 1043 1044 if self.null_block: 1045 self.log.info("Configuring with null block device.") 1046 nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)] 1047 else: 1048 self.log.info("Configuring with NVMe drives.") 1049 nvme_list = self.get_nvme_devices() 1050 1051 self.kernel_tgt_gen_subsystem_conf(nvme_list) 1052 self.subsys_no = len(nvme_list) 1053 1054 self.nvmet_command(self.nvmet_bin, "clear") 1055 self.nvmet_command(self.nvmet_bin, "restore kernel.conf") 1056 1057 if self.enable_adq: 1058 self.adq_configure_tc() 1059 1060 self.log.info("Done configuring kernel NVMeOF Target") 1061 1062 1063class SPDKTarget(Target): 1064 def __init__(self, name, general_config, target_config): 1065 # Required fields 1066 self.core_mask = target_config["core_mask"] 1067 self.num_cores = len(self.get_core_list_from_mask(self.core_mask)) 1068 1069 super().__init__(name, general_config, target_config) 1070 1071 # Defaults 1072 self.dif_insert_strip = False 1073 self.null_block_dif_type = 0 1074 self.num_shared_buffers = 4096 1075 self.max_queue_depth = 128 1076 self.bpf_proc = None 1077 self.bpf_scripts = [] 1078 self.enable_dsa = False 1079 self.scheduler_core_limit = None 1080 self.iobuf_small_pool_count = 16383 1081 self.iobuf_large_pool_count = 2047 1082 1083 if "num_shared_buffers" in target_config: 1084 self.num_shared_buffers = target_config["num_shared_buffers"] 1085 if "max_queue_depth" in target_config: 1086 self.max_queue_depth = target_config["max_queue_depth"] 1087 if "null_block_dif_type" in target_config: 1088 self.null_block_dif_type = target_config["null_block_dif_type"] 1089 if "dif_insert_strip" in target_config: 1090 self.dif_insert_strip = target_config["dif_insert_strip"] 1091 if "bpf_scripts" in target_config: 1092 self.bpf_scripts = target_config["bpf_scripts"] 1093 if "dsa_settings" in target_config: 1094 self.enable_dsa = target_config["dsa_settings"] 1095 if "scheduler_core_limit" in target_config: 1096 self.scheduler_core_limit = target_config["scheduler_core_limit"] 1097 if "iobuf_small_pool_count" in target_config: 1098 self.iobuf_small_pool_count = target_config["iobuf_small_pool_count"] 1099 if "iobuf_large_pool_count" in target_config: 1100 self.iobuf_large_pool_count = target_config["iobuf_large_pool_count"] 1101 1102 self.log.info("====DSA settings:====") 1103 self.log.info("DSA enabled: %s" % (self.enable_dsa)) 1104 1105 def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False): 1106 if mode not in ["default", "bynode", "cpulist", 1107 "shared", "split", "split-bynode"]: 1108 self.log.error("%s irq affinity setting not supported" % mode) 1109 raise Exception 1110 1111 # Create core list from SPDK's mask and change it to string. 1112 # This is the type configure_irq_affinity expects for cpulist parameter. 1113 spdk_tgt_core_list = self.get_core_list_from_mask(self.core_mask) 1114 spdk_tgt_core_list = ",".join(map(lambda x: str(x), spdk_tgt_core_list)) 1115 spdk_tgt_core_list = "[" + spdk_tgt_core_list + "]" 1116 1117 if mode == "shared": 1118 super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list) 1119 elif mode == "split": 1120 super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list, exclude_cpulist=True) 1121 elif mode == "split-bynode": 1122 super().configure_irq_affinity(mode="bynode", cpulist=spdk_tgt_core_list, exclude_cpulist=True) 1123 else: 1124 super().configure_irq_affinity(mode=mode, cpulist=cpulist, exclude_cpulist=exclude_cpulist) 1125 1126 def adq_set_busy_read(self, busy_read_val): 1127 return {"net.core.busy_read": busy_read_val} 1128 1129 def get_nvme_devices_count(self): 1130 return len(self.get_nvme_devices()) 1131 1132 def get_nvme_devices(self): 1133 bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")])) 1134 bdev_bdfs = [] 1135 for bdev in bdev_subsys_json_obj["config"]: 1136 bdev_traddr = bdev["params"]["traddr"] 1137 if bdev_traddr in self.nvme_blocklist: 1138 continue 1139 if len(self.nvme_allowlist) == 0: 1140 bdev_bdfs.append(bdev_traddr) 1141 if bdev_traddr in self.nvme_allowlist: 1142 bdev_bdfs.append(bdev_traddr) 1143 return bdev_bdfs 1144 1145 def spdk_tgt_configure(self): 1146 self.log.info("Configuring SPDK NVMeOF target via RPC") 1147 1148 if self.enable_adq: 1149 self.adq_configure_tc() 1150 1151 # Create transport layer 1152 rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, 1153 num_shared_buffers=self.num_shared_buffers, 1154 max_queue_depth=self.max_queue_depth, 1155 dif_insert_or_strip=self.dif_insert_strip, 1156 sock_priority=self.adq_priority) 1157 self.log.info("SPDK NVMeOF transport layer:") 1158 rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client)) 1159 1160 if self.null_block: 1161 self.spdk_tgt_add_nullblock(self.null_block) 1162 self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block) 1163 else: 1164 self.spdk_tgt_add_nvme_conf() 1165 self.spdk_tgt_add_subsystem_conf(self.nic_ips) 1166 1167 self.log.info("Done configuring SPDK NVMeOF Target") 1168 1169 def spdk_tgt_add_nullblock(self, null_block_count): 1170 md_size = 0 1171 block_size = 4096 1172 if self.null_block_dif_type != 0: 1173 md_size = 128 1174 1175 self.log.info("Adding null block bdevices to config via RPC") 1176 for i in range(null_block_count): 1177 self.log.info("Setting bdev protection to :%s" % self.null_block_dif_type) 1178 rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i), 1179 dif_type=self.null_block_dif_type, md_size=md_size) 1180 self.log.info("SPDK Bdevs configuration:") 1181 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1182 1183 def spdk_tgt_add_nvme_conf(self, req_num_disks=None): 1184 self.log.info("Adding NVMe bdevs to config via RPC") 1185 1186 bdfs = self.get_nvme_devices() 1187 bdfs = [b.replace(":", ".") for b in bdfs] 1188 1189 if req_num_disks: 1190 if req_num_disks > len(bdfs): 1191 self.log.error("ERROR: Requested number of disks is more than available %s" % len(bdfs)) 1192 sys.exit(1) 1193 else: 1194 bdfs = bdfs[0:req_num_disks] 1195 1196 for i, bdf in enumerate(bdfs): 1197 rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 1198 1199 self.log.info("SPDK Bdevs configuration:") 1200 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1201 1202 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 1203 self.log.info("Adding subsystems to config") 1204 if not req_num_disks: 1205 req_num_disks = self.get_nvme_devices_count() 1206 1207 for ip, bdev_num in self.spread_bdevs(req_num_disks): 1208 port = str(4420 + bdev_num) 1209 nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num 1210 serial = "SPDK00%s" % bdev_num 1211 bdev_name = "Nvme%sn1" % bdev_num 1212 1213 rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial, 1214 allow_any_host=True, max_namespaces=8) 1215 rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name) 1216 for nqn_name in [nqn, "discovery"]: 1217 rpc.nvmf.nvmf_subsystem_add_listener(self.client, 1218 nqn=nqn_name, 1219 trtype=self.transport, 1220 traddr=ip, 1221 trsvcid=port, 1222 adrfam="ipv4") 1223 self.subsystem_info_list.append((port, nqn, ip)) 1224 self.subsys_no = len(self.subsystem_info_list) 1225 1226 self.log.info("SPDK NVMeOF subsystem configuration:") 1227 rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client)) 1228 1229 def bpf_start(self): 1230 self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts) 1231 bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh") 1232 bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts] 1233 results_path = os.path.join(self.results_dir, "bpf_traces.txt") 1234 1235 with open(self.pid, "r") as fh: 1236 nvmf_pid = str(fh.readline()) 1237 1238 cmd = [bpf_script, nvmf_pid, *bpf_traces] 1239 self.log.info(cmd) 1240 self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path}) 1241 1242 def tgt_start(self): 1243 if self.null_block: 1244 self.subsys_no = 1 1245 else: 1246 self.subsys_no = self.get_nvme_devices_count() 1247 self.log.info("Starting SPDK NVMeOF Target process") 1248 nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt") 1249 proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask]) 1250 self.pid = os.path.join(self.spdk_dir, "nvmf.pid") 1251 1252 with open(self.pid, "w") as fh: 1253 fh.write(str(proc.pid)) 1254 self.nvmf_proc = proc 1255 self.log.info("SPDK NVMeOF Target PID=%s" % self.pid) 1256 self.log.info("Waiting for spdk to initialize...") 1257 while True: 1258 if os.path.exists("/var/tmp/spdk.sock"): 1259 break 1260 time.sleep(1) 1261 self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock") 1262 1263 rpc.sock.sock_set_default_impl(self.client, impl_name="posix") 1264 rpc.iobuf.iobuf_set_options(self.client, 1265 small_pool_count=self.iobuf_small_pool_count, 1266 large_pool_count=self.iobuf_large_pool_count, 1267 small_bufsize=None, 1268 large_bufsize=None) 1269 1270 if self.enable_zcopy: 1271 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", 1272 enable_zerocopy_send_server=True) 1273 self.log.info("Target socket options:") 1274 rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix")) 1275 1276 if self.enable_adq: 1277 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1) 1278 rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None, 1279 nvme_adminq_poll_period_us=100000, retry_count=4) 1280 1281 if self.enable_dsa: 1282 rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None) 1283 self.log.info("Target DSA accel module enabled") 1284 1285 rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit) 1286 rpc.framework_start_init(self.client) 1287 1288 if self.bpf_scripts: 1289 self.bpf_start() 1290 1291 self.spdk_tgt_configure() 1292 1293 def stop(self): 1294 if self.bpf_proc: 1295 self.log.info("Stopping BPF Trace script") 1296 self.bpf_proc.terminate() 1297 self.bpf_proc.wait() 1298 1299 if hasattr(self, "nvmf_proc"): 1300 try: 1301 self.nvmf_proc.terminate() 1302 self.nvmf_proc.wait(timeout=30) 1303 except Exception as e: 1304 self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.") 1305 self.log.info(e) 1306 self.nvmf_proc.kill() 1307 self.nvmf_proc.communicate() 1308 # Try to clean up RPC socket files if they were not removed 1309 # because of using 'kill' 1310 try: 1311 os.remove("/var/tmp/spdk.sock") 1312 os.remove("/var/tmp/spdk.sock.lock") 1313 except FileNotFoundError: 1314 pass 1315 self.restore_settings() 1316 1317 1318class KernelInitiator(Initiator): 1319 def __init__(self, name, general_config, initiator_config): 1320 super().__init__(name, general_config, initiator_config) 1321 1322 # Defaults 1323 self.extra_params = "" 1324 self.ioengine = "libaio" 1325 self.spdk_conf = "" 1326 1327 if "num_cores" in initiator_config: 1328 self.num_cores = initiator_config["num_cores"] 1329 1330 if "extra_params" in initiator_config: 1331 self.extra_params = initiator_config["extra_params"] 1332 1333 if "kernel_engine" in initiator_config: 1334 self.ioengine = initiator_config["kernel_engine"] 1335 if "io_uring" in self.ioengine: 1336 self.extra_params = "--nr-poll-queues=8" 1337 1338 def configure_adq(self): 1339 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1340 1341 def adq_configure_tc(self): 1342 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1343 1344 def adq_set_busy_read(self, busy_read_val): 1345 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0") 1346 return {"net.core.busy_read": 0} 1347 1348 def get_connected_nvme_list(self): 1349 json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"])) 1350 nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"] 1351 if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]] 1352 return nvme_list 1353 1354 def init_connect(self): 1355 self.log.info("Below connection attempts may result in error messages, this is expected!") 1356 for subsystem in self.subsystem_info_list: 1357 self.log.info("Trying to connect %s %s %s" % subsystem) 1358 self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport, 1359 "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params]) 1360 time.sleep(2) 1361 1362 if "io_uring" in self.ioengine: 1363 self.log.info("Setting block layer settings for io_uring.") 1364 1365 # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because 1366 # apparently it's not possible for connected subsystems. 1367 # Results in "error: Invalid argument" 1368 block_sysfs_settings = { 1369 "iostats": "0", 1370 "rq_affinity": "0", 1371 "nomerges": "2" 1372 } 1373 1374 for disk in self.get_connected_nvme_list(): 1375 sysfs = os.path.join("/sys/block", disk, "queue") 1376 for k, v in block_sysfs_settings.items(): 1377 sysfs_opt_path = os.path.join(sysfs, k) 1378 try: 1379 self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True) 1380 except CalledProcessError as e: 1381 self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v)) 1382 finally: 1383 _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)]) 1384 self.log.info("%s=%s" % (sysfs_opt_path, _)) 1385 1386 def init_disconnect(self): 1387 for subsystem in self.subsystem_info_list: 1388 self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]]) 1389 time.sleep(1) 1390 1391 def get_nvme_subsystem_numa(self, dev_name): 1392 # Remove two last characters to get controller name instead of subsystem name 1393 nvme_ctrl = os.path.basename(dev_name)[:-2] 1394 remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', 1395 self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl])) 1396 return self.get_route_nic_numa(remote_nvme_ip.group(0)) 1397 1398 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0): 1399 self.available_cpus = self.get_numa_cpu_map() 1400 if len(threads) >= len(subsystems): 1401 threads = range(0, len(subsystems)) 1402 1403 # Generate connected nvme devices names and sort them by used NIC numa node 1404 # to allow better grouping when splitting into fio sections. 1405 nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()] 1406 nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list] 1407 nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))] 1408 1409 filename_section = "" 1410 nvme_per_split = int(len(nvme_list) / len(threads)) 1411 remainder = len(nvme_list) % len(threads) 1412 iterator = iter(nvme_list) 1413 result = [] 1414 for i in range(len(threads)): 1415 result.append([]) 1416 for _ in range(nvme_per_split): 1417 result[i].append(next(iterator)) 1418 if remainder: 1419 result[i].append(next(iterator)) 1420 remainder -= 1 1421 for i, r in enumerate(result): 1422 header = "[filename%s]" % i 1423 disks = "\n".join(["filename=%s" % x for x in r]) 1424 job_section_qd = round((io_depth * len(r)) / num_jobs) 1425 if job_section_qd == 0: 1426 job_section_qd = 1 1427 iodepth = "iodepth=%s" % job_section_qd 1428 1429 offset_section = "" 1430 if offset: 1431 offset_section = self.gen_fio_offset_section(offset_inc, num_jobs) 1432 1433 numa_opts = self.gen_fio_numa_section(r, num_jobs) 1434 1435 filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""]) 1436 1437 return filename_section 1438 1439 1440class SPDKInitiator(Initiator): 1441 def __init__(self, name, general_config, initiator_config): 1442 super().__init__(name, general_config, initiator_config) 1443 1444 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 1445 self.install_spdk() 1446 1447 # Optional fields 1448 self.enable_data_digest = False 1449 if "enable_data_digest" in initiator_config: 1450 self.enable_data_digest = initiator_config["enable_data_digest"] 1451 if "num_cores" in initiator_config: 1452 self.num_cores = initiator_config["num_cores"] 1453 1454 self.ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir 1455 self.spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir 1456 1457 def adq_set_busy_read(self, busy_read_val): 1458 return {"net.core.busy_read": busy_read_val} 1459 1460 def install_spdk(self): 1461 self.log.info("Using fio binary %s" % self.fio_bin) 1462 self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"]) 1463 self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"]) 1464 self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", 1465 "--with-fio=%s" % os.path.dirname(self.fio_bin), 1466 "--enable-lto", "--disable-unit-tests"]) 1467 self.exec_cmd(["make", "-C", self.spdk_dir, "clean"]) 1468 self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"]) 1469 1470 self.log.info("SPDK built") 1471 self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir]) 1472 1473 def init_connect(self): 1474 # Not a real "connect" like when doing "nvme connect" because SPDK's fio 1475 # bdev plugin initiates connection just before starting IO traffic. 1476 # This is just to have a "init_connect" equivalent of the same function 1477 # from KernelInitiator class. 1478 # Just prepare bdev.conf JSON file for later use and consider it 1479 # "making a connection". 1480 bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list) 1481 self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir]) 1482 1483 def init_disconnect(self): 1484 # SPDK Initiator does not need to explicity disconnect as this gets done 1485 # after fio bdev plugin finishes IO. 1486 pass 1487 1488 def gen_spdk_bdev_conf(self, remote_subsystem_list): 1489 bdev_cfg_section = { 1490 "subsystems": [ 1491 { 1492 "subsystem": "bdev", 1493 "config": [] 1494 } 1495 ] 1496 } 1497 1498 for i, subsys in enumerate(remote_subsystem_list): 1499 sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys) 1500 nvme_ctrl = { 1501 "method": "bdev_nvme_attach_controller", 1502 "params": { 1503 "name": "Nvme{}".format(i), 1504 "trtype": self.transport, 1505 "traddr": sub_addr, 1506 "trsvcid": sub_port, 1507 "subnqn": sub_nqn, 1508 "adrfam": "IPv4" 1509 } 1510 } 1511 1512 if self.enable_adq: 1513 nvme_ctrl["params"].update({"priority": "1"}) 1514 1515 if self.enable_data_digest: 1516 nvme_ctrl["params"].update({"ddgst": self.enable_data_digest}) 1517 1518 bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl) 1519 1520 return json.dumps(bdev_cfg_section, indent=2) 1521 1522 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0): 1523 self.available_cpus = self.get_numa_cpu_map() 1524 filename_section = "" 1525 if len(threads) >= len(subsystems): 1526 threads = range(0, len(subsystems)) 1527 1528 # Generate expected NVMe Bdev names and sort them by used NIC numa node 1529 # to allow better grouping when splitting into fio sections. 1530 filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))] 1531 filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames] 1532 filenames = [x for _, x in sorted(zip(filename_numas, filenames))] 1533 1534 nvme_per_split = int(len(subsystems) / len(threads)) 1535 remainder = len(subsystems) % len(threads) 1536 iterator = iter(filenames) 1537 result = [] 1538 for i in range(len(threads)): 1539 result.append([]) 1540 for _ in range(nvme_per_split): 1541 result[i].append(next(iterator)) 1542 if remainder: 1543 result[i].append(next(iterator)) 1544 remainder -= 1 1545 for i, r in enumerate(result): 1546 header = "[filename%s]" % i 1547 disks = "\n".join(["filename=%s" % x for x in r]) 1548 job_section_qd = round((io_depth * len(r)) / num_jobs) 1549 if job_section_qd == 0: 1550 job_section_qd = 1 1551 iodepth = "iodepth=%s" % job_section_qd 1552 1553 offset_section = "" 1554 if offset: 1555 offset_section = self.gen_fio_offset_section(offset_inc, num_jobs) 1556 1557 numa_opts = self.gen_fio_numa_section(r, num_jobs) 1558 1559 filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""]) 1560 1561 return filename_section 1562 1563 def get_nvme_subsystem_numa(self, bdev_name): 1564 bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir])) 1565 bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"] 1566 1567 # Remove two last characters to get controller name instead of subsystem name 1568 nvme_ctrl = bdev_name[:-2] 1569 remote_nvme_ip = list(filter(lambda x: x["params"]["name"] == "%s" % nvme_ctrl, bdev_conf_json_obj))[0]["params"]["traddr"] 1570 return self.get_route_nic_numa(remote_nvme_ip) 1571 1572 1573if __name__ == "__main__": 1574 script_full_dir = os.path.dirname(os.path.realpath(__file__)) 1575 default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json")) 1576 1577 parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) 1578 parser.add_argument('-c', '--config', type=str, default=default_config_file_path, 1579 help='Configuration file.') 1580 parser.add_argument('-r', '--results', type=str, default='/tmp/results', 1581 help='Results directory.') 1582 parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv', 1583 help='CSV results filename.') 1584 parser.add_argument('-f', '--force', default=False, action='store_true', 1585 dest='force', help="""Force script to continue and try to use all 1586 available NVMe devices during test. 1587 WARNING: Might result in data loss on used NVMe drives""") 1588 1589 args = parser.parse_args() 1590 1591 logging.basicConfig(level=logging.INFO, 1592 format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s') 1593 1594 logging.info("Using config file: %s" % args.config) 1595 with open(args.config, "r") as config: 1596 data = json.load(config) 1597 1598 initiators = [] 1599 fio_cases = [] 1600 1601 general_config = data["general"] 1602 target_config = data["target"] 1603 initiator_configs = [data[x] for x in data.keys() if "initiator" in x] 1604 1605 if "null_block_devices" not in data["target"] and \ 1606 (args.force is False and 1607 "allowlist" not in data["target"] and 1608 "blocklist" not in data["target"]): 1609 # TODO: Also check if allowlist or blocklist are not empty. 1610 logging.warning("""WARNING: This script requires allowlist and blocklist to be defined. 1611 You can choose to use all available NVMe drives on your system, which may potentially 1612 lead to data loss. If you wish to proceed with all attached NVMes, use "-f" option.""") 1613 exit(1) 1614 1615 for k, v in data.items(): 1616 if "target" in k: 1617 v.update({"results_dir": args.results}) 1618 if data[k]["mode"] == "spdk": 1619 target_obj = SPDKTarget(k, data["general"], v) 1620 elif data[k]["mode"] == "kernel": 1621 target_obj = KernelTarget(k, data["general"], v) 1622 elif "initiator" in k: 1623 if data[k]["mode"] == "spdk": 1624 init_obj = SPDKInitiator(k, data["general"], v) 1625 elif data[k]["mode"] == "kernel": 1626 init_obj = KernelInitiator(k, data["general"], v) 1627 initiators.append(init_obj) 1628 elif "fio" in k: 1629 fio_workloads = itertools.product(data[k]["bs"], 1630 data[k]["qd"], 1631 data[k]["rw"]) 1632 1633 fio_run_time = data[k]["run_time"] 1634 fio_ramp_time = data[k]["ramp_time"] 1635 fio_rw_mix_read = data[k]["rwmixread"] 1636 fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None 1637 fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None 1638 1639 fio_rate_iops = 0 1640 if "rate_iops" in data[k]: 1641 fio_rate_iops = data[k]["rate_iops"] 1642 1643 fio_offset = False 1644 if "offset" in data[k]: 1645 fio_offset = data[k]["offset"] 1646 fio_offset_inc = 0 1647 if "offset_inc" in data[k]: 1648 fio_offset_inc = data[k]["offset_inc"] 1649 else: 1650 continue 1651 1652 try: 1653 os.mkdir(args.results) 1654 except FileExistsError: 1655 pass 1656 1657 for i in initiators: 1658 target_obj.initiator_info.append( 1659 {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips} 1660 ) 1661 1662 # TODO: This try block is definietly too large. Need to break this up into separate 1663 # logical blocks to reduce size. 1664 try: 1665 target_obj.tgt_start() 1666 1667 for i in initiators: 1668 i.match_subsystems(target_obj.subsystem_info_list) 1669 if i.enable_adq: 1670 i.adq_configure_tc() 1671 1672 # Poor mans threading 1673 # Run FIO tests 1674 for block_size, io_depth, rw in fio_workloads: 1675 configs = [] 1676 for i in initiators: 1677 i.init_connect() 1678 cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no, 1679 fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops, 1680 fio_offset, fio_offset_inc) 1681 configs.append(cfg) 1682 1683 for run_no in range(1, fio_run_num+1): 1684 threads = [] 1685 power_daemon = None 1686 measurements_prefix = "%s_%s_%s_m_%s_run_%s" % (block_size, io_depth, rw, fio_rw_mix_read, run_no) 1687 1688 for i, cfg in zip(initiators, configs): 1689 t = threading.Thread(target=i.run_fio, args=(cfg, run_no)) 1690 threads.append(t) 1691 if target_obj.enable_sar: 1692 sar_file_prefix = measurements_prefix + "_sar" 1693 t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix, fio_ramp_time, fio_run_time)) 1694 threads.append(t) 1695 1696 if target_obj.enable_pcm: 1697 pcm_fnames = ["%s_%s.csv" % (measurements_prefix, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]] 1698 1699 pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, 1700 args=(args.results, pcm_fnames[0], fio_ramp_time, fio_run_time)) 1701 pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, 1702 args=(args.results, pcm_fnames[1], fio_ramp_time, fio_run_time)) 1703 pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, 1704 args=(args.results, pcm_fnames[2], fio_ramp_time, fio_run_time)) 1705 1706 threads.append(pcm_cpu_t) 1707 threads.append(pcm_mem_t) 1708 threads.append(pcm_pow_t) 1709 1710 if target_obj.enable_bw: 1711 bandwidth_file_name = measurements_prefix + "_bandwidth.csv" 1712 t = threading.Thread(target=target_obj.measure_network_bandwidth, 1713 args=(args.results, bandwidth_file_name, fio_ramp_time, fio_run_time)) 1714 threads.append(t) 1715 1716 if target_obj.enable_dpdk_memory: 1717 dpdk_mem_file_name = measurements_prefix + "_dpdk_mem.txt" 1718 t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results, dpdk_mem_file_name, fio_ramp_time)) 1719 threads.append(t) 1720 1721 if target_obj.enable_pm: 1722 power_daemon = threading.Thread(target=target_obj.measure_power, 1723 args=(args.results, measurements_prefix, script_full_dir, 1724 fio_ramp_time, fio_run_time)) 1725 threads.append(power_daemon) 1726 1727 if target_obj.enable_adq: 1728 ethtool_thread = threading.Thread(target=target_obj.ethtool_after_fio_ramp, args=(fio_ramp_time,)) 1729 threads.append(ethtool_thread) 1730 1731 for t in threads: 1732 t.start() 1733 for t in threads: 1734 t.join() 1735 1736 for i in initiators: 1737 i.init_disconnect() 1738 i.copy_result_files(args.results) 1739 try: 1740 parse_results(args.results, args.csv_filename) 1741 except Exception: 1742 logging.error("There was an error with parsing the results") 1743 finally: 1744 for i in initiators: 1745 try: 1746 i.stop() 1747 except Exception as err: 1748 pass 1749 target_obj.stop() 1750