1#!/usr/bin/env python3 2 3from json.decoder import JSONDecodeError 4import os 5import re 6import sys 7import argparse 8import json 9import zipfile 10import threading 11import subprocess 12import itertools 13import configparser 14import time 15import uuid 16from collections import OrderedDict 17 18import paramiko 19import pandas as pd 20 21import rpc 22import rpc.client 23from common import * 24 25 26class Server: 27 def __init__(self, name, general_config, server_config): 28 self.name = name 29 self.username = general_config["username"] 30 self.password = general_config["password"] 31 self.transport = general_config["transport"].lower() 32 self.nic_ips = server_config["nic_ips"] 33 self.mode = server_config["mode"] 34 35 self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts" 36 if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]: 37 self.irq_scripts_dir = server_config["irq_scripts_dir"] 38 39 self.local_nic_info = [] 40 self._nics_json_obj = {} 41 self.svc_restore_dict = {} 42 self.sysctl_restore_dict = {} 43 self.tuned_restore_dict = {} 44 self.governor_restore = "" 45 self.tuned_profile = "" 46 47 self.enable_adq = False 48 self.adq_priority = None 49 if "adq_enable" in server_config and server_config["adq_enable"]: 50 self.enable_adq = server_config["adq_enable"] 51 self.adq_priority = 1 52 53 if "tuned_profile" in server_config: 54 self.tuned_profile = server_config["tuned_profile"] 55 56 if not re.match("^[A-Za-z0-9]*$", name): 57 self.log_print("Please use a name which contains only letters or numbers") 58 sys.exit(1) 59 60 def log_print(self, msg): 61 print("[%s] %s" % (self.name, msg), flush=True) 62 63 def get_uncommented_lines(self, lines): 64 return [line for line in lines if line and not line.startswith('#')] 65 66 def get_nic_name_by_ip(self, ip): 67 if not self._nics_json_obj: 68 nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"]) 69 self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj))) 70 for nic in self._nics_json_obj: 71 for addr in nic["addr_info"]: 72 if ip in addr["local"]: 73 return nic["ifname"] 74 75 def set_local_nic_info_helper(self): 76 pass 77 78 def set_local_nic_info(self, pci_info): 79 def extract_network_elements(json_obj): 80 nic_list = [] 81 if isinstance(json_obj, list): 82 for x in json_obj: 83 nic_list.extend(extract_network_elements(x)) 84 elif isinstance(json_obj, dict): 85 if "children" in json_obj: 86 nic_list.extend(extract_network_elements(json_obj["children"])) 87 if "class" in json_obj.keys() and "network" in json_obj["class"]: 88 nic_list.append(json_obj) 89 return nic_list 90 91 self.local_nic_info = extract_network_elements(pci_info) 92 93 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 94 return "" 95 96 def configure_system(self): 97 self.configure_services() 98 self.configure_sysctl() 99 self.configure_tuned() 100 self.configure_cpu_governor() 101 self.configure_irq_affinity() 102 103 def configure_adq(self): 104 if self.mode == "kernel": 105 self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 106 return 107 self.adq_load_modules() 108 self.adq_configure_nic() 109 110 def adq_load_modules(self): 111 self.log_print("Modprobing ADQ-related Linux modules...") 112 adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"] 113 for module in adq_module_deps: 114 try: 115 self.exec_cmd(["sudo", "modprobe", module]) 116 self.log_print("%s loaded!" % module) 117 except CalledProcessError as e: 118 self.log_print("ERROR: failed to load module %s" % module) 119 self.log_print("%s resulted in error: %s" % (e.cmd, e.output)) 120 121 def adq_configure_tc(self): 122 self.log_print("Configuring ADQ Traffic classess and filters...") 123 124 if self.mode == "kernel": 125 self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 126 return 127 128 num_queues_tc0 = 2 # 2 is minimum number of queues for TC0 129 num_queues_tc1 = self.num_cores 130 port_param = "dst_port" if isinstance(self, Target) else "src_port" 131 ports = set([p[0] for p in self.subsystem_info_list]) 132 xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs") 133 134 for nic_ip in self.nic_ips: 135 nic_name = self.get_nic_name_by_ip(nic_ip) 136 tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, 137 "root", "mqprio", "num_tc", "2", "map", "0", "1", 138 "queues", "%s@0" % num_queues_tc0, 139 "%s@%s" % (num_queues_tc1, num_queues_tc0), 140 "hw", "1", "mode", "channel"] 141 self.log_print(" ".join(tc_qdisc_map_cmd)) 142 self.exec_cmd(tc_qdisc_map_cmd) 143 144 tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"] 145 self.log_print(" ".join(tc_qdisc_ingress_cmd)) 146 self.exec_cmd(tc_qdisc_ingress_cmd) 147 148 for port in ports: 149 tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name, 150 "protocol", "ip", "ingress", "prio", "1", "flower", 151 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port, 152 "skip_sw", "hw_tc", "1"] 153 self.log_print(" ".join(tc_filter_cmd)) 154 self.exec_cmd(tc_filter_cmd) 155 156 # Ethtool coalese settings must be applied after configuring traffic classes 157 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"]) 158 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"]) 159 160 self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name) 161 xps_cmd = ["sudo", xps_script_path, nic_name] 162 self.log_print(xps_cmd) 163 self.exec_cmd(xps_cmd) 164 165 def adq_configure_nic(self): 166 self.log_print("Configuring NIC port settings for ADQ testing...") 167 168 # Reload the driver first, to make sure any previous settings are re-set. 169 try: 170 self.exec_cmd(["sudo", "rmmod", "ice"]) 171 self.exec_cmd(["sudo", "modprobe", "ice"]) 172 except CalledProcessError as e: 173 self.log_print("ERROR: failed to reload ice module!") 174 self.log_print("%s resulted in error: %s" % (e.cmd, e.output)) 175 176 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 177 for nic in nic_names: 178 self.log_print(nic) 179 try: 180 self.exec_cmd(["sudo", "ethtool", "-K", nic, 181 "hw-tc-offload", "on"]) # Enable hardware TC offload 182 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 183 "channel-inline-flow-director", "on"]) # Enable Intel Flow Director 184 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"]) # Disable LLDP 185 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 186 "channel-pkt-inspect-optimize", "off"]) # Disable channel packet inspection optimization 187 except CalledProcessError as e: 188 self.log_print("ERROR: failed to configure NIC port using ethtool!") 189 self.log_print("%s resulted in error: %s" % (e.cmd, e.output)) 190 self.log_print("Please update your NIC driver and firmware versions and try again.") 191 self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic])) 192 self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic])) 193 194 def configure_services(self): 195 self.log_print("Configuring active services...") 196 svc_config = configparser.ConfigParser(strict=False) 197 198 # Below list is valid only for RHEL / Fedora systems and might not 199 # contain valid names for other distributions. 200 svc_target_state = { 201 "firewalld": "inactive", 202 "irqbalance": "inactive", 203 "lldpad.service": "inactive", 204 "lldpad.socket": "inactive" 205 } 206 207 for service in svc_target_state: 208 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 209 out = "\n".join(["[%s]" % service, out]) 210 svc_config.read_string(out) 211 212 if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]: 213 continue 214 215 service_state = svc_config[service]["ActiveState"] 216 self.log_print("Current state of %s service is %s" % (service, service_state)) 217 self.svc_restore_dict.update({service: service_state}) 218 if service_state != "inactive": 219 self.log_print("Disabling %s. It will be restored after the test has finished." % service) 220 self.exec_cmd(["sudo", "systemctl", "stop", service]) 221 222 def configure_sysctl(self): 223 self.log_print("Tuning sysctl settings...") 224 225 busy_read = 0 226 if self.enable_adq and self.mode == "spdk": 227 busy_read = 1 228 229 sysctl_opts = { 230 "net.core.busy_poll": 0, 231 "net.core.busy_read": busy_read, 232 "net.core.somaxconn": 4096, 233 "net.core.netdev_max_backlog": 8192, 234 "net.ipv4.tcp_max_syn_backlog": 16384, 235 "net.core.rmem_max": 268435456, 236 "net.core.wmem_max": 268435456, 237 "net.ipv4.tcp_mem": "268435456 268435456 268435456", 238 "net.ipv4.tcp_rmem": "8192 1048576 33554432", 239 "net.ipv4.tcp_wmem": "8192 1048576 33554432", 240 "net.ipv4.route.flush": 1, 241 "vm.overcommit_memory": 1, 242 } 243 244 for opt, value in sysctl_opts.items(): 245 self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()}) 246 self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 247 248 def configure_tuned(self): 249 if not self.tuned_profile: 250 self.log_print("WARNING: Tuned profile not set in configration file. Skipping configuration.") 251 return 252 253 self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile) 254 service = "tuned" 255 tuned_config = configparser.ConfigParser(strict=False) 256 257 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 258 out = "\n".join(["[%s]" % service, out]) 259 tuned_config.read_string(out) 260 tuned_state = tuned_config[service]["ActiveState"] 261 self.svc_restore_dict.update({service: tuned_state}) 262 263 if tuned_state != "inactive": 264 profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip() 265 profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip() 266 267 self.tuned_restore_dict = { 268 "profile": profile, 269 "mode": profile_mode 270 } 271 272 self.exec_cmd(["sudo", "systemctl", "start", service]) 273 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile]) 274 self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"])) 275 276 def configure_cpu_governor(self): 277 self.log_print("Setting CPU governor to performance...") 278 279 # This assumes that there is the same CPU scaling governor on each CPU 280 self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip() 281 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"]) 282 283 def configure_irq_affinity(self): 284 self.log_print("Setting NIC irq affinity for NICs...") 285 286 irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh") 287 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 288 for nic in nic_names: 289 irq_cmd = ["sudo", irq_script_path, nic] 290 self.log_print(irq_cmd) 291 self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir) 292 293 def restore_services(self): 294 self.log_print("Restoring services...") 295 for service, state in self.svc_restore_dict.items(): 296 cmd = "stop" if state == "inactive" else "start" 297 self.exec_cmd(["sudo", "systemctl", cmd, service]) 298 299 def restore_sysctl(self): 300 self.log_print("Restoring sysctl settings...") 301 for opt, value in self.sysctl_restore_dict.items(): 302 self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 303 304 def restore_tuned(self): 305 self.log_print("Restoring tuned-adm settings...") 306 307 if not self.tuned_restore_dict: 308 return 309 310 if self.tuned_restore_dict["mode"] == "auto": 311 self.exec_cmd(["sudo", "tuned-adm", "auto_profile"]) 312 self.log_print("Reverted tuned-adm to auto_profile.") 313 else: 314 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]]) 315 self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"]) 316 317 def restore_governor(self): 318 self.log_print("Restoring CPU governor setting...") 319 if self.governor_restore: 320 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore]) 321 self.log_print("Reverted CPU governor to %s." % self.governor_restore) 322 323 324class Target(Server): 325 def __init__(self, name, general_config, target_config): 326 super(Target, self).__init__(name, general_config, target_config) 327 328 # Defaults 329 self.enable_sar = False 330 self.sar_delay = 0 331 self.sar_interval = 0 332 self.sar_count = 0 333 self.enable_pcm = False 334 self.pcm_dir = "" 335 self.pcm_delay = 0 336 self.pcm_interval = 0 337 self.pcm_count = 0 338 self.enable_bandwidth = 0 339 self.bandwidth_count = 0 340 self.enable_dpdk_memory = False 341 self.dpdk_wait_time = 0 342 self.enable_zcopy = False 343 self.scheduler_name = "static" 344 self.null_block = 0 345 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 346 self.subsystem_info_list = [] 347 348 if "null_block_devices" in target_config: 349 self.null_block = target_config["null_block_devices"] 350 if "sar_settings" in target_config: 351 self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"] 352 if "pcm_settings" in target_config: 353 self.enable_pcm = True 354 self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"] 355 if "enable_bandwidth" in target_config: 356 self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"] 357 if "enable_dpdk_memory" in target_config: 358 self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"] 359 if "scheduler_settings" in target_config: 360 self.scheduler_name = target_config["scheduler_settings"] 361 if "zcopy_settings" in target_config: 362 self.enable_zcopy = target_config["zcopy_settings"] 363 if "results_dir" in target_config: 364 self.results_dir = target_config["results_dir"] 365 366 self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) 367 self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../")) 368 self.set_local_nic_info(self.set_local_nic_info_helper()) 369 370 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 371 self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip") 372 373 self.configure_system() 374 if self.enable_adq: 375 self.configure_adq() 376 self.sys_config() 377 378 def set_local_nic_info_helper(self): 379 return json.loads(self.exec_cmd(["lshw", "-json"])) 380 381 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 382 stderr_opt = None 383 if stderr_redirect: 384 stderr_opt = subprocess.STDOUT 385 if change_dir: 386 old_cwd = os.getcwd() 387 os.chdir(change_dir) 388 self.log_print("Changing directory to %s" % change_dir) 389 390 out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8") 391 392 if change_dir: 393 os.chdir(old_cwd) 394 self.log_print("Changing directory to %s" % old_cwd) 395 return out 396 397 def zip_spdk_sources(self, spdk_dir, dest_file): 398 self.log_print("Zipping SPDK source directory") 399 fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) 400 for root, directories, files in os.walk(spdk_dir, followlinks=True): 401 for file in files: 402 fh.write(os.path.relpath(os.path.join(root, file))) 403 fh.close() 404 self.log_print("Done zipping") 405 406 def read_json_stats(self, file): 407 with open(file, "r") as json_data: 408 data = json.load(json_data) 409 job_pos = 0 # job_post = 0 because using aggregated results 410 411 # Check if latency is in nano or microseconds to choose correct dict key 412 def get_lat_unit(key_prefix, dict_section): 413 # key prefix - lat, clat or slat. 414 # dict section - portion of json containing latency bucket in question 415 # Return dict key to access the bucket and unit as string 416 for k, _ in dict_section.items(): 417 if k.startswith(key_prefix): 418 return k, k.split("_")[1] 419 420 def get_clat_percentiles(clat_dict_leaf): 421 if "percentile" in clat_dict_leaf: 422 p99_lat = float(clat_dict_leaf["percentile"]["99.000000"]) 423 p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"]) 424 p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"]) 425 p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"]) 426 427 return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat] 428 else: 429 # Latest fio versions do not provide "percentile" results if no 430 # measurements were done, so just return zeroes 431 return [0, 0, 0, 0] 432 433 read_iops = float(data["jobs"][job_pos]["read"]["iops"]) 434 read_bw = float(data["jobs"][job_pos]["read"]["bw"]) 435 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"]) 436 read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"]) 437 read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"]) 438 read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"]) 439 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"]) 440 read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles( 441 data["jobs"][job_pos]["read"][clat_key]) 442 443 if "ns" in lat_unit: 444 read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]] 445 if "ns" in clat_unit: 446 read_p99_lat = read_p99_lat / 1000 447 read_p99_9_lat = read_p99_9_lat / 1000 448 read_p99_99_lat = read_p99_99_lat / 1000 449 read_p99_999_lat = read_p99_999_lat / 1000 450 451 write_iops = float(data["jobs"][job_pos]["write"]["iops"]) 452 write_bw = float(data["jobs"][job_pos]["write"]["bw"]) 453 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"]) 454 write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"]) 455 write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"]) 456 write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"]) 457 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"]) 458 write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles( 459 data["jobs"][job_pos]["write"][clat_key]) 460 461 if "ns" in lat_unit: 462 write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]] 463 if "ns" in clat_unit: 464 write_p99_lat = write_p99_lat / 1000 465 write_p99_9_lat = write_p99_9_lat / 1000 466 write_p99_99_lat = write_p99_99_lat / 1000 467 write_p99_999_lat = write_p99_999_lat / 1000 468 469 return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat, 470 read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat, 471 write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat, 472 write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat] 473 474 def parse_results(self, results_dir, csv_file): 475 files = os.listdir(results_dir) 476 fio_files = filter(lambda x: ".fio" in x, files) 477 json_files = [x for x in files if ".json" in x] 478 479 headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us", 480 "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us", 481 "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us", 482 "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"] 483 484 aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us", 485 "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"] 486 487 header_line = ",".join(["Name", *headers]) 488 aggr_header_line = ",".join(["Name", *aggr_headers]) 489 490 # Create empty results file 491 with open(os.path.join(results_dir, csv_file), "w") as fh: 492 fh.write(aggr_header_line + "\n") 493 rows = set() 494 495 for fio_config in fio_files: 496 self.log_print("Getting FIO stats for %s" % fio_config) 497 job_name, _ = os.path.splitext(fio_config) 498 499 # Look in the filename for rwmixread value. Function arguments do 500 # not have that information. 501 # TODO: Improve this function by directly using workload params instead 502 # of regexing through filenames. 503 if "read" in job_name: 504 rw_mixread = 1 505 elif "write" in job_name: 506 rw_mixread = 0 507 else: 508 rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100 509 510 # If "_CPU" exists in name - ignore it 511 # Initiators for the same job could have diffrent num_cores parameter 512 job_name = re.sub(r"_\d+CPU", "", job_name) 513 job_result_files = [x for x in json_files if job_name in x] 514 self.log_print("Matching result files for current fio config:") 515 for j in job_result_files: 516 self.log_print("\t %s" % j) 517 518 # There may have been more than 1 initiator used in test, need to check that 519 # Result files are created so that string after last "_" separator is server name 520 inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files]) 521 inits_avg_results = [] 522 for i in inits_names: 523 self.log_print("\tGetting stats for initiator %s" % i) 524 # There may have been more than 1 test run for this job, calculate average results for initiator 525 i_results = [x for x in job_result_files if i in x] 526 i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv")) 527 528 separate_stats = [] 529 for r in i_results: 530 try: 531 stats = self.read_json_stats(os.path.join(results_dir, r)) 532 separate_stats.append(stats) 533 self.log_print(stats) 534 except JSONDecodeError as e: 535 self.log_print("ERROR: Failed to parse %s results! Results might be incomplete!") 536 537 init_results = [sum(x) for x in zip(*separate_stats)] 538 init_results = [x / len(separate_stats) for x in init_results] 539 inits_avg_results.append(init_results) 540 541 self.log_print("\tAverage results for initiator %s" % i) 542 self.log_print(init_results) 543 with open(os.path.join(results_dir, i_results_filename), "w") as fh: 544 fh.write(header_line + "\n") 545 fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n") 546 547 # Sum results of all initiators running this FIO job. 548 # Latency results are an average of latencies from accros all initiators. 549 inits_avg_results = [sum(x) for x in zip(*inits_avg_results)] 550 inits_avg_results = OrderedDict(zip(headers, inits_avg_results)) 551 for key in inits_avg_results: 552 if "lat" in key: 553 inits_avg_results[key] /= len(inits_names) 554 555 # Aggregate separate read/write values into common labels 556 # Take rw_mixread into consideration for mixed read/write workloads. 557 aggregate_results = OrderedDict() 558 for h in aggr_headers: 559 read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key] 560 if "lat" in h: 561 _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat 562 else: 563 _ = read_stat + write_stat 564 aggregate_results[h] = "{0:.3f}".format(_) 565 566 rows.add(",".join([job_name, *aggregate_results.values()])) 567 568 # Save results to file 569 for row in rows: 570 with open(os.path.join(results_dir, csv_file), "a") as fh: 571 fh.write(row + "\n") 572 self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file)) 573 574 def measure_sar(self, results_dir, sar_file_name): 575 self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay) 576 time.sleep(self.sar_delay) 577 out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count]) 578 with open(os.path.join(results_dir, sar_file_name), "w") as fh: 579 for line in out.split("\n"): 580 if "Average" in line and "CPU" in line: 581 self.log_print("Summary CPU utilization from SAR:") 582 self.log_print(line) 583 if "Average" in line and "all" in line: 584 self.log_print(line) 585 fh.write(out) 586 587 def measure_pcm_memory(self, results_dir, pcm_file_name): 588 time.sleep(self.pcm_delay) 589 cmd = ["%s/pcm-memory.x" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)] 590 pcm_memory = subprocess.Popen(cmd) 591 time.sleep(self.pcm_count) 592 pcm_memory.terminate() 593 594 def measure_pcm(self, results_dir, pcm_file_name): 595 time.sleep(self.pcm_delay) 596 cmd = ["%s/pcm.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, "-csv=%s/%s" % (results_dir, pcm_file_name)] 597 subprocess.run(cmd) 598 df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1]) 599 df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x)) 600 skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})] 601 skt_pcm_file_name = "_".join(["skt", pcm_file_name]) 602 skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False) 603 604 def measure_pcm_power(self, results_dir, pcm_power_file_name): 605 time.sleep(self.pcm_delay) 606 out = self.exec_cmd(["%s/pcm-power.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count]) 607 with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh: 608 fh.write(out) 609 610 def measure_network_bandwidth(self, results_dir, bandwidth_file_name): 611 self.log_print("INFO: starting network bandwidth measure") 612 self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name), 613 "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)]) 614 615 def measure_dpdk_memory(self, results_dir): 616 self.log_print("INFO: waiting to generate DPDK memory usage") 617 time.sleep(self.dpdk_wait_time) 618 self.log_print("INFO: generating DPDK memory usage") 619 rpc.env.env_dpdk_get_mem_stats 620 os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir)) 621 622 def sys_config(self): 623 self.log_print("====Kernel release:====") 624 self.log_print(os.uname().release) 625 self.log_print("====Kernel command line:====") 626 with open('/proc/cmdline') as f: 627 cmdline = f.readlines() 628 self.log_print('\n'.join(self.get_uncommented_lines(cmdline))) 629 self.log_print("====sysctl conf:====") 630 with open('/etc/sysctl.conf') as f: 631 sysctl = f.readlines() 632 self.log_print('\n'.join(self.get_uncommented_lines(sysctl))) 633 self.log_print("====Cpu power info:====") 634 self.log_print(self.exec_cmd(["cpupower", "frequency-info"])) 635 self.log_print("====zcopy settings:====") 636 self.log_print("zcopy enabled: %s" % (self.enable_zcopy)) 637 self.log_print("====Scheduler settings:====") 638 self.log_print("SPDK scheduler: %s" % (self.scheduler_name)) 639 640 641class Initiator(Server): 642 def __init__(self, name, general_config, initiator_config): 643 super(Initiator, self).__init__(name, general_config, initiator_config) 644 645 # Required fields 646 self.ip = initiator_config["ip"] 647 self.target_nic_ips = initiator_config["target_nic_ips"] 648 649 # Defaults 650 self.cpus_allowed = None 651 self.cpus_allowed_policy = "shared" 652 self.spdk_dir = "/tmp/spdk" 653 self.fio_bin = "/usr/src/fio/fio" 654 self.nvmecli_bin = "nvme" 655 self.cpu_frequency = None 656 self.subsystem_info_list = [] 657 658 if "spdk_dir" in initiator_config: 659 self.spdk_dir = initiator_config["spdk_dir"] 660 if "fio_bin" in initiator_config: 661 self.fio_bin = initiator_config["fio_bin"] 662 if "nvmecli_bin" in initiator_config: 663 self.nvmecli_bin = initiator_config["nvmecli_bin"] 664 if "cpus_allowed" in initiator_config: 665 self.cpus_allowed = initiator_config["cpus_allowed"] 666 if "cpus_allowed_policy" in initiator_config: 667 self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"] 668 if "cpu_frequency" in initiator_config: 669 self.cpu_frequency = initiator_config["cpu_frequency"] 670 if os.getenv('SPDK_WORKSPACE'): 671 self.spdk_dir = os.getenv('SPDK_WORKSPACE') 672 673 self.ssh_connection = paramiko.SSHClient() 674 self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 675 self.ssh_connection.connect(self.ip, username=self.username, password=self.password) 676 self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir]) 677 self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir]) 678 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 679 680 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 681 self.copy_spdk("/tmp/spdk.zip") 682 self.set_local_nic_info(self.set_local_nic_info_helper()) 683 self.set_cpu_frequency() 684 self.configure_system() 685 if self.enable_adq: 686 self.configure_adq() 687 self.sys_config() 688 689 def set_local_nic_info_helper(self): 690 return json.loads(self.exec_cmd(["lshw", "-json"])) 691 692 def __del__(self): 693 self.ssh_connection.close() 694 695 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 696 if change_dir: 697 cmd = ["cd", change_dir, ";", *cmd] 698 699 # In case one of the command elements contains whitespace and is not 700 # already quoted, # (e.g. when calling sysctl) quote it again to prevent expansion 701 # when sending to remote system. 702 for i, c in enumerate(cmd): 703 if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")): 704 cmd[i] = '"%s"' % c 705 cmd = " ".join(cmd) 706 707 # Redirect stderr to stdout thanks using get_pty option if needed 708 _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect) 709 out = stdout.read().decode(encoding="utf-8") 710 711 # Check the return code 712 rc = stdout.channel.recv_exit_status() 713 if rc: 714 raise CalledProcessError(int(rc), cmd, out) 715 716 return out 717 718 def put_file(self, local, remote_dest): 719 ftp = self.ssh_connection.open_sftp() 720 ftp.put(local, remote_dest) 721 ftp.close() 722 723 def get_file(self, remote, local_dest): 724 ftp = self.ssh_connection.open_sftp() 725 ftp.get(remote, local_dest) 726 ftp.close() 727 728 def copy_spdk(self, local_spdk_zip): 729 self.log_print("Copying SPDK sources to initiator %s" % self.name) 730 self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip") 731 self.log_print("Copied sources zip from target") 732 self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir]) 733 self.log_print("Sources unpacked") 734 735 def copy_result_files(self, dest_dir): 736 self.log_print("Copying results") 737 738 if not os.path.exists(dest_dir): 739 os.mkdir(dest_dir) 740 741 # Get list of result files from initiator and copy them back to target 742 file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n") 743 744 for file in file_list: 745 self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file), 746 os.path.join(dest_dir, file)) 747 self.log_print("Done copying results") 748 749 def discover_subsystems(self, address_list, subsys_no): 750 num_nvmes = range(0, subsys_no) 751 nvme_discover_output = "" 752 for ip, subsys_no in itertools.product(address_list, num_nvmes): 753 self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no)) 754 nvme_discover_cmd = ["sudo", 755 "%s" % self.nvmecli_bin, 756 "discover", "-t", "%s" % self.transport, 757 "-s", "%s" % (4420 + subsys_no), 758 "-a", "%s" % ip] 759 760 try: 761 stdout = self.exec_cmd(nvme_discover_cmd) 762 if stdout: 763 nvme_discover_output = nvme_discover_output + stdout 764 except CalledProcessError: 765 # Do nothing. In case of discovering remote subsystems of kernel target 766 # we expect "nvme discover" to fail a bunch of times because we basically 767 # scan ports. 768 pass 769 770 subsystems = re.findall(r'trsvcid:\s(\d+)\s+' # get svcid number 771 r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+' # get NQN id 772 r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', # get IP address 773 nvme_discover_output) # from nvme discovery output 774 subsystems = filter(lambda x: x[-1] in address_list, subsystems) 775 subsystems = list(set(subsystems)) 776 subsystems.sort(key=lambda x: x[1]) 777 self.log_print("Found matching subsystems on target side:") 778 for s in subsystems: 779 self.log_print(s) 780 self.subsystem_info_list = subsystems 781 782 def gen_fio_filename_conf(self, *args, **kwargs): 783 # Logic implemented in SPDKInitiator and KernelInitiator classes 784 pass 785 786 def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10, rate_iops=0): 787 fio_conf_template = """ 788[global] 789ioengine={ioengine} 790{spdk_conf} 791thread=1 792group_reporting=1 793direct=1 794percentile_list=50:90:99:99.5:99.9:99.99:99.999 795 796norandommap=1 797rw={rw} 798rwmixread={rwmixread} 799bs={block_size} 800time_based=1 801ramp_time={ramp_time} 802runtime={run_time} 803rate_iops={rate_iops} 804""" 805 if "spdk" in self.mode: 806 bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list) 807 self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir]) 808 ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir 809 spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir 810 else: 811 ioengine = self.ioengine 812 spdk_conf = "" 813 out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'", 814 "|", "awk", "'{print $1}'"]) 815 subsystems = [x for x in out.split("\n") if "nvme" in x] 816 817 if self.cpus_allowed is not None: 818 self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed) 819 cpus_num = 0 820 cpus = self.cpus_allowed.split(",") 821 for cpu in cpus: 822 if "-" in cpu: 823 a, b = cpu.split("-") 824 a = int(a) 825 b = int(b) 826 cpus_num += len(range(a, b)) 827 else: 828 cpus_num += 1 829 self.num_cores = cpus_num 830 threads = range(0, self.num_cores) 831 elif hasattr(self, 'num_cores'): 832 self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores) 833 threads = range(0, int(self.num_cores)) 834 else: 835 self.num_cores = len(subsystems) 836 threads = range(0, len(subsystems)) 837 838 if "spdk" in self.mode: 839 filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs) 840 else: 841 filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs) 842 843 fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf, 844 rw=rw, rwmixread=rwmixread, block_size=block_size, 845 ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops) 846 847 # TODO: hipri disabled for now, as it causes fio errors: 848 # io_u error on file /dev/nvme2n1: Operation not supported 849 # See comment in KernelInitiator class, kernel_init_connect() function 850 if hasattr(self, "ioengine") and "io_uring" in self.ioengine: 851 fio_config = fio_config + """ 852fixedbufs=1 853registerfiles=1 854#hipri=1 855""" 856 if num_jobs: 857 fio_config = fio_config + "numjobs=%s \n" % num_jobs 858 if self.cpus_allowed is not None: 859 fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed 860 fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy 861 fio_config = fio_config + filename_section 862 863 fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread) 864 if hasattr(self, "num_cores"): 865 fio_config_filename += "_%sCPU" % self.num_cores 866 fio_config_filename += ".fio" 867 868 self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir]) 869 self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)]) 870 self.log_print("Created FIO Config:") 871 self.log_print(fio_config) 872 873 return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename) 874 875 def set_cpu_frequency(self): 876 if self.cpu_frequency is not None: 877 try: 878 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True) 879 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True) 880 self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"])) 881 except Exception: 882 self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!") 883 sys.exit() 884 else: 885 self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.") 886 887 def run_fio(self, fio_config_file, run_num=None): 888 job_name, _ = os.path.splitext(fio_config_file) 889 self.log_print("Starting FIO run for job: %s" % job_name) 890 self.log_print("Using FIO: %s" % self.fio_bin) 891 892 if run_num: 893 for i in range(1, run_num + 1): 894 output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json" 895 try: 896 output = self.exec_cmd(["sudo", self.fio_bin, 897 fio_config_file, "--output-format=json", 898 "--output=%s" % output_filename], True) 899 self.log_print(output) 900 except subprocess.CalledProcessError as e: 901 self.log_print("ERROR: Fio process failed!") 902 self.log_print(e.stdout) 903 else: 904 output_filename = job_name + "_" + self.name + ".json" 905 output = self.exec_cmd(["sudo", self.fio_bin, 906 fio_config_file, "--output-format=json", 907 "--output" % output_filename], True) 908 self.log_print(output) 909 self.log_print("FIO run finished. Results in: %s" % output_filename) 910 911 def sys_config(self): 912 self.log_print("====Kernel release:====") 913 self.log_print(self.exec_cmd(["uname", "-r"])) 914 self.log_print("====Kernel command line:====") 915 cmdline = self.exec_cmd(["cat", "/proc/cmdline"]) 916 self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines()))) 917 self.log_print("====sysctl conf:====") 918 sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"]) 919 self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines()))) 920 self.log_print("====Cpu power info:====") 921 self.log_print(self.exec_cmd(["cpupower", "frequency-info"])) 922 923 924class KernelTarget(Target): 925 def __init__(self, name, general_config, target_config): 926 super(KernelTarget, self).__init__(name, general_config, target_config) 927 # Defaults 928 self.nvmet_bin = "nvmetcli" 929 930 if "nvmet_bin" in target_config: 931 self.nvmet_bin = target_config["nvmet_bin"] 932 933 def __del__(self): 934 nvmet_command(self.nvmet_bin, "clear") 935 936 def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list): 937 938 nvmet_cfg = { 939 "ports": [], 940 "hosts": [], 941 "subsystems": [], 942 } 943 944 # Split disks between NIC IP's 945 disks_per_ip = int(len(nvme_list) / len(address_list)) 946 disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))] 947 948 subsys_no = 1 949 port_no = 0 950 for ip, chunk in zip(address_list, disk_chunks): 951 for disk in chunk: 952 nqn = "nqn.2018-09.io.spdk:cnode%s" % subsys_no 953 nvmet_cfg["subsystems"].append({ 954 "allowed_hosts": [], 955 "attr": { 956 "allow_any_host": "1", 957 "serial": "SPDK00%s" % subsys_no, 958 "version": "1.3" 959 }, 960 "namespaces": [ 961 { 962 "device": { 963 "path": disk, 964 "uuid": "%s" % uuid.uuid4() 965 }, 966 "enable": 1, 967 "nsid": subsys_no 968 } 969 ], 970 "nqn": nqn 971 }) 972 973 nvmet_cfg["ports"].append({ 974 "addr": { 975 "adrfam": "ipv4", 976 "traddr": ip, 977 "trsvcid": "%s" % (4420 + port_no), 978 "trtype": "%s" % self.transport 979 }, 980 "portid": subsys_no, 981 "referrals": [], 982 "subsystems": [nqn] 983 }) 984 subsys_no += 1 985 port_no += 1 986 self.subsystem_info_list.append([port_no, nqn, ip]) 987 988 with open("kernel.conf", "w") as fh: 989 fh.write(json.dumps(nvmet_cfg, indent=2)) 990 pass 991 992 def tgt_start(self): 993 self.log_print("Configuring kernel NVMeOF Target") 994 995 if self.null_block: 996 print("Configuring with null block device.") 997 null_blk_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)] 998 self.kernel_tgt_gen_subsystem_conf(null_blk_list, self.nic_ips) 999 self.subsys_no = len(null_blk_list) 1000 else: 1001 print("Configuring with NVMe drives.") 1002 nvme_list = get_nvme_devices() 1003 self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips) 1004 self.subsys_no = len(nvme_list) 1005 1006 nvmet_command(self.nvmet_bin, "clear") 1007 nvmet_command(self.nvmet_bin, "restore kernel.conf") 1008 1009 if self.enable_adq: 1010 self.adq_configure_tc() 1011 1012 self.log_print("Done configuring kernel NVMeOF Target") 1013 1014 1015class SPDKTarget(Target): 1016 def __init__(self, name, general_config, target_config): 1017 super(SPDKTarget, self).__init__(name, general_config, target_config) 1018 1019 # Required fields 1020 self.core_mask = target_config["core_mask"] 1021 self.num_cores = self.get_num_cores(self.core_mask) 1022 1023 # Defaults 1024 self.dif_insert_strip = False 1025 self.null_block_dif_type = 0 1026 self.num_shared_buffers = 4096 1027 self.bpf_proc = None 1028 self.bpf_scripts = [] 1029 1030 if "num_shared_buffers" in target_config: 1031 self.num_shared_buffers = target_config["num_shared_buffers"] 1032 if "null_block_dif_type" in target_config: 1033 self.null_block_dif_type = target_config["null_block_dif_type"] 1034 if "dif_insert_strip" in target_config: 1035 self.dif_insert_strip = target_config["dif_insert_strip"] 1036 if "bpf_scripts" in target_config: 1037 self.bpf_scripts = target_config["bpf_scripts"] 1038 1039 def get_num_cores(self, core_mask): 1040 if "0x" in core_mask: 1041 return bin(int(core_mask, 16)).count("1") 1042 else: 1043 num_cores = 0 1044 core_mask = core_mask.replace("[", "") 1045 core_mask = core_mask.replace("]", "") 1046 for i in core_mask.split(","): 1047 if "-" in i: 1048 x, y = i.split("-") 1049 num_cores += len(range(int(x), int(y))) + 1 1050 else: 1051 num_cores += 1 1052 return num_cores 1053 1054 def spdk_tgt_configure(self): 1055 self.log_print("Configuring SPDK NVMeOF target via RPC") 1056 1057 # Create RDMA transport layer 1058 rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, 1059 num_shared_buffers=self.num_shared_buffers, 1060 dif_insert_or_strip=self.dif_insert_strip, 1061 sock_priority=self.adq_priority) 1062 self.log_print("SPDK NVMeOF transport layer:") 1063 rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client)) 1064 1065 if self.enable_adq: 1066 self.adq_configure_tc() 1067 self.log_print("Done configuring SPDK NVMeOF Target") 1068 1069 if self.null_block: 1070 self.spdk_tgt_add_nullblock(self.null_block) 1071 self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block) 1072 else: 1073 self.spdk_tgt_add_nvme_conf() 1074 self.spdk_tgt_add_subsystem_conf(self.nic_ips) 1075 1076 def spdk_tgt_add_nullblock(self, null_block_count): 1077 md_size = 0 1078 block_size = 4096 1079 if self.null_block_dif_type != 0: 1080 md_size = 128 1081 1082 self.log_print("Adding null block bdevices to config via RPC") 1083 for i in range(null_block_count): 1084 self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type) 1085 rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i), 1086 dif_type=self.null_block_dif_type, md_size=md_size) 1087 self.log_print("SPDK Bdevs configuration:") 1088 rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1089 1090 def spdk_tgt_add_nvme_conf(self, req_num_disks=None): 1091 self.log_print("Adding NVMe bdevs to config via RPC") 1092 1093 bdfs = get_nvme_devices_bdf() 1094 bdfs = [b.replace(":", ".") for b in bdfs] 1095 1096 if req_num_disks: 1097 if req_num_disks > len(bdfs): 1098 self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs)) 1099 sys.exit(1) 1100 else: 1101 bdfs = bdfs[0:req_num_disks] 1102 1103 for i, bdf in enumerate(bdfs): 1104 rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 1105 1106 self.log_print("SPDK Bdevs configuration:") 1107 rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1108 1109 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 1110 self.log_print("Adding subsystems to config") 1111 port = "4420" 1112 if not req_num_disks: 1113 req_num_disks = get_nvme_devices_count() 1114 1115 # Distribute bdevs between provided NICs 1116 num_disks = range(0, req_num_disks) 1117 if len(num_disks) == 1: 1118 disks_per_ip = 1 1119 else: 1120 disks_per_ip = int(len(num_disks) / len(ips)) 1121 disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))] 1122 1123 # Create subsystems, add bdevs to namespaces, add listeners 1124 for ip, chunk in zip(ips, disk_chunks): 1125 for c in chunk: 1126 nqn = "nqn.2018-09.io.spdk:cnode%s" % c 1127 serial = "SPDK00%s" % c 1128 bdev_name = "Nvme%sn1" % c 1129 rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial, 1130 allow_any_host=True, max_namespaces=8) 1131 rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name) 1132 1133 rpc.nvmf.nvmf_subsystem_add_listener(self.client, 1134 nqn=nqn, 1135 trtype=self.transport, 1136 traddr=ip, 1137 trsvcid=port, 1138 adrfam="ipv4") 1139 1140 self.subsystem_info_list.append([port, nqn, ip]) 1141 self.log_print("SPDK NVMeOF subsystem configuration:") 1142 rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client)) 1143 1144 def bpf_start(self): 1145 self.log_print("Starting BPF Trace scripts: %s" % self.bpf_scripts) 1146 bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh") 1147 bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts] 1148 results_path = os.path.join(self.results_dir, "bpf_traces.txt") 1149 1150 with open(self.pid, "r") as fh: 1151 nvmf_pid = str(fh.readline()) 1152 1153 cmd = [bpf_script, nvmf_pid, *bpf_traces] 1154 self.log_print(cmd) 1155 self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path}) 1156 1157 def tgt_start(self): 1158 if self.null_block: 1159 self.subsys_no = 1 1160 else: 1161 self.subsys_no = get_nvme_devices_count() 1162 self.log_print("Starting SPDK NVMeOF Target process") 1163 nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt") 1164 proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask]) 1165 self.pid = os.path.join(self.spdk_dir, "nvmf.pid") 1166 1167 with open(self.pid, "w") as fh: 1168 fh.write(str(proc.pid)) 1169 self.nvmf_proc = proc 1170 self.log_print("SPDK NVMeOF Target PID=%s" % self.pid) 1171 self.log_print("Waiting for spdk to initilize...") 1172 while True: 1173 if os.path.exists("/var/tmp/spdk.sock"): 1174 break 1175 time.sleep(1) 1176 self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock") 1177 1178 if self.enable_zcopy: 1179 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", 1180 enable_zerocopy_send_server=True) 1181 self.log_print("Target socket options:") 1182 rpc.client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix")) 1183 1184 if self.enable_adq: 1185 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1) 1186 rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None, 1187 nvme_adminq_poll_period_us=100000, retry_count=4) 1188 rpc.nvmf.nvmf_set_config(self.client, acceptor_poll_rate=10000) 1189 1190 rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name) 1191 1192 rpc.framework_start_init(self.client) 1193 1194 if self.bpf_scripts: 1195 self.bpf_start() 1196 1197 self.spdk_tgt_configure() 1198 1199 def __del__(self): 1200 if self.bpf_proc: 1201 self.log_print("Stopping BPF Trace script") 1202 self.bpf_proc.terminate() 1203 self.bpf_proc.wait() 1204 1205 if hasattr(self, "nvmf_proc"): 1206 try: 1207 self.nvmf_proc.terminate() 1208 self.nvmf_proc.wait() 1209 except Exception as e: 1210 self.log_print(e) 1211 self.nvmf_proc.kill() 1212 self.nvmf_proc.communicate() 1213 1214 1215class KernelInitiator(Initiator): 1216 def __init__(self, name, general_config, initiator_config): 1217 super(KernelInitiator, self).__init__(name, general_config, initiator_config) 1218 1219 # Defaults 1220 self.extra_params = "" 1221 self.ioengine = "libaio" 1222 1223 if "extra_params" in initiator_config: 1224 self.extra_params = initiator_config["extra_params"] 1225 1226 if "kernel_engine" in initiator_config: 1227 self.ioengine = initiator_config["kernel_engine"] 1228 if "io_uring" in self.ioengine: 1229 self.extra_params = "--nr-poll-queues=8" 1230 1231 def __del__(self): 1232 self.ssh_connection.close() 1233 1234 def get_connected_nvme_list(self): 1235 json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"])) 1236 nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"] 1237 if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]] 1238 return nvme_list 1239 1240 def kernel_init_connect(self): 1241 self.log_print("Below connection attempts may result in error messages, this is expected!") 1242 for subsystem in self.subsystem_info_list: 1243 self.log_print("Trying to connect %s %s %s" % subsystem) 1244 self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport, 1245 "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params]) 1246 time.sleep(2) 1247 1248 if "io_uring" in self.ioengine: 1249 self.log_print("Setting block layer settings for io_uring.") 1250 1251 # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because 1252 # apparently it's not possible for connected subsystems. 1253 # Results in "error: Invalid argument" 1254 block_sysfs_settings = { 1255 "iostats": "0", 1256 "rq_affinity": "0", 1257 "nomerges": "2" 1258 } 1259 1260 for disk in self.get_connected_nvme_list(): 1261 sysfs = os.path.join("/sys/block", disk, "queue") 1262 for k, v in block_sysfs_settings.items(): 1263 sysfs_opt_path = os.path.join(sysfs, k) 1264 try: 1265 self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True) 1266 except subprocess.CalledProcessError as e: 1267 self.log_print("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v)) 1268 finally: 1269 _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)]) 1270 self.log_print("%s=%s" % (sysfs_opt_path, _)) 1271 1272 def kernel_init_disconnect(self): 1273 for subsystem in self.subsystem_info_list: 1274 self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]]) 1275 time.sleep(1) 1276 1277 def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1): 1278 nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()] 1279 1280 filename_section = "" 1281 nvme_per_split = int(len(nvme_list) / len(threads)) 1282 remainder = len(nvme_list) % len(threads) 1283 iterator = iter(nvme_list) 1284 result = [] 1285 for i in range(len(threads)): 1286 result.append([]) 1287 for _ in range(nvme_per_split): 1288 result[i].append(next(iterator)) 1289 if remainder: 1290 result[i].append(next(iterator)) 1291 remainder -= 1 1292 for i, r in enumerate(result): 1293 header = "[filename%s]" % i 1294 disks = "\n".join(["filename=%s" % x for x in r]) 1295 job_section_qd = round((io_depth * len(r)) / num_jobs) 1296 if job_section_qd == 0: 1297 job_section_qd = 1 1298 iodepth = "iodepth=%s" % job_section_qd 1299 filename_section = "\n".join([filename_section, header, disks, iodepth]) 1300 1301 return filename_section 1302 1303 1304class SPDKInitiator(Initiator): 1305 def __init__(self, name, general_config, initiator_config): 1306 super(SPDKInitiator, self).__init__(name, general_config, initiator_config) 1307 1308 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 1309 self.install_spdk() 1310 1311 # Required fields 1312 self.num_cores = initiator_config["num_cores"] 1313 1314 def install_spdk(self): 1315 self.log_print("Using fio binary %s" % self.fio_bin) 1316 self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"]) 1317 self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"]) 1318 self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)]) 1319 self.exec_cmd(["make", "-C", self.spdk_dir, "clean"]) 1320 self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"]) 1321 1322 self.log_print("SPDK built") 1323 self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir]) 1324 1325 def gen_spdk_bdev_conf(self, remote_subsystem_list): 1326 bdev_cfg_section = { 1327 "subsystems": [ 1328 { 1329 "subsystem": "bdev", 1330 "config": [] 1331 } 1332 ] 1333 } 1334 1335 for i, subsys in enumerate(remote_subsystem_list): 1336 sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys) 1337 nvme_ctrl = { 1338 "method": "bdev_nvme_attach_controller", 1339 "params": { 1340 "name": "Nvme{}".format(i), 1341 "trtype": self.transport, 1342 "traddr": sub_addr, 1343 "trsvcid": sub_port, 1344 "subnqn": sub_nqn, 1345 "adrfam": "IPv4" 1346 } 1347 } 1348 1349 if self.enable_adq: 1350 nvme_ctrl["params"].update({"priority": "1"}) 1351 1352 bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl) 1353 1354 return json.dumps(bdev_cfg_section, indent=2) 1355 1356 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1): 1357 filename_section = "" 1358 if len(threads) >= len(subsystems): 1359 threads = range(0, len(subsystems)) 1360 filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))] 1361 nvme_per_split = int(len(subsystems) / len(threads)) 1362 remainder = len(subsystems) % len(threads) 1363 iterator = iter(filenames) 1364 result = [] 1365 for i in range(len(threads)): 1366 result.append([]) 1367 for _ in range(nvme_per_split): 1368 result[i].append(next(iterator)) 1369 if remainder: 1370 result[i].append(next(iterator)) 1371 remainder -= 1 1372 for i, r in enumerate(result): 1373 header = "[filename%s]" % i 1374 disks = "\n".join(["filename=%s" % x for x in r]) 1375 job_section_qd = round((io_depth * len(r)) / num_jobs) 1376 if job_section_qd == 0: 1377 job_section_qd = 1 1378 iodepth = "iodepth=%s" % job_section_qd 1379 filename_section = "\n".join([filename_section, header, disks, iodepth]) 1380 1381 return filename_section 1382 1383 1384if __name__ == "__main__": 1385 script_full_dir = os.path.dirname(os.path.realpath(__file__)) 1386 default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json")) 1387 1388 parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) 1389 parser.add_argument('-c', '--config', type=str, default=default_config_file_path, 1390 help='Configuration file.') 1391 parser.add_argument('-r', '--results', type=str, default='/tmp/results', 1392 help='Results directory.') 1393 parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv', 1394 help='CSV results filename.') 1395 1396 args = parser.parse_args() 1397 1398 print("Using config file: %s" % args.config) 1399 with open(args.config, "r") as config: 1400 data = json.load(config) 1401 1402 initiators = [] 1403 fio_cases = [] 1404 1405 general_config = data["general"] 1406 target_config = data["target"] 1407 initiator_configs = [data[x] for x in data.keys() if "initiator" in x] 1408 1409 for k, v in data.items(): 1410 if "target" in k: 1411 v.update({"results_dir": args.results}) 1412 if data[k]["mode"] == "spdk": 1413 target_obj = SPDKTarget(k, data["general"], v) 1414 elif data[k]["mode"] == "kernel": 1415 target_obj = KernelTarget(k, data["general"], v) 1416 pass 1417 elif "initiator" in k: 1418 if data[k]["mode"] == "spdk": 1419 init_obj = SPDKInitiator(k, data["general"], v) 1420 elif data[k]["mode"] == "kernel": 1421 init_obj = KernelInitiator(k, data["general"], v) 1422 initiators.append(init_obj) 1423 elif "fio" in k: 1424 fio_workloads = itertools.product(data[k]["bs"], 1425 data[k]["qd"], 1426 data[k]["rw"]) 1427 1428 fio_run_time = data[k]["run_time"] 1429 fio_ramp_time = data[k]["ramp_time"] 1430 fio_rw_mix_read = data[k]["rwmixread"] 1431 fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None 1432 fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None 1433 1434 fio_rate_iops = 0 1435 if "rate_iops" in data[k]: 1436 fio_rate_iops = data[k]["rate_iops"] 1437 else: 1438 continue 1439 1440 try: 1441 os.mkdir(args.results) 1442 except FileExistsError: 1443 pass 1444 1445 target_obj.tgt_start() 1446 1447 for i in initiators: 1448 i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no) 1449 if i.enable_adq: 1450 i.adq_configure_tc() 1451 1452 # Poor mans threading 1453 # Run FIO tests 1454 for block_size, io_depth, rw in fio_workloads: 1455 threads = [] 1456 configs = [] 1457 for i in initiators: 1458 if i.mode == "kernel": 1459 i.kernel_init_connect() 1460 1461 cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no, 1462 fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops) 1463 configs.append(cfg) 1464 1465 for i, cfg in zip(initiators, configs): 1466 t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num)) 1467 threads.append(t) 1468 if target_obj.enable_sar: 1469 sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"]) 1470 sar_file_name = ".".join([sar_file_name, "txt"]) 1471 t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_name)) 1472 threads.append(t) 1473 1474 if target_obj.enable_pcm: 1475 pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]] 1476 1477 pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],)) 1478 pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],)) 1479 pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],)) 1480 1481 threads.append(pcm_cpu_t) 1482 threads.append(pcm_mem_t) 1483 threads.append(pcm_pow_t) 1484 1485 if target_obj.enable_bandwidth: 1486 bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)]) 1487 bandwidth_file_name = ".".join([bandwidth_file_name, "csv"]) 1488 t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,)) 1489 threads.append(t) 1490 1491 if target_obj.enable_dpdk_memory: 1492 t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results)) 1493 threads.append(t) 1494 1495 for t in threads: 1496 t.start() 1497 for t in threads: 1498 t.join() 1499 1500 for i in initiators: 1501 if i.mode == "kernel": 1502 i.kernel_init_disconnect() 1503 i.copy_result_files(args.results) 1504 1505 target_obj.restore_governor() 1506 target_obj.restore_tuned() 1507 target_obj.restore_services() 1508 target_obj.restore_sysctl() 1509 for i in initiators: 1510 i.restore_governor() 1511 i.restore_tuned() 1512 i.restore_services() 1513 i.restore_sysctl() 1514 target_obj.parse_results(args.results, args.csv_filename) 1515