#!/usr/bin/env python3

from json.decoder import JSONDecodeError
import os
import re
import sys
import argparse
import json
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid
from collections import OrderedDict
# CalledProcessError and check_output are used unqualified below
from subprocess import CalledProcessError, check_output

import paramiko
import pandas as pd
from common import *

sys.path.append(os.path.dirname(__file__) + '/../../../python')

import spdk.rpc as rpc  # noqa
import spdk.rpc.client as rpc_client  # noqa


class Server:
    def __init__(self, name, general_config, server_config):
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]

        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        self.local_nic_info = []
        self._nics_json_obj = {}
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        self.enable_adq = False
        self.adq_priority = None
        if "adq_enable" in server_config and server_config["adq_enable"]:
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1

        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]

        if not re.match("^[A-Za-z0-9]*$", name):
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        print("[%s] %s" % (self.name, msg), flush=True)

    @staticmethod
    def get_uncommented_lines(lines):
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                if ip in addr["local"]:
                    return nic["ifname"]

    def set_local_nic_info_helper(self):
        pass

    def set_local_nic_info(self, pci_info):
        def extract_network_elements(json_obj):
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def get_nic_numa_node(self, nic_name):
        return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name]))

    def get_numa_cpu_map(self):
        numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"]))
        numa_cpu_json_map = {}

        for cpu in numa_cpu_json_obj["cpus"]:
            cpu_num = int(cpu["cpu"])
            numa_node = int(cpu["node"])
            numa_cpu_json_map.setdefault(numa_node, [])
            numa_cpu_json_map[numa_node].append(cpu_num)

        return numa_cpu_json_map

    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        return ""
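
    # NOTE: exec_cmd() above is a placeholder. Target overrides it to run
    # commands locally via subprocess; Initiator overrides it to run them on
    # the remote host over SSH.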

    def configure_system(self):
        self.load_drivers()
        self.configure_services()
        self.configure_sysctl()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity()

    def load_drivers(self):
        self.log_print("Loading drivers")
        self.exec_cmd(["sudo", "modprobe", "-a",
                       "nvme-%s" % self.transport,
                       "nvmet-%s" % self.transport])
        if self.mode == "kernel" and hasattr(self, "null_block") and self.null_block:
            self.exec_cmd(["sudo", "modprobe", "null_blk",
                           "nr_devices=%s" % self.null_block])

    def configure_adq(self):
        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        self.log_print("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log_print("%s loaded!" % module)
            except CalledProcessError as e:
                self.log_print("ERROR: failed to load module %s" % module)
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        self.log_print("Configuring ADQ Traffic classes and filters...")

        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        port = "4420"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log_print(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log_print(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                             "protocol", "ip", "ingress", "prio", "1", "flower",
                             "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                             "skip_sw", "hw_tc", "1"]
            self.log_print(" ".join(tc_filter_cmd))
            self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log_print("Show tc configuration for %s NIC..." % nic_name)
            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log_print("%s" % tc_disk_out)
            self.log_print("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log_print(xps_cmd)
            self.exec_cmd(xps_cmd)
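
    # Resulting queue layout with the mqprio mapping above (illustrative
    # example for num_cores=4): TC0 -> queues 0-1 for default traffic,
    # TC1 -> queues 2-5 for the NVMe/TCP port 4420 traffic matched by the
    # flower filter.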

    def reload_driver(self, driver):
        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver])
        except CalledProcessError as e:
            self.log_print("ERROR: failed to reload %s module!" % driver)
            self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_nic(self):
        self.log_print("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log_print(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                # As a temporary workaround for ADQ, channel packet inspection optimization is turned on
                # during connection establishment, then turned off before fio ramp_time expires in
                # ethtool_after_fio_ramp().
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-pkt-inspect-optimize", "on"])
            except CalledProcessError as e:
                self.log_print("ERROR: failed to configure NIC port using ethtool!")
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
                self.log_print("Please update your NIC driver and firmware versions and try again.")
                self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
                self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))
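
    # configure_services() below parses `systemctl show` output (plain
    # KEY=VALUE lines) with configparser by prepending a "[<service>]"
    # section header, and records each service's previous state so that
    # restore_services() can bring it back after the test.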

    def configure_services(self):
        self.log_print("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log_print("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log_print("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        self.log_print("Tuning sysctl settings...")

        busy_read = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1

        sysctl_opts = {
            "net.core.busy_poll": 0,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
        }

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        if not self.tuned_profile:
            self.log_print("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))
% self.exec_cmd(["cat", "/etc/tuned/active_profile"])) 316 317 def configure_cpu_governor(self): 318 self.log_print("Setting CPU governor to performance...") 319 320 # This assumes that there is the same CPU scaling governor on each CPU 321 self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip() 322 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"]) 323 324 def configure_irq_affinity(self): 325 self.log_print("Setting NIC irq affinity for NICs...") 326 327 irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh") 328 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 329 for nic in nic_names: 330 irq_cmd = ["sudo", irq_script_path, nic] 331 self.log_print(irq_cmd) 332 self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir) 333 334 def restore_services(self): 335 self.log_print("Restoring services...") 336 for service, state in self.svc_restore_dict.items(): 337 cmd = "stop" if state == "inactive" else "start" 338 self.exec_cmd(["sudo", "systemctl", cmd, service]) 339 340 def restore_sysctl(self): 341 self.log_print("Restoring sysctl settings...") 342 for opt, value in self.sysctl_restore_dict.items(): 343 self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 344 345 def restore_tuned(self): 346 self.log_print("Restoring tuned-adm settings...") 347 348 if not self.tuned_restore_dict: 349 return 350 351 if self.tuned_restore_dict["mode"] == "auto": 352 self.exec_cmd(["sudo", "tuned-adm", "auto_profile"]) 353 self.log_print("Reverted tuned-adm to auto_profile.") 354 else: 355 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]]) 356 self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"]) 357 358 def restore_governor(self): 359 self.log_print("Restoring CPU governor setting...") 360 if self.governor_restore: 361 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore]) 362 self.log_print("Reverted CPU governor to %s." 


class Target(Server):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Defaults
        self.enable_sar = False
        self.sar_delay = 0
        self.sar_interval = 0
        self.sar_count = 0
        self.enable_pcm = False
        self.pcm_dir = ""
        self.pcm_delay = 0
        self.pcm_interval = 0
        self.pcm_count = 0
        self.enable_bandwidth = 0
        self.bandwidth_count = 0
        self.enable_dpdk_memory = False
        self.dpdk_wait_time = 0
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []
        self.initiator_info = []
        self.enable_pm = True

        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "sar_settings" in target_config:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"]
        if "pcm_settings" in target_config:
            self.enable_pcm = True
            self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"]
        if "enable_bandwidth" in target_config:
            self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"]
        if "enable_dpdk_memory" in target_config:
            self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"]
        if "scheduler_settings" in target_config:
            self.scheduler_name = target_config["scheduler_settings"]
        if "zcopy_settings" in target_config:
            self.enable_zcopy = target_config["zcopy_settings"]
        if "results_dir" in target_config:
            self.results_dir = target_config["results_dir"]
        if "enable_pm" in target_config:
            self.enable_pm = target_config["enable_pm"]

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log_print("Changing directory to %s" % change_dir)

        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")

        if change_dir:
            os.chdir(old_cwd)
            self.log_print("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        self.log_print("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, _directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log_print("Done zipping")

    @staticmethod
    def _chunks(input_list, chunks_no):
        div, rem = divmod(len(input_list), chunks_no)
        for i in range(chunks_no):
            si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem)
            yield input_list[si:si + (div + 1 if i < rem else div)]
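
    # Illustrative example: _chunks(list(range(10)), 3) yields chunks of
    # sizes 4, 3, 3 ([0..3], [4..6], [7..9]). The first `rem` chunks get one
    # extra element, so chunk sizes never differ by more than one.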

    def spread_bdevs(self, req_disks):
        # Spread available block devices indexes:
        # - evenly across available initiator systems
        # - evenly across available NIC interfaces for
        #   each initiator
        # Not NUMA aware.
        ip_bdev_map = []
        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))

        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
            self.initiator_info[i]["bdev_range"] = init_chunk
            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
                for c in nic_chunk:
                    ip_bdev_map.append((ip, c))
        return ip_bdev_map

    @staticmethod
    def read_json_stats(file):
        with open(file, "r") as json_data:
            data = json.load(json_data)
            job_pos = 0  # job_pos = 0 because using aggregated results

            # Check if latency is in nano or microseconds to choose correct dict key
            def get_lat_unit(key_prefix, dict_section):
                # key prefix - lat, clat or slat.
                # dict section - portion of json containing latency bucket in question
                # Return dict key to access the bucket and unit as string
                for k, _ in dict_section.items():
                    if k.startswith(key_prefix):
                        return k, k.split("_")[1]

            def get_clat_percentiles(clat_dict_leaf):
                if "percentile" in clat_dict_leaf:
                    p99_lat = float(clat_dict_leaf["percentile"]["99.000000"])
                    p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"])
                    p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"])
                    p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"])

                    return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat]
                else:
                    # Latest fio versions do not provide "percentile" results if no
                    # measurements were done, so just return zeroes
                    return [0, 0, 0, 0]

            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
            read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["read"][clat_key])

            if "ns" in lat_unit:
                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
            if "ns" in clat_unit:
                read_p99_lat = read_p99_lat / 1000
                read_p99_9_lat = read_p99_9_lat / 1000
                read_p99_99_lat = read_p99_99_lat / 1000
                read_p99_999_lat = read_p99_999_lat / 1000

            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
            write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["write"][clat_key])

            if "ns" in lat_unit:
                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
            if "ns" in clat_unit:
                write_p99_lat = write_p99_lat / 1000
                write_p99_9_lat = write_p99_9_lat / 1000
                write_p99_99_lat = write_p99_99_lat / 1000
                write_p99_999_lat = write_p99_999_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]
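
    # read_json_stats() returns 18 values: 9 read stats followed by 9 write
    # stats, in the same order as `headers` in parse_results() below, with
    # all latencies normalized to microseconds.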

    def parse_results(self, results_dir, csv_file):
        files = os.listdir(results_dir)
        fio_files = filter(lambda x: ".fio" in x, files)
        json_files = [x for x in files if ".json" in x]

        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]

        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]

        header_line = ",".join(["Name", *headers])
        aggr_header_line = ",".join(["Name", *aggr_headers])

        # Create empty results file
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            fh.write(aggr_header_line + "\n")
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # Look in the filename for rwmixread value. Function arguments do
            # not have that information.
            # TODO: Improve this function by directly using workload params instead
            # of regexing through filenames.
            if "read" in job_name:
                rw_mixread = 1
            elif "write" in job_name:
                rw_mixread = 0
            else:
                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100

            # If "_CPU" exists in name - ignore it
            # Initiators for the same job could have different num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if x.startswith(job_name)]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # There may have been more than 1 initiator used in test, need to check that
            # Result files are created so that string after last "_" separator is server name
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than 1 test run for this job, calculate average results for initiator
                i_results = [x for x in job_result_files if i in x]
                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))

                separate_stats = []
                for r in i_results:
                    try:
                        stats = self.read_json_stats(os.path.join(results_dir, r))
                        separate_stats.append(stats)
                        self.log_print(stats)
                    except JSONDecodeError:
                        self.log_print("ERROR: Failed to parse %s results! Results might be incomplete!" % r)

                init_results = [sum(x) for x in zip(*separate_stats)]
                init_results = [x / len(separate_stats) for x in init_results]
                inits_avg_results.append(init_results)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(init_results)
                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
                    fh.write(header_line + "\n")
                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")

            # Sum results of all initiators running this FIO job.
            # Latency results are an average of latencies from across all initiators.
            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
            for key in inits_avg_results:
                if "lat" in key:
                    inits_avg_results[key] /= len(inits_names)

            # Aggregate separate read/write values into common labels
            # Take rw_mixread into consideration for mixed read/write workloads.
            aggregate_results = OrderedDict()
            for h in aggr_headers:
                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
                if "lat" in h:
                    _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
                else:
                    _ = read_stat + write_stat
                aggregate_results[h] = "{0:.3f}".format(_)

            rows.add(",".join([job_name, *aggregate_results.values()]))

        # Save results to file
        for row in rows:
            with open(os.path.join(results_dir, csv_file), "a") as fh:
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))

    def measure_sar(self, results_dir, sar_file_prefix):
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))

        self.log_print("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % self.sar_delay)
        time.sleep(self.sar_delay)
        self.log_print("Starting SAR measurements")

        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count])
        with open(os.path.join(results_dir, sar_output_file), "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log_print("Summary CPU utilization from SAR:")
                        self.log_print(line)
                    elif "all" in line:
                        self.log_print(line)
                    else:
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
        # Total utilization: 100% per CPU minus the summed %idle column from
        # the per-CPU "Average" rows
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum

        with open(os.path.join(results_dir, sar_cpu_util_file), "w") as f:
            f.write("%0.2f" % sar_cpu_usage)

    def measure_power(self, results_dir, prefix, script_full_dir):
        return subprocess.Popen(["%s/../pm/collect-bmc-pm" % script_full_dir, "-d", results_dir, "-l", "-p", prefix, "-x"])

    def ethtool_after_fio_ramp(self, fio_ramp_time):
        time.sleep(fio_ramp_time // 2)
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log_print(nic)
            self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                           "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        time.sleep(self.pcm_delay)
        cmd = ["%s/build/bin/pcm-memory" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)]
        pcm_memory = subprocess.Popen(cmd)
        time.sleep(self.pcm_count)
        pcm_memory.terminate()

    def measure_pcm(self, results_dir, pcm_file_name):
        time.sleep(self.pcm_delay)
        cmd = ["%s/build/bin/pcm" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count,
               "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_pcm_power(self, results_dir, pcm_power_file_name):
        time.sleep(self.pcm_delay)
        out = self.exec_cmd(["%s/build/bin/pcm-power" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count])
        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
            fh.write(out)

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name):
        self.log_print("INFO: starting network bandwidth measure")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)])

    def measure_dpdk_memory(self, results_dir):
        self.log_print("INFO: waiting to generate DPDK memory usage")
        time.sleep(self.dpdk_wait_time)
        self.log_print("INFO: generating DPDK memory usage")
        # Trigger the env_dpdk_get_mem_stats RPC; it dumps memory stats to
        # /tmp/spdk_mem_dump.txt, which is then moved to the results directory
        rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)
        os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % results_dir)

    def sys_config(self):
        self.log_print("====Kernel release:====")
        self.log_print(os.uname().release)
        self.log_print("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log_print('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log_print("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log_print('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log_print("====Cpu power info:====")
        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log_print("====zcopy settings:====")
        self.log_print("zcopy enabled: %s" % (self.enable_zcopy))
        self.log_print("====Scheduler settings:====")
        self.log_print("SPDK scheduler: %s" % (self.scheduler_name))
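

# Initiator extends Server with remote execution over SSH (paramiko) and fio
# workload generation; the SPDK and kernel flavors further below differ mainly
# in how they connect to and enumerate the remote subsystems.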
initiator_config["cpu_frequency"] 766 if os.getenv('SPDK_WORKSPACE'): 767 self.spdk_dir = os.getenv('SPDK_WORKSPACE') 768 769 self.ssh_connection = paramiko.SSHClient() 770 self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 771 self.ssh_connection.connect(self.ip, username=self.username, password=self.password) 772 self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir]) 773 self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir]) 774 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 775 776 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 777 self.copy_spdk("/tmp/spdk.zip") 778 self.set_local_nic_info(self.set_local_nic_info_helper()) 779 self.set_cpu_frequency() 780 self.configure_system() 781 if self.enable_adq: 782 self.configure_adq() 783 self.sys_config() 784 785 def set_local_nic_info_helper(self): 786 return json.loads(self.exec_cmd(["lshw", "-json"])) 787 788 def stop(self): 789 self.ssh_connection.close() 790 791 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 792 if change_dir: 793 cmd = ["cd", change_dir, ";", *cmd] 794 795 # In case one of the command elements contains whitespace and is not 796 # already quoted, # (e.g. when calling sysctl) quote it again to prevent expansion 797 # when sending to remote system. 798 for i, c in enumerate(cmd): 799 if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")): 800 cmd[i] = '"%s"' % c 801 cmd = " ".join(cmd) 802 803 # Redirect stderr to stdout thanks using get_pty option if needed 804 _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect) 805 out = stdout.read().decode(encoding="utf-8") 806 807 # Check the return code 808 rc = stdout.channel.recv_exit_status() 809 if rc: 810 raise CalledProcessError(int(rc), cmd, out) 811 812 return out 813 814 def put_file(self, local, remote_dest): 815 ftp = self.ssh_connection.open_sftp() 816 ftp.put(local, remote_dest) 817 ftp.close() 818 819 def get_file(self, remote, local_dest): 820 ftp = self.ssh_connection.open_sftp() 821 ftp.get(remote, local_dest) 822 ftp.close() 823 824 def copy_spdk(self, local_spdk_zip): 825 self.log_print("Copying SPDK sources to initiator %s" % self.name) 826 self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip") 827 self.log_print("Copied sources zip from target") 828 self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir]) 829 self.log_print("Sources unpacked") 830 831 def copy_result_files(self, dest_dir): 832 self.log_print("Copying results") 833 834 if not os.path.exists(dest_dir): 835 os.mkdir(dest_dir) 836 837 # Get list of result files from initiator and copy them back to target 838 file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n") 839 840 for file in file_list: 841 self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file), 842 os.path.join(dest_dir, file)) 843 self.log_print("Done copying results") 844 845 def discover_subsystems(self, address_list, subsys_no): 846 num_nvmes = range(0, subsys_no) 847 nvme_discover_output = "" 848 for ip, subsys_no in itertools.product(address_list, num_nvmes): 849 self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no)) 850 nvme_discover_cmd = ["sudo", 851 "%s" % self.nvmecli_bin, 852 "discover", "-t", "%s" % self.transport, 853 "-s", "%s" % (4420 + subsys_no), 854 "-a", "%s" % ip] 855 856 try: 857 stdout = self.exec_cmd(nvme_discover_cmd) 858 if stdout: 859 nvme_discover_output = 

    def discover_subsystems(self, address_list, subsys_no):
        num_nvmes = range(0, subsys_no)
        nvme_discover_output = ""
        for ip, subsys_no in itertools.product(address_list, num_nvmes):
            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no))
            nvme_discover_cmd = ["sudo",
                                 "%s" % self.nvmecli_bin,
                                 "discover", "-t", "%s" % self.transport,
                                 "-s", "%s" % (4420 + subsys_no),
                                 "-a", "%s" % ip]

            try:
                stdout = self.exec_cmd(nvme_discover_cmd)
                if stdout:
                    nvme_discover_output = nvme_discover_output + stdout
            except CalledProcessError:
                # Do nothing. In case of discovering remote subsystems of kernel target
                # we expect "nvme discover" to fail a bunch of times because we basically
                # scan ports.
                pass

        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                                nvme_discover_output)  # from nvme discovery output
        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
        subsystems = filter(lambda x: "discovery" not in x[1], subsystems)
        subsystems = list(set(subsystems))
        subsystems.sort(key=lambda x: x[1])
        self.log_print("Found matching subsystems on target side:")
        for s in subsystems:
            self.log_print(s)
        self.subsystem_info_list = subsystems

    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def get_route_nic_numa(self, remote_nvme_ip):
        local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip]))
        local_nvme_nic = local_nvme_nic[0]["dev"]
        return self.get_nic_numa_node(local_nvme_nic)

    @staticmethod
    def gen_fio_offset_section(offset_inc, num_jobs):
        offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc
        return "\n".join(["size=%s%%" % offset_inc,
                          "offset=0%",
                          "offset_increment=%s%%" % offset_inc])

    def gen_fio_numa_section(self, fio_filenames_list):
        numa_stats = {}
        for nvme in fio_filenames_list:
            nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme))
            numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1

        # Use the most common NUMA node for this chunk to allocate memory and CPUs
        section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0]

        return "\n".join(["numa_cpu_nodes=%s" % section_local_numa,
                          "numa_mem_policy=prefer:%s" % section_local_numa])
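
    # Illustrative example: gen_fio_offset_section(0, 4) returns
    # "size=25%\noffset=0%\noffset_increment=25%", so each of the four jobs
    # covers a disjoint quarter of its device.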

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no,
                       num_jobs=None, ramp_time=0, run_time=10, rate_iops=0,
                       offset=False, offset_inc=0):
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""
        if "spdk" in self.mode:
            bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
            self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
            spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
        else:
            ioengine = self.ioengine
            spdk_conf = ""
            out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'",
                                 "|", "awk", "'{print $1}'"])
            subsystems = [x for x in out.split("\n") if "nvme" in x]

        if self.cpus_allowed is not None:
            self.log_print("Limiting FIO workload execution to specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    cpus_num += len(range(a, b + 1))  # cpus_allowed ranges are inclusive
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, "num_cores"):
            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            self.num_cores = len(subsystems)
            threads = range(0, len(subsystems))

        if "spdk" in self.mode:
            filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs,
                                                          offset, offset_inc)
        else:
            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs,
                                                          offset, offset_inc)

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, kernel_init_connect() function
        if hasattr(self, "ioengine") and "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log_print("Created FIO Config:")
        self.log_print(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit()
        else:
            self.log_print("WARNING: you have disabled intel_pstate and are using default cpu governance.")

    def run_fio(self, fio_config_file, run_num=None):
        job_name, _ = os.path.splitext(fio_config_file)
        self.log_print("Starting FIO run for job: %s" % job_name)
        self.log_print("Using FIO: %s" % self.fio_bin)

        if run_num:
            for i in range(1, run_num + 1):
                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
                try:
                    output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
                                            "--output=%s" % output_filename, "--eta=never"], True)
                    self.log_print(output)
                except subprocess.CalledProcessError as e:
                    self.log_print("ERROR: Fio process failed!")
                    self.log_print(e.stdout)
        else:
            output_filename = job_name + "_" + self.name + ".json"
            output = self.exec_cmd(["sudo", self.fio_bin,
                                    fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename], True)
            self.log_print(output)
        self.log_print("FIO run finished. Results in: %s" % output_filename)
Results in: %s" % output_filename) 1034 1035 def sys_config(self): 1036 self.log_print("====Kernel release:====") 1037 self.log_print(self.exec_cmd(["uname", "-r"])) 1038 self.log_print("====Kernel command line:====") 1039 cmdline = self.exec_cmd(["cat", "/proc/cmdline"]) 1040 self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines()))) 1041 self.log_print("====sysctl conf:====") 1042 sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"]) 1043 self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines()))) 1044 self.log_print("====Cpu power info:====") 1045 self.log_print(self.exec_cmd(["cpupower", "frequency-info"])) 1046 1047 1048class KernelTarget(Target): 1049 def __init__(self, name, general_config, target_config): 1050 super().__init__(name, general_config, target_config) 1051 # Defaults 1052 self.nvmet_bin = "nvmetcli" 1053 1054 if "nvmet_bin" in target_config: 1055 self.nvmet_bin = target_config["nvmet_bin"] 1056 1057 def stop(self): 1058 nvmet_command(self.nvmet_bin, "clear") 1059 1060 def kernel_tgt_gen_subsystem_conf(self, nvme_list): 1061 1062 nvmet_cfg = { 1063 "ports": [], 1064 "hosts": [], 1065 "subsystems": [], 1066 } 1067 1068 for ip, bdev_num in self.spread_bdevs(len(nvme_list)): 1069 port = str(4420 + bdev_num) 1070 nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num 1071 serial = "SPDK00%s" % bdev_num 1072 bdev_name = nvme_list[bdev_num] 1073 1074 nvmet_cfg["subsystems"].append({ 1075 "allowed_hosts": [], 1076 "attr": { 1077 "allow_any_host": "1", 1078 "serial": serial, 1079 "version": "1.3" 1080 }, 1081 "namespaces": [ 1082 { 1083 "device": { 1084 "path": bdev_name, 1085 "uuid": "%s" % uuid.uuid4() 1086 }, 1087 "enable": 1, 1088 "nsid": port 1089 } 1090 ], 1091 "nqn": nqn 1092 }) 1093 1094 nvmet_cfg["ports"].append({ 1095 "addr": { 1096 "adrfam": "ipv4", 1097 "traddr": ip, 1098 "trsvcid": port, 1099 "trtype": self.transport 1100 }, 1101 "portid": bdev_num, 1102 "referrals": [], 1103 "subsystems": [nqn] 1104 }) 1105 1106 self.subsystem_info_list.append([port, nqn, ip]) 1107 self.subsys_no = len(self.subsystem_info_list) 1108 1109 with open("kernel.conf", "w") as fh: 1110 fh.write(json.dumps(nvmet_cfg, indent=2)) 1111 1112 def tgt_start(self): 1113 self.log_print("Configuring kernel NVMeOF Target") 1114 1115 if self.null_block: 1116 print("Configuring with null block device.") 1117 nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)] 1118 else: 1119 print("Configuring with NVMe drives.") 1120 nvme_list = get_nvme_devices() 1121 1122 self.kernel_tgt_gen_subsystem_conf(nvme_list) 1123 self.subsys_no = len(nvme_list) 1124 1125 nvmet_command(self.nvmet_bin, "clear") 1126 nvmet_command(self.nvmet_bin, "restore kernel.conf") 1127 1128 if self.enable_adq: 1129 self.adq_configure_tc() 1130 1131 self.log_print("Done configuring kernel NVMeOF Target") 1132 1133 1134class SPDKTarget(Target): 1135 def __init__(self, name, general_config, target_config): 1136 super().__init__(name, general_config, target_config) 1137 1138 # Required fields 1139 self.core_mask = target_config["core_mask"] 1140 self.num_cores = self.get_num_cores(self.core_mask) 1141 1142 # Defaults 1143 self.dif_insert_strip = False 1144 self.null_block_dif_type = 0 1145 self.num_shared_buffers = 4096 1146 self.max_queue_depth = 128 1147 self.bpf_proc = None 1148 self.bpf_scripts = [] 1149 self.enable_dsa = False 1150 self.scheduler_core_limit = None 1151 1152 if "num_shared_buffers" in target_config: 1153 self.num_shared_buffers = target_config["num_shared_buffers"] 1154 if 
"max_queue_depth" in target_config: 1155 self.max_queue_depth = target_config["max_queue_depth"] 1156 if "null_block_dif_type" in target_config: 1157 self.null_block_dif_type = target_config["null_block_dif_type"] 1158 if "dif_insert_strip" in target_config: 1159 self.dif_insert_strip = target_config["dif_insert_strip"] 1160 if "bpf_scripts" in target_config: 1161 self.bpf_scripts = target_config["bpf_scripts"] 1162 if "dsa_settings" in target_config: 1163 self.enable_dsa = target_config["dsa_settings"] 1164 if "scheduler_core_limit" in target_config: 1165 self.scheduler_core_limit = target_config["scheduler_core_limit"] 1166 1167 self.log_print("====DSA settings:====") 1168 self.log_print("DSA enabled: %s" % (self.enable_dsa)) 1169 1170 @staticmethod 1171 def get_num_cores(core_mask): 1172 if "0x" in core_mask: 1173 return bin(int(core_mask, 16)).count("1") 1174 else: 1175 num_cores = 0 1176 core_mask = core_mask.replace("[", "") 1177 core_mask = core_mask.replace("]", "") 1178 for i in core_mask.split(","): 1179 if "-" in i: 1180 x, y = i.split("-") 1181 num_cores += len(range(int(x), int(y))) + 1 1182 else: 1183 num_cores += 1 1184 return num_cores 1185 1186 def spdk_tgt_configure(self): 1187 self.log_print("Configuring SPDK NVMeOF target via RPC") 1188 1189 if self.enable_adq: 1190 self.adq_configure_tc() 1191 1192 # Create transport layer 1193 rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, 1194 num_shared_buffers=self.num_shared_buffers, 1195 max_queue_depth=self.max_queue_depth, 1196 dif_insert_or_strip=self.dif_insert_strip, 1197 sock_priority=self.adq_priority) 1198 self.log_print("SPDK NVMeOF transport layer:") 1199 rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client)) 1200 1201 if self.null_block: 1202 self.spdk_tgt_add_nullblock(self.null_block) 1203 self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block) 1204 else: 1205 self.spdk_tgt_add_nvme_conf() 1206 self.spdk_tgt_add_subsystem_conf(self.nic_ips) 1207 1208 self.log_print("Done configuring SPDK NVMeOF Target") 1209 1210 def spdk_tgt_add_nullblock(self, null_block_count): 1211 md_size = 0 1212 block_size = 4096 1213 if self.null_block_dif_type != 0: 1214 md_size = 128 1215 1216 self.log_print("Adding null block bdevices to config via RPC") 1217 for i in range(null_block_count): 1218 self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type) 1219 rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i), 1220 dif_type=self.null_block_dif_type, md_size=md_size) 1221 self.log_print("SPDK Bdevs configuration:") 1222 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1223 1224 def spdk_tgt_add_nvme_conf(self, req_num_disks=None): 1225 self.log_print("Adding NVMe bdevs to config via RPC") 1226 1227 bdfs = get_nvme_devices_bdf() 1228 bdfs = [b.replace(":", ".") for b in bdfs] 1229 1230 if req_num_disks: 1231 if req_num_disks > len(bdfs): 1232 self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs)) 1233 sys.exit(1) 1234 else: 1235 bdfs = bdfs[0:req_num_disks] 1236 1237 for i, bdf in enumerate(bdfs): 1238 rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 1239 1240 self.log_print("SPDK Bdevs configuration:") 1241 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1242 1243 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 1244 self.log_print("Adding subsystems to config") 1245 if not req_num_disks: 1246 req_num_disks = 

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        self.log_print("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        for ip, bdev_num in self.spread_bdevs(req_num_disks):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = "Nvme%sn1" % bdev_num

            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                           allow_any_host=True, max_namespaces=8)
            rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
            for nqn_name in [nqn, "discovery"]:
                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
                                                     nqn=nqn_name,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid=port,
                                                     adrfam="ipv4")
            self.subsystem_info_list.append([port, nqn, ip])
        self.subsys_no = len(self.subsystem_info_list)

        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def bpf_start(self):
        self.log_print("Starting BPF Trace scripts: %s" % self.bpf_scripts)
        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
        results_path = os.path.join(self.results_dir, "bpf_traces.txt")

        with open(self.pid, "r") as fh:
            nvmf_pid = str(fh.readline())

        cmd = [bpf_script, nvmf_pid, *bpf_traces]
        self.log_print(cmd)
        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})

    def tgt_start(self):
        if self.null_block:
            self.subsys_no = 1
        else:
            self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        self.log_print("SPDK NVMeOF Target PID=%s" % self.pid)
        self.log_print("Waiting for spdk to initialize...")
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")

        rpc.sock.sock_set_default_impl(self.client, impl_name="posix")

        if self.enable_zcopy:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
                                           enable_zerocopy_send_server=True)
            self.log_print("Target socket options:")
            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))

        if self.enable_adq:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
                                           nvme_adminq_poll_period_us=100000, retry_count=4)

        if self.enable_dsa:
            rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None)
            self.log_print("Target DSA accel module enabled")

        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit)
        rpc.framework_start_init(self.client)

        if self.bpf_scripts:
            self.bpf_start()

        self.spdk_tgt_configure()

    def stop(self):
        if self.bpf_proc:
            self.log_print("Stopping BPF Trace script")
            self.bpf_proc.terminate()
            self.bpf_proc.wait()

        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait(timeout=30)
            except Exception as e:
                self.log_print("Failed to terminate SPDK Target process. Sending SIGKILL.")
                self.log_print(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
                # Try to clean up RPC socket files if they were not removed
                # because of using 'kill'
                try:
                    os.remove("/var/tmp/spdk.sock")
                    os.remove("/var/tmp/spdk.sock.lock")
                except FileNotFoundError:
                    pass
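

# KernelInitiator connects to remote subsystems with nvme-cli and runs fio
# against the resulting /dev/nvmeXnY block devices; SPDKInitiator (further
# below) instead uses the spdk_bdev fio plugin driven by a generated
# bdev.conf.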
self.log_print("Failed to terminate SPDK Target process. Sending SIGKILL.") 1341 self.log_print(e) 1342 self.nvmf_proc.kill() 1343 self.nvmf_proc.communicate() 1344 # Try to clean up RPC socket files if they were not removed 1345 # because of using 'kill' 1346 try: 1347 os.remove("/var/tmp/spdk.sock") 1348 os.remove("/var/tmp/spdk.sock.lock") 1349 except FileNotFoundError: 1350 pass 1351 1352 1353class KernelInitiator(Initiator): 1354 def __init__(self, name, general_config, initiator_config): 1355 super().__init__(name, general_config, initiator_config) 1356 1357 # Defaults 1358 self.extra_params = "" 1359 self.ioengine = "libaio" 1360 1361 if "extra_params" in initiator_config: 1362 self.extra_params = initiator_config["extra_params"] 1363 1364 if "kernel_engine" in initiator_config: 1365 self.ioengine = initiator_config["kernel_engine"] 1366 if "io_uring" in self.ioengine: 1367 self.extra_params = "--nr-poll-queues=8" 1368 1369 def get_connected_nvme_list(self): 1370 json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"])) 1371 nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"] 1372 if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]] 1373 return nvme_list 1374 1375 def kernel_init_connect(self): 1376 self.log_print("Below connection attempts may result in error messages, this is expected!") 1377 for subsystem in self.subsystem_info_list: 1378 self.log_print("Trying to connect %s %s %s" % subsystem) 1379 self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport, 1380 "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params]) 1381 time.sleep(2) 1382 1383 if "io_uring" in self.ioengine: 1384 self.log_print("Setting block layer settings for io_uring.") 1385 1386 # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because 1387 # apparently it's not possible for connected subsystems. 1388 # Results in "error: Invalid argument" 1389 block_sysfs_settings = { 1390 "iostats": "0", 1391 "rq_affinity": "0", 1392 "nomerges": "2" 1393 } 1394 1395 for disk in self.get_connected_nvme_list(): 1396 sysfs = os.path.join("/sys/block", disk, "queue") 1397 for k, v in block_sysfs_settings.items(): 1398 sysfs_opt_path = os.path.join(sysfs, k) 1399 try: 1400 self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True) 1401 except subprocess.CalledProcessError as e: 1402 self.log_print("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v)) 1403 finally: 1404 _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)]) 1405 self.log_print("%s=%s" % (sysfs_opt_path, _)) 1406 1407 def kernel_init_disconnect(self): 1408 for subsystem in self.subsystem_info_list: 1409 self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]]) 1410 time.sleep(1) 1411 1412 def get_nvme_subsystem_numa(self, dev_name): 1413 # Remove two last characters to get controller name instead of subsystem name 1414 nvme_ctrl = os.path.basename(dev_name)[:-2] 1415 remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', 1416 self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl])) 1417 return self.get_route_nic_numa(remote_nvme_ip.group(0)) 1418 1419 def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1, offset=False, offset_inc=0): 1420 # Generate connected nvme devices names and sort them by used NIC numa node 1421 # to allow better grouping when splitting into fio sections. 

    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
        # Generate connected nvme devices names and sort them by used NIC numa node
        # to allow better grouping when splitting into fio sections.
        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
        nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list]
        nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))]

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = self.gen_fio_numa_section(r)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section


class SPDKInitiator(Initiator):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.install_spdk()

        # Required fields
        self.num_cores = initiator_config["num_cores"]

        # Optional fields
        self.enable_data_digest = False
        if "enable_data_digest" in initiator_config:
            self.enable_data_digest = initiator_config["enable_data_digest"]

    def install_spdk(self):
        self.log_print("Using fio binary %s" % self.fio_bin)
        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)])
        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])

        self.log_print("SPDK built")
        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        bdev_cfg_section = {
            "subsystems": [
                {
                    "subsystem": "bdev",
                    "config": []
                }
            ]
        }

        for i, subsys in enumerate(remote_subsystem_list):
            sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys)
            nvme_ctrl = {
                "method": "bdev_nvme_attach_controller",
                "params": {
                    "name": "Nvme{}".format(i),
                    "trtype": self.transport,
                    "traddr": sub_addr,
                    "trsvcid": sub_port,
                    "subnqn": sub_nqn,
                    "adrfam": "IPv4"
                }
            }

            if self.enable_adq:
                nvme_ctrl["params"].update({"priority": "1"})

            if self.enable_data_digest:
                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})

            bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)

        return json.dumps(bdev_cfg_section, indent=2)
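
    # Example config entry produced above (illustrative values):
    # {"method": "bdev_nvme_attach_controller",
    #  "params": {"name": "Nvme0", "trtype": "tcp", "traddr": "10.0.0.1",
    #             "trsvcid": "4420", "subnqn": "nqn.2018-09.io.spdk:cnode0",
    #             "adrfam": "IPv4"}}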
    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
        filename_section = ""
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        # Generate expected NVMe Bdev names and sort them by used NIC numa node
        # to allow better grouping when splitting into fio sections.
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames]
        filenames = [x for _, x in sorted(zip(filename_numas, filenames))]

        nvme_per_split = len(subsystems) // len(threads)
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = self.gen_fio_numa_section(r)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section

    def get_nvme_subsystem_numa(self, bdev_name):
        bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir]))
        bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"]

        # Remove the two last characters to get the controller name
        # instead of the subsystem name
        nvme_ctrl = bdev_name[:-2]
        remote_nvme_ip = list(filter(lambda x: x["params"]["name"] == nvme_ctrl, bdev_conf_json_obj))[0]["params"]["traddr"]
        return self.get_route_nic_numa(remote_nvme_ip)


if __name__ == "__main__":
    script_full_dir = os.path.dirname(os.path.realpath(__file__))
    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
                        help='Configuration file.')
    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
                        help='Results directory.')
    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
                        help='CSV results filename.')

    args = parser.parse_args()

    print("Using config file: %s" % args.config)
    with open(args.config, "r") as config:
        data = json.load(config)
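
    # The parsing loop below expects the config to contain a "general" section,
    # one section with "target" in its key, any number of sections with
    # "initiator" in their keys and one "fio" section, along these lines
    # (a minimal sketch; all field values are hypothetical):
    # {
    #     "general": {"username": "...", "password": "...", "transport": "tcp"},
    #     "target": {"mode": "spdk", "nic_ips": ["..."], ...},
    #     "initiator1": {"mode": "kernel", "nic_ips": ["..."], ...},
    #     "fio": {"bs": ["4k"], "qd": [128], "rw": ["randrw"], "rwmixread": 70,
    #             "run_time": 30, "ramp_time": 10}
    # }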
    initiators = []
    fio_cases = []

    general_config = data["general"]
    target_config = data["target"]
    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]

    for k, v in data.items():
        if "target" in k:
            v.update({"results_dir": args.results})
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(k, data["general"], v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(k, data["general"], v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k].get("run_num")
            fio_num_jobs = data[k].get("num_jobs")

            fio_rate_iops = 0
            if "rate_iops" in data[k]:
                fio_rate_iops = data[k]["rate_iops"]

            fio_offset = False
            if "offset" in data[k]:
                fio_offset = data[k]["offset"]
            fio_offset_inc = 0
            if "offset_inc" in data[k]:
                fio_offset_inc = data[k]["offset_inc"]
        else:
            continue

    try:
        os.mkdir(args.results)
    except FileExistsError:
        pass

    for i in initiators:
        target_obj.initiator_info.append(
            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
        )

    # TODO: This try block is definitely too large. Need to break it up into
    # separate logical blocks to reduce size.
    try:
        target_obj.tgt_start()

        for i in initiators:
            i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no)
            if i.enable_adq:
                i.adq_configure_tc()

        # Poor man's threading
        # Run FIO tests
        for block_size, io_depth, rw in fio_workloads:
            threads = []
            configs = []
            power_daemon = None
            for i in initiators:
                if i.mode == "kernel":
                    i.kernel_init_connect()

                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops,
                                       fio_offset, fio_offset_inc)
                configs.append(cfg)

            for i, cfg in zip(initiators, configs):
                t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
                threads.append(t)
            if target_obj.enable_sar:
                sar_file_prefix = "%s_%s_%s_sar" % (block_size, rw, io_depth)
                t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix))
                threads.append(t)

            if target_obj.enable_pcm:
                pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]

                pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],))
                pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],))
                pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],))

                threads.append(pcm_cpu_t)
                threads.append(pcm_mem_t)
                threads.append(pcm_pow_t)

            if target_obj.enable_bandwidth:
                bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)])
                bandwidth_file_name = ".".join([bandwidth_file_name, "csv"])
                t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,))
                threads.append(t)

            if target_obj.enable_dpdk_memory:
                # Note: args must be a tuple; a bare string here would be
                # unpacked into single-character arguments
                t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results,))
                threads.append(t)

            if target_obj.enable_adq:
                ethtool_thread = threading.Thread(target=target_obj.ethtool_after_fio_ramp, args=(fio_ramp_time,))
                threads.append(ethtool_thread)

            if target_obj.enable_pm:
                power_daemon = target_obj.measure_power(args.results, "%s_%s_%s" % (block_size, rw, io_depth), script_full_dir)

            for t in threads:
                t.start()
            for t in threads:
                t.join()
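
            # All fio and measurement threads for this workload have been
            # joined; disconnect kernel-mode initiators and fetch fio result
            # files from each one before starting the next workload.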
            for i in initiators:
                if i.mode == "kernel":
                    i.kernel_init_disconnect()
                i.copy_result_files(args.results)

            if power_daemon:
                power_daemon.terminate()

        target_obj.restore_governor()
        target_obj.restore_tuned()
        target_obj.restore_services()
        target_obj.restore_sysctl()
        if target_obj.enable_adq:
            target_obj.reload_driver("ice")
        for i in initiators:
            i.restore_governor()
            i.restore_tuned()
            i.restore_services()
            i.restore_sysctl()
            if i.enable_adq:
                i.reload_driver("ice")
        target_obj.parse_results(args.results, args.csv_filename)
    finally:
        for i in initiators:
            try:
                i.stop()
            except Exception as err:
                # Log instead of silently swallowing the error so a failed
                # teardown is visible in the output
                i.log_print("Failed to stop initiator: %s" % err)
        target_obj.stop()
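
# Example invocation, assuming a config file prepared as sketched above
# (script and file names hypothetical):
#   ./nvmf_perf.py --config ./config.json --results /tmp/results --csv-filename run1.csv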