#!/usr/bin/env python3
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (C) 2018 Intel Corporation
# All rights reserved.
#

from json.decoder import JSONDecodeError
import os
import re
import sys
import argparse
import json
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid
from collections import OrderedDict

import paramiko
import pandas as pd
from common import *

sys.path.append(os.path.dirname(__file__) + '/../../../python')

import spdk.rpc as rpc  # noqa
import spdk.rpc.client as rpc_client  # noqa


class Server:
    def __init__(self, name, general_config, server_config):
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]

        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        self.local_nic_info = []
        self._nics_json_obj = {}
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        self.enable_adq = False
        self.adq_priority = None
        if "adq_enable" in server_config and server_config["adq_enable"]:
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1

        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]

        if not re.match("^[A-Za-z0-9]*$", name):
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        print("[%s] %s" % (self.name, msg), flush=True)

    @staticmethod
    def get_uncommented_lines(lines):
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                if ip in addr["local"]:
                    return nic["ifname"]

    def set_local_nic_info_helper(self):
        pass

    def set_local_nic_info(self, pci_info):
        def extract_network_elements(json_obj):
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def get_nic_numa_node(self, nic_name):
        return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name]))

    def get_numa_cpu_map(self):
        numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"]))
        numa_cpu_json_map = {}

        for cpu in numa_cpu_json_obj["cpus"]:
            cpu_num = int(cpu["cpu"])
            numa_node = int(cpu["node"])
            numa_cpu_json_map.setdefault(numa_node, [])
            numa_cpu_json_map[numa_node].append(cpu_num)

        return numa_cpu_json_map

    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        return ""

    def configure_system(self):
        self.load_drivers()
        self.configure_services()
        self.configure_sysctl()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity()

    def load_drivers(self):
        self.log_print("Loading drivers")
        self.exec_cmd(["sudo", "modprobe", "-a",
                       "nvme-%s" % self.transport,
                       "nvmet-%s" % self.transport])
        if self.mode == "kernel" and hasattr(self, "null_block") and self.null_block:
            self.exec_cmd(["sudo", "modprobe", "null_blk",
                           "nr_devices=%s" % self.null_block])

    def configure_adq(self):
        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        self.log_print("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log_print("%s loaded!" % module)
            except subprocess.CalledProcessError as e:
                self.log_print("ERROR: failed to load module %s" % module)
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        self.log_print("Configuring ADQ Traffic classes and filters...")

        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return

        num_queues_tc0 = 2  # 2 is the minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        port = "4420"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log_print(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log_print(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                             "protocol", "ip", "ingress", "prio", "1", "flower",
                             "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                             "skip_sw", "hw_tc", "1"]
            self.log_print(" ".join(tc_filter_cmd))
            self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log_print("Show tc configuration for %s NIC..." % nic_name)
            tc_qdisc_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log_print("%s" % tc_qdisc_out)
            self.log_print("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log_print(xps_cmd)
            self.exec_cmd(xps_cmd)
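
    # Illustrative queue layout (an assumption for a concrete run, not output
    # captured from this script): with num_cores=4, the mqprio qdisc and
    # flower filter configured above are meant to split the NIC queues like:
    #
    #   TC0: 2 queues (2@0)  - default traffic
    #   TC1: 4 queues (4@2)  - NVMe/TCP traffic matching dst_ip <nic_ip> and
    #                          TCP port 4420, steered into hw_tc 1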

    def reload_driver(self, driver):
        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver])
        except subprocess.CalledProcessError as e:
            self.log_print("ERROR: failed to reload %s module!" % driver)
            self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_nic(self):
        self.log_print("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log_print(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                # As a temporary workaround for ADQ, channel packet inspection optimization
                # is turned on during connection establishment and turned off again before
                # fio ramp_time expires, in ethtool_after_fio_ramp().
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-pkt-inspect-optimize", "on"])
            except subprocess.CalledProcessError as e:
                self.log_print("ERROR: failed to configure NIC port using ethtool!")
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
                self.log_print("Please update your NIC driver and firmware versions and try again.")
                self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
                self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        self.log_print("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log_print("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log_print("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        self.log_print("Tuning sysctl settings...")

        busy_read = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1

        sysctl_opts = {
            "net.core.busy_poll": 0,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
        }

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        if not self.tuned_profile:
            self.log_print("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        self.log_print("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def configure_irq_affinity(self):
        self.log_print("Setting NIC irq affinity for NICs...")

        irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh")
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            irq_cmd = ["sudo", irq_script_path, nic]
            self.log_print(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        self.log_print("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        self.log_print("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        self.log_print("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log_print("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        self.log_print("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log_print("Reverted CPU governor to %s." % self.governor_restore)
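

# A minimal sketch (an assumption, not part of the original script) of how a
# concrete Server subclass supplies exec_cmd(). Target below runs commands
# locally via subprocess, while Initiator runs them remotely over paramiko SSH:
#
#   class LocalServer(Server):
#       def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
#           stderr = subprocess.STDOUT if stderr_redirect else None
#           return subprocess.check_output(cmd, stderr=stderr, cwd=change_dir).decode()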


class Target(Server):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Defaults
        self.enable_sar = False
        self.sar_delay = 0
        self.sar_interval = 0
        self.sar_count = 0
        self.enable_pcm = False
        self.pcm_dir = ""
        self.pcm_delay = 0
        self.pcm_interval = 0
        self.pcm_count = 0
        self.enable_bandwidth = 0
        self.bandwidth_count = 0
        self.enable_dpdk_memory = False
        self.dpdk_wait_time = 0
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []
        self.initiator_info = []
        self.enable_pm = True

        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "sar_settings" in target_config:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"]
        if "pcm_settings" in target_config:
            self.enable_pcm = True
            self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"]
        if "enable_bandwidth" in target_config:
            self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"]
        if "enable_dpdk_memory" in target_config:
            self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"]
        if "scheduler_settings" in target_config:
            self.scheduler_name = target_config["scheduler_settings"]
        if "zcopy_settings" in target_config:
            self.enable_zcopy = target_config["zcopy_settings"]
        if "results_dir" in target_config:
            self.results_dir = target_config["results_dir"]
        if "enable_pm" in target_config:
            self.enable_pm = target_config["enable_pm"]

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log_print("Changing directory to %s" % change_dir)

        out = subprocess.check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")

        if change_dir:
            os.chdir(old_cwd)
            self.log_print("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        self.log_print("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, _directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log_print("Done zipping")

    @staticmethod
    def _chunks(input_list, chunks_no):
        div, rem = divmod(len(input_list), chunks_no)
        for i in range(chunks_no):
            si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem)
            yield input_list[si:si + (div + 1 if i < rem else div)]
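
    # Worked example for _chunks (illustrative): splitting 10 bdev indexes
    # across 3 initiators yields chunks of sizes 4/3/3:
    #
    #   list(Target._chunks(list(range(10)), 3))
    #   -> [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]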

    def spread_bdevs(self, req_disks):
        # Spread available block device indexes:
        # - evenly across available initiator systems
        # - evenly across available NIC interfaces for each initiator
        # Not NUMA aware.
        ip_bdev_map = []
        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))

        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
            self.initiator_info[i]["bdev_range"] = init_chunk
            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
                for c in nic_chunk:
                    ip_bdev_map.append((ip, c))
        return ip_bdev_map

    @staticmethod
    def read_json_stats(file):
        with open(file, "r") as json_data:
            data = json.load(json_data)
            job_pos = 0  # job_pos = 0 because we use aggregated results

            # Check if latency is in nano or microseconds to choose correct dict key
            def get_lat_unit(key_prefix, dict_section):
                # key prefix - lat, clat or slat.
                # dict section - portion of json containing latency bucket in question
                # Return dict key to access the bucket and unit as string
                for k, _ in dict_section.items():
                    if k.startswith(key_prefix):
                        return k, k.split("_")[1]

            def get_clat_percentiles(clat_dict_leaf):
                if "percentile" in clat_dict_leaf:
                    p99_lat = float(clat_dict_leaf["percentile"]["99.000000"])
                    p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"])
                    p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"])
                    p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"])

                    return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat]
                else:
                    # Latest fio versions do not provide "percentile" results if no
                    # measurements were done, so just return zeroes
                    return [0, 0, 0, 0]

            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
            read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["read"][clat_key])

            if "ns" in lat_unit:
                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
            if "ns" in clat_unit:
                read_p99_lat = read_p99_lat / 1000
                read_p99_9_lat = read_p99_9_lat / 1000
                read_p99_99_lat = read_p99_99_lat / 1000
                read_p99_999_lat = read_p99_999_lat / 1000

            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
            write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["write"][clat_key])

            if "ns" in lat_unit:
                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
            if "ns" in clat_unit:
                write_p99_lat = write_p99_lat / 1000
                write_p99_9_lat = write_p99_9_lat / 1000
                write_p99_99_lat = write_p99_99_lat / 1000
                write_p99_999_lat = write_p99_999_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]
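
    # Shape of the fio JSON consumed by read_json_stats (illustrative fragment;
    # assumes fio ran with --output-format=json and group_reporting=1):
    #
    #   {"jobs": [{"read":  {"iops": ..., "bw": ...,
    #                        "lat_ns":  {"mean": ..., "min": ..., "max": ...},
    #                        "clat_ns": {"percentile": {"99.000000": ..., ...}}},
    #              "write": {...}}]}
    #
    # Older fio builds report "lat"/"clat" in microseconds instead of
    # "lat_ns"/"clat_ns"; get_lat_unit() keys off that suffix so all values
    # can be normalized to microseconds.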

    def parse_results(self, results_dir, csv_file):
        files = os.listdir(results_dir)
        fio_files = filter(lambda x: ".fio" in x, files)
        json_files = [x for x in files if ".json" in x]

        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]

        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]

        header_line = ",".join(["Name", *headers])
        aggr_header_line = ",".join(["Name", *aggr_headers])

        # Create empty results file
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            fh.write(aggr_header_line + "\n")
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # Look in the filename for rwmixread value. Function arguments do
            # not have that information.
            # TODO: Improve this function by directly using workload params instead
            # of regexing through filenames.
            if "read" in job_name:
                rw_mixread = 1
            elif "write" in job_name:
                rw_mixread = 0
            else:
                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100

            # If "_CPU" exists in name - ignore it
            # Initiators for the same job could have different num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if x.startswith(job_name)]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # There may have been more than 1 initiator used in test, need to check that
            # Result files are created so that string after last "_" separator is server name
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than 1 test run for this job, calculate average results for initiator
                i_results = [x for x in job_result_files if i in x]
                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))

                separate_stats = []
                for r in i_results:
                    try:
                        stats = self.read_json_stats(os.path.join(results_dir, r))
                        separate_stats.append(stats)
                        self.log_print(stats)
                    except JSONDecodeError:
                        self.log_print("ERROR: Failed to parse %s results! Results might be incomplete!" % r)

                init_results = [sum(x) for x in zip(*separate_stats)]
                init_results = [x / len(separate_stats) for x in init_results]
                inits_avg_results.append(init_results)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(init_results)
                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
                    fh.write(header_line + "\n")
                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")

            # Sum results of all initiators running this FIO job.
            # Latency results are an average of latencies from across all initiators.
            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
            for key in inits_avg_results:
                if "lat" in key:
                    inits_avg_results[key] /= len(inits_names)

            # Aggregate separate read/write values into common labels
            # Take rw_mixread into consideration for mixed read/write workloads.
            aggregate_results = OrderedDict()
            for h in aggr_headers:
                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
                if "lat" in h:
                    _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
                else:
                    _ = read_stat + write_stat
                aggregate_results[h] = "{0:.3f}".format(_)

            rows.add(",".join([job_name, *aggregate_results.values()]))

        # Save results to file
        for row in rows:
            with open(os.path.join(results_dir, csv_file), "a") as fh:
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))

    def measure_sar(self, results_dir, sar_file_prefix):
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))

        self.log_print("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % self.sar_delay)
        time.sleep(self.sar_delay)
        self.log_print("Starting SAR measurements")

        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count])
        with open(os.path.join(results_dir, sar_output_file), "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log_print("Summary CPU utilization from SAR:")
                        self.log_print(line)
                    elif "all" in line:
                        self.log_print(line)
                    else:
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum

        with open(os.path.join(results_dir, sar_cpu_util_file), "w") as f:
            f.write("%0.2f" % sar_cpu_usage)

    def measure_power(self, results_dir, prefix, script_full_dir):
        return subprocess.Popen(["%s/../pm/collect-bmc-pm" % script_full_dir, "-d", results_dir, "-l", "-p", prefix, "-x"])

    def ethtool_after_fio_ramp(self, fio_ramp_time):
        time.sleep(fio_ramp_time // 2)
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log_print(nic)
            self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                           "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        time.sleep(self.pcm_delay)
        cmd = ["%s/build/bin/pcm-memory" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)]
        pcm_memory = subprocess.Popen(cmd)
        time.sleep(self.pcm_count)
        pcm_memory.terminate()

    def measure_pcm(self, results_dir, pcm_file_name):
        time.sleep(self.pcm_delay)
        cmd = ["%s/build/bin/pcm" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count,
               "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_pcm_power(self, results_dir, pcm_power_file_name):
        time.sleep(self.pcm_delay)
        out = self.exec_cmd(["%s/build/bin/pcm-power" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count])
        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
            fh.write(out)

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name):
        self.log_print("INFO: starting network bandwidth measure")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)])

    def measure_dpdk_memory(self, results_dir):
        self.log_print("INFO: waiting to generate DPDK memory usage")
        time.sleep(self.dpdk_wait_time)
        self.log_print("INFO: generating DPDK memory usage")
        # Ask the target to dump DPDK memory stats to /tmp/spdk_mem_dump.txt
        rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)
        os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir))

    def sys_config(self):
        self.log_print("====Kernel release:====")
        self.log_print(os.uname().release)
        self.log_print("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
        self.log_print('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log_print("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
        self.log_print('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log_print("====Cpu power info:====")
        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log_print("====zcopy settings:====")
        self.log_print("zcopy enabled: %s" % (self.enable_zcopy))
        self.log_print("====Scheduler settings:====")
        self.log_print("SPDK scheduler: %s" % (self.scheduler_name))


class Initiator(Server):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Required fields
        self.ip = initiator_config["ip"]
        self.target_nic_ips = initiator_config["target_nic_ips"]

        # Defaults
        self.cpus_allowed = None
        self.cpus_allowed_policy = "shared"
        self.spdk_dir = "/tmp/spdk"
        self.fio_bin = "/usr/src/fio/fio"
        self.nvmecli_bin = "nvme"
        self.cpu_frequency = None
        self.subsystem_info_list = []

        if "spdk_dir" in initiator_config:
            self.spdk_dir = initiator_config["spdk_dir"]
        if "fio_bin" in initiator_config:
            self.fio_bin = initiator_config["fio_bin"]
        if "nvmecli_bin" in initiator_config:
            self.nvmecli_bin = initiator_config["nvmecli_bin"]
        if "cpus_allowed" in initiator_config:
            self.cpus_allowed = initiator_config["cpus_allowed"]
        if "cpus_allowed_policy" in initiator_config:
            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
        if "cpu_frequency" in initiator_config:
            self.cpu_frequency = initiator_config["cpu_frequency"]
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.copy_spdk("/tmp/spdk.zip")
        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it again to prevent
        # expansion when sending to the remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout using the get_pty option if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise subprocess.CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        self.log_print("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log_print("Sources unpacked")

    def copy_result_files(self, dest_dir):
        self.log_print("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log_print("Done copying results")

    def discover_subsystems(self, address_list, subsys_no):
        num_nvmes = range(0, subsys_no)
        nvme_discover_output = ""
        for ip, subsys_no in itertools.product(address_list, num_nvmes):
            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no))
            nvme_discover_cmd = ["sudo",
                                 "%s" % self.nvmecli_bin,
                                 "discover", "-t", "%s" % self.transport,
                                 "-s", "%s" % (4420 + subsys_no),
                                 "-a", "%s" % ip]

            try:
                stdout = self.exec_cmd(nvme_discover_cmd)
                if stdout:
                    nvme_discover_output = nvme_discover_output + stdout
            except subprocess.CalledProcessError:
                # Do nothing. In case of discovering remote subsystems of kernel target
                # we expect "nvme discover" to fail a bunch of times because we basically
                # scan ports.
                pass

        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                                nvme_discover_output)  # from nvme discovery output
        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
        subsystems = filter(lambda x: "discovery" not in x[1], subsystems)
        subsystems = list(set(subsystems))
        subsystems.sort(key=lambda x: x[1])
        self.log_print("Found matching subsystems on target side:")
        for s in subsystems:
            self.log_print(s)
        self.subsystem_info_list = subsystems

    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def get_route_nic_numa(self, remote_nvme_ip):
        local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip]))
        local_nvme_nic = local_nvme_nic[0]["dev"]
        return self.get_nic_numa_node(local_nvme_nic)

    @staticmethod
    def gen_fio_offset_section(offset_inc, num_jobs):
        offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc
        return "\n".join(["size=%s%%" % offset_inc,
                          "offset=0%",
                          "offset_increment=%s%%" % offset_inc])

    def gen_fio_numa_section(self, fio_filenames_list):
        numa_stats = {}
        for nvme in fio_filenames_list:
            nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme))
            numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1

        # Use the most common NUMA node for this chunk to allocate memory and CPUs
        section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0]

        return "\n".join(["numa_cpu_nodes=%s" % section_local_numa,
                          "numa_mem_policy=prefer:%s" % section_local_numa])

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no,
                       num_jobs=None, ramp_time=0, run_time=10, rate_iops=0,
                       offset=False, offset_inc=0):
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""
        if "spdk" in self.mode:
            bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
            self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
            spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
        else:
            ioengine = self.ioengine
            spdk_conf = ""
            out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'",
                                 "|", "awk", "'{print $1}'"])
            subsystems = [x for x in out.split("\n") if "nvme" in x]

        if self.cpus_allowed is not None:
            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    cpus_num += len(range(a, b + 1))  # a cpus_allowed range like "0-3" is inclusive of both ends
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            self.num_cores = len(subsystems)
            threads = range(0, len(subsystems))

        if "spdk" in self.mode:
            filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs,
                                                          offset, offset_inc)
        else:
            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs,
                                                          offset, offset_inc)

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, kernel_init_connect() function
        if hasattr(self, "ioengine") and "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log_print("Created FIO Config:")
        self.log_print(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit()
        else:
            self.log_print("WARNING: you have disabled intel_pstate and are using the default CPU governor.")

    def run_fio(self, fio_config_file, run_num=None):
        job_name, _ = os.path.splitext(fio_config_file)
        self.log_print("Starting FIO run for job: %s" % job_name)
        self.log_print("Using FIO: %s" % self.fio_bin)

        if run_num:
            for i in range(1, run_num + 1):
                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
                try:
                    output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
                                            "--output=%s" % output_filename, "--eta=never"], True)
                    self.log_print(output)
                except subprocess.CalledProcessError as e:
                    self.log_print("ERROR: Fio process failed!")
                    self.log_print(e.stdout)
        else:
            output_filename = job_name + "_" + self.name + ".json"
            output = self.exec_cmd(["sudo", self.fio_bin,
                                    fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename], True)
            self.log_print(output)
        self.log_print("FIO run finished. Results in: %s" % output_filename)
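
    # Example of a config produced by gen_fio_config() for an SPDK initiator
    # (illustrative; exact values depend on the workload and connected
    # subsystems, and some template lines such as percentile_list and
    # rate_iops are omitted here):
    #
    #   [global]
    #   ioengine=/tmp/spdk/build/fio/spdk_bdev
    #   spdk_json_conf=/tmp/spdk/bdev.conf
    #   thread=1
    #   group_reporting=1
    #   direct=1
    #   norandommap=1
    #   rw=randrw
    #   rwmixread=70
    #   bs=4k
    #   time_based=1
    #   ramp_time=10
    #   runtime=30
    #   numjobs=1
    #
    #   [filename0]
    #   filename=Nvme0n1
    #   iodepth=128
    #   numa_cpu_nodes=0
    #   numa_mem_policy=prefer:0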

    def sys_config(self):
        self.log_print("====Kernel release:====")
        self.log_print(self.exec_cmd(["uname", "-r"]))
        self.log_print("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log_print("====sysctl conf:====")
        sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"])
        self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log_print("====Cpu power info:====")
        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))


class KernelTarget(Target):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = "nvmetcli"

        if "nvmet_bin" in target_config:
            self.nvmet_bin = target_config["nvmet_bin"]

    def stop(self):
        nvmet_command(self.nvmet_bin, "clear")

    def kernel_tgt_gen_subsystem_conf(self, nvme_list):

        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        for ip, bdev_num in self.spread_bdevs(len(nvme_list)):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = nvme_list[bdev_num]

            nvmet_cfg["subsystems"].append({
                "allowed_hosts": [],
                "attr": {
                    "allow_any_host": "1",
                    "serial": serial,
                    "version": "1.3"
                },
                "namespaces": [
                    {
                        "device": {
                            "path": bdev_name,
                            "uuid": "%s" % uuid.uuid4()
                        },
                        "enable": 1,
                        "nsid": port
                    }
                ],
                "nqn": nqn
            })

            nvmet_cfg["ports"].append({
                "addr": {
                    "adrfam": "ipv4",
                    "traddr": ip,
                    "trsvcid": port,
                    "trtype": self.transport
                },
                "portid": bdev_num,
                "referrals": [],
                "subsystems": [nqn]
            })

            self.subsystem_info_list.append([port, nqn, ip])
        self.subsys_no = len(self.subsystem_info_list)

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        self.log_print("Configuring kernel NVMeOF Target")

        if self.null_block:
            print("Configuring with null block device.")
            nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
        else:
            print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()

        self.kernel_tgt_gen_subsystem_conf(nvme_list)
        self.subsys_no = len(nvme_list)

        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log_print("Done configuring kernel NVMeOF Target")


class SPDKTarget(Target):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Required fields
        self.core_mask = target_config["core_mask"]
        self.num_cores = self.get_num_cores(self.core_mask)

        # Defaults
        self.dif_insert_strip = False
        self.null_block_dif_type = 0
        self.num_shared_buffers = 4096
        self.max_queue_depth = 128
        self.bpf_proc = None
        self.bpf_scripts = []
        self.enable_dsa = False
        self.scheduler_core_limit = None

        if "num_shared_buffers" in target_config:
            self.num_shared_buffers = target_config["num_shared_buffers"]
        if "max_queue_depth" in target_config:
            self.max_queue_depth = target_config["max_queue_depth"]
        if "null_block_dif_type" in target_config:
            self.null_block_dif_type = target_config["null_block_dif_type"]
        if "dif_insert_strip" in target_config:
            self.dif_insert_strip = target_config["dif_insert_strip"]
        if "bpf_scripts" in target_config:
            self.bpf_scripts = target_config["bpf_scripts"]
        if "dsa_settings" in target_config:
            self.enable_dsa = target_config["dsa_settings"]
        if "scheduler_core_limit" in target_config:
            self.scheduler_core_limit = target_config["scheduler_core_limit"]

        self.log_print("====DSA settings:====")
        self.log_print("DSA enabled: %s" % (self.enable_dsa))

    @staticmethod
    def get_num_cores(core_mask):
        if "0x" in core_mask:
            return bin(int(core_mask, 16)).count("1")
        else:
            num_cores = 0
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")
            for i in core_mask.split(","):
                if "-" in i:
                    x, y = i.split("-")
                    num_cores += len(range(int(x), int(y))) + 1
                else:
                    num_cores += 1
            return num_cores

    def spdk_tgt_configure(self):
        self.log_print("Configuring SPDK NVMeOF target via RPC")

        if self.enable_adq:
            self.adq_configure_tc()

        # Create transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
                                       num_shared_buffers=self.num_shared_buffers,
                                       max_queue_depth=self.max_queue_depth,
                                       dif_insert_or_strip=self.dif_insert_strip,
                                       sock_priority=self.adq_priority)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

        self.log_print("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self, null_block_count):
        md_size = 0
        block_size = 4096
        if self.null_block_dif_type != 0:
            md_size = 128

        self.log_print("Adding null block bdevices to config via RPC")
        for i in range(null_block_count):
            self.log_print("Setting bdev protection to: %s" % self.null_block_dif_type)
            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
                                      dif_type=self.null_block_dif_type, md_size=md_size)
        self.log_print("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        self.log_print("Adding NVMe bdevs to config via RPC")

        bdfs = get_nvme_devices_bdf()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        self.log_print("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        for ip, bdev_num in self.spread_bdevs(req_num_disks):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = "Nvme%sn1" % bdev_num

            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                           allow_any_host=True, max_namespaces=8)
            rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
            for nqn_name in [nqn, "discovery"]:
                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
                                                     nqn=nqn_name,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid=port,
                                                     adrfam="ipv4")
            self.subsystem_info_list.append([port, nqn, ip])
        self.subsys_no = len(self.subsystem_info_list)

        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def bpf_start(self):
        self.log_print("Starting BPF Trace scripts: %s" % self.bpf_scripts)
        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
        results_path = os.path.join(self.results_dir, "bpf_traces.txt")

        with open(self.pid, "r") as fh:
            nvmf_pid = str(fh.readline())

        cmd = [bpf_script, nvmf_pid, *bpf_traces]
        self.log_print(cmd)
        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})

    def tgt_start(self):
        if self.null_block:
            self.subsys_no = 1
        else:
            self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        self.log_print("SPDK NVMeOF Target PID=%s" % self.pid)
        self.log_print("Waiting for spdk to initialize...")
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")

        rpc.sock.sock_set_default_impl(self.client, impl_name="posix")

        if self.enable_zcopy:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
                                           enable_zerocopy_send_server=True)
            self.log_print("Target socket options:")
            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))

        if self.enable_adq:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
                                           nvme_adminq_poll_period_us=100000, retry_count=4)

        if self.enable_dsa:
            rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None)
            self.log_print("Target DSA accel module enabled")

        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit)
        rpc.framework_start_init(self.client)

        if self.bpf_scripts:
            self.bpf_start()

        self.spdk_tgt_configure()

    def stop(self):
        if self.bpf_proc:
            self.log_print("Stopping BPF Trace script")
            self.bpf_proc.terminate()
            self.bpf_proc.wait()

        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait(timeout=30)
            except Exception as e:
                self.log_print("Failed to terminate SPDK Target process. Sending SIGKILL.")
                self.log_print(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
                # Try to clean up RPC socket files if they were not removed
                # because of using 'kill'
                try:
                    os.remove("/var/tmp/spdk.sock")
                    os.remove("/var/tmp/spdk.sock.lock")
                except FileNotFoundError:
                    pass


class KernelInitiator(Initiator):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Defaults
        self.extra_params = ""
        self.ioengine = "libaio"

        if "extra_params" in initiator_config:
            self.extra_params = initiator_config["extra_params"]

        if "kernel_engine" in initiator_config:
            self.ioengine = initiator_config["kernel_engine"]
            if "io_uring" in self.ioengine:
                self.extra_params = "--nr-poll-queues=8"

    def get_connected_nvme_list(self):
        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
        return nvme_list

    def kernel_init_connect(self):
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for subsystem in self.subsystem_info_list:
            self.log_print("Trying to connect %s %s %s" % subsystem)
            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
            time.sleep(2)

        if "io_uring" in self.ioengine:
            self.log_print("Setting block layer settings for io_uring.")

            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
            # apparently it's not possible for connected subsystems.
            # Results in "error: Invalid argument"
            block_sysfs_settings = {
                "iostats": "0",
                "rq_affinity": "0",
                "nomerges": "2"
            }

            for disk in self.get_connected_nvme_list():
                sysfs = os.path.join("/sys/block", disk, "queue")
                for k, v in block_sysfs_settings.items():
                    sysfs_opt_path = os.path.join(sysfs, k)
                    try:
                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
                    except subprocess.CalledProcessError as e:
                        self.log_print("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
                    finally:
                        _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)])
                        self.log_print("%s=%s" % (sysfs_opt_path, _))

    def kernel_init_disconnect(self):
        for subsystem in self.subsystem_info_list:
            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
            time.sleep(1)

    def get_nvme_subsystem_numa(self, dev_name):
        # Remove two last characters to get controller name instead of subsystem name
        nvme_ctrl = os.path.basename(dev_name)[:-2]
        remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
                                   self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl]))
        return self.get_route_nic_numa(remote_nvme_ip.group(0))

    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
        # Generate connected nvme devices names and sort them by used NIC numa node
        # to allow better grouping when splitting into fio sections.
        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
        nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list]
        nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))]

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = self.gen_fio_numa_section(r)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section


class SPDKInitiator(Initiator):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.install_spdk()

        # Required fields
        self.num_cores = initiator_config["num_cores"]

        # Optional fields
        self.enable_data_digest = False
        if "enable_data_digest" in initiator_config:
            self.enable_data_digest = initiator_config["enable_data_digest"]

    def install_spdk(self):
        self.log_print("Using fio binary %s" % self.fio_bin)
        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)])
        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])

        self.log_print("SPDK built")
        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        bdev_cfg_section = {
            "subsystems": [
                {
                    "subsystem": "bdev",
                    "config": []
                }
            ]
        }

        for i, subsys in enumerate(remote_subsystem_list):
            sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys)
            nvme_ctrl = {
                "method": "bdev_nvme_attach_controller",
                "params": {
                    "name": "Nvme{}".format(i),
                    "trtype": self.transport,
                    "traddr": sub_addr,
                    "trsvcid": sub_port,
                    "subnqn": sub_nqn,
                    "adrfam": "IPv4"
                }
            }

            if self.enable_adq:
                nvme_ctrl["params"].update({"priority": "1"})

            if self.enable_data_digest:
                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})

            bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)

        return json.dumps(bdev_cfg_section, indent=2)

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
        filename_section = ""
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        # Generate expected NVMe Bdev names and sort them by used NIC numa node
        # to allow better grouping when splitting into fio sections.
    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
        filename_section = ""
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        # Generate expected NVMe Bdev names and sort them by used NIC numa node
        # to allow better grouping when splitting into fio sections.
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames]
        filenames = [x for _, x in sorted(zip(filename_numas, filenames))]

        nvme_per_split = int(len(subsystems) / len(threads))
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = self.gen_fio_numa_section(r)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section
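    # Illustrative example (not executed): with two remote subsystems, at least
    # two initiator threads, io_depth=128 and num_jobs=1 (all values assumed),
    # gen_fio_filename_conf() above emits one fio section per Bdev, roughly:
    #
    #   [filename0]
    #   filename=Nvme0n1
    #   iodepth=128
    #
    #   [filename1]
    #   filename=Nvme1n1
    #   iodepth=128
    #
    # Each section is followed by NUMA options from gen_fio_numa_section() and,
    # when offset workloads are enabled, an offset block from
    # gen_fio_offset_section().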
    def get_nvme_subsystem_numa(self, bdev_name):
        bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir]))
        bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"]

        # Remove the last two characters to get the controller name
        # instead of the namespace name (e.g. "Nvme0n1" -> "Nvme0").
        nvme_ctrl = bdev_name[:-2]
        remote_nvme_ip = list(filter(lambda x: x["params"]["name"] == nvme_ctrl, bdev_conf_json_obj))[0]["params"]["traddr"]
        return self.get_route_nic_numa(remote_nvme_ip)


if __name__ == "__main__":
    script_full_dir = os.path.dirname(os.path.realpath(__file__))
    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
                        help='Configuration file.')
    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
                        help='Results directory.')
    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
                        help='CSV results filename.')

    args = parser.parse_args()

    print("Using config file: %s" % args.config)
    with open(args.config, "r") as config:
        data = json.load(config)

    initiators = []
    fio_cases = []

    general_config = data["general"]
    target_config = data["target"]
    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]

    for k, v in data.items():
        if "target" in k:
            v.update({"results_dir": args.results})
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(k, data["general"], v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(k, data["general"], v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None
            fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None

            fio_rate_iops = 0
            if "rate_iops" in data[k]:
                fio_rate_iops = data[k]["rate_iops"]

            fio_offset = False
            if "offset" in data[k]:
                fio_offset = data[k]["offset"]
            fio_offset_inc = 0
            if "offset_inc" in data[k]:
                fio_offset_inc = data[k]["offset_inc"]
        else:
            continue

    try:
        os.mkdir(args.results)
    except FileExistsError:
        pass

    for i in initiators:
        target_obj.initiator_info.append(
            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
        )

    # TODO: This try block is definitely too large. Need to break it up into
    # separate logical blocks to reduce its size.
    try:
        target_obj.tgt_start()

        for i in initiators:
            i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no)
            if i.enable_adq:
                i.adq_configure_tc()

        # Poor man's threading
        # Run FIO tests
        for block_size, io_depth, rw in fio_workloads:
            threads = []
            configs = []
            power_daemon = None
            for i in initiators:
                if i.mode == "kernel":
                    i.kernel_init_connect()

                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops,
                                       fio_offset, fio_offset_inc)
                configs.append(cfg)

            for i, cfg in zip(initiators, configs):
                t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
                threads.append(t)
            if target_obj.enable_sar:
                sar_file_prefix = "%s_%s_%s_sar" % (block_size, rw, io_depth)
                t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix))
                threads.append(t)

            if target_obj.enable_pcm:
                pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]

                pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],))
                pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],))
                pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],))

                threads.append(pcm_cpu_t)
                threads.append(pcm_mem_t)
                threads.append(pcm_pow_t)

            if target_obj.enable_bandwidth:
                bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)])
                bandwidth_file_name = ".".join([bandwidth_file_name, "csv"])
                t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,))
                threads.append(t)

            if target_obj.enable_dpdk_memory:
                # args must be a one-element tuple so the results path is
                # passed as a single argument.
                t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results,))
                threads.append(t)

            if target_obj.enable_adq:
                ethtool_thread = threading.Thread(target=target_obj.ethtool_after_fio_ramp, args=(fio_ramp_time,))
                threads.append(ethtool_thread)

            if target_obj.enable_pm:
                power_daemon = target_obj.measure_power(args.results, "%s_%s_%s" % (block_size, rw, io_depth), script_full_dir)
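            # Monitoring artifacts land in the results directory and share the
            # "<bs>_<rw>_<qd>" naming used above; e.g. for block_size=4k,
            # rw=randrw, io_depth=128 (illustrative values): "4k_randrw_128_sar",
            # "4k_randrw_128_pcm_cpu.csv" and "bandwidth_4k_randrw_128.csv".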
            for t in threads:
                t.start()
            for t in threads:
                t.join()

            for i in initiators:
                if i.mode == "kernel":
                    i.kernel_init_disconnect()
                i.copy_result_files(args.results)

            if power_daemon:
                power_daemon.terminate()

        target_obj.restore_governor()
        target_obj.restore_tuned()
        target_obj.restore_services()
        target_obj.restore_sysctl()
        if target_obj.enable_adq:
            target_obj.reload_driver("ice")
        for i in initiators:
            i.restore_governor()
            i.restore_tuned()
            i.restore_services()
            i.restore_sysctl()
            if i.enable_adq:
                i.reload_driver("ice")
        target_obj.parse_results(args.results, args.csv_filename)
    finally:
        for i in initiators:
            try:
                i.stop()
            except Exception:
                # Keep tearing down the remaining initiators even if one fails to stop.
                pass
        target_obj.stop()
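# Illustrative config.json layout assumed by the parsing above (all values are
# hypothetical; section names are matched by substring, so e.g. "initiator1"
# and "initiator2" both match "initiator"):
#
# {
#     "general": {"username": "root", "password": "...", "transport": "tcp"},
#     "target": {"mode": "spdk", "nic_ips": ["10.0.0.1"], "...": "..."},
#     "initiator1": {"mode": "kernel", "nic_ips": ["10.0.0.2"], "...": "..."},
#     "fio": {"bs": ["4k"], "qd": [128], "rw": ["randrw"], "rwmixread": 70,
#             "run_time": 30, "ramp_time": 10, "run_num": 3}
# }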