#!/usr/bin/env python3

from json.decoder import JSONDecodeError
import os
import re
import sys
import argparse
import json
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid
from collections import OrderedDict

import paramiko
import pandas as pd

import rpc
import rpc.client
from common import *


class Server:
    """Common base for benchmark hosts (target and initiators).

    Holds connection credentials, NIC configuration and the save/restore
    bookkeeping for system settings (services, sysctl, tuned, governor)
    that the test tunes and later reverts.
    """

    def __init__(self, name, general_config, server_config):
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]

        # Default location of the Mellanox IRQ affinity helper scripts;
        # overridable per server via "irq_scripts_dir".
        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        self.local_nic_info = []
        self._nics_json_obj = {}
        # Original system state captured before tuning, used by restore_*().
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        self.enable_adq = False
        self.adq_priority = None
        if "adq_enable" in server_config and server_config["adq_enable"]:
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1

        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]

        # Server name is embedded in result file names, so keep it alphanumeric.
        if not re.match("^[A-Za-z0-9]*$", name):
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        """Print a message prefixed with this server's name."""
        print("[%s] %s" % (self.name, msg), flush=True)

    def get_uncommented_lines(self, lines):
        """Return only non-empty lines that do not start with '#'."""
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        """Return the interface name owning the given IP, or None if not found.

        NOTE(review): uses substring matching (`ip in addr["local"]`), so
        "10.0.0.1" also matches "10.0.0.11" — confirm configured IPs are
        unambiguous.
        """
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                if ip in addr["local"]:
                    return nic["ifname"]

    def set_local_nic_info_helper(self):
        # Overridden by subclasses to provide "lshw -json" output.
        pass

    def set_local_nic_info(self, pci_info):
        """Extract network device entries from an lshw JSON tree."""
        def extract_network_elements(json_obj):
            # Recursively collect every node whose "class" contains "network".
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        # Overridden by subclasses (local subprocess vs remote SSH execution).
        return ""

    def configure_system(self):
        """Apply all system tuning steps in one pass."""
        self.configure_services()
        self.configure_sysctl()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity()

    def configure_adq(self):
        """Set up ADQ (Application Device Queues); SPDK mode only."""
        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        """Load kernel modules required for ADQ traffic classes/filters."""
        self.log_print("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log_print("%s loaded!" % module)
            except CalledProcessError as e:
                self.log_print("ERROR: failed to load module %s" % module)
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        """Create ADQ traffic classes, filters and coalescing settings per NIC."""
        self.log_print("Configuring ADQ Traffic classess and filters...")

        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        # Target matches on destination port, initiators on source port.
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        ports = set([p[0] for p in self.subsystem_info_list])
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log_print(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log_print(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            for port in ports:
                tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                                 "protocol", "ip", "ingress", "prio", "1", "flower",
                                 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                                 "skip_sw", "hw_tc", "1"]
                self.log_print(" ".join(tc_filter_cmd))
                self.exec_cmd(tc_filter_cmd)

            # Ethtool coalese settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            # Fix: log the joined command string, consistent with the other
            # command logging above (previously printed the raw list).
            self.log_print(" ".join(xps_cmd))
            self.exec_cmd(xps_cmd)

    def adq_configure_nic(self):
        """Reload the ice driver and set per-port ethtool flags for ADQ."""
        self.log_print("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        try:
            self.exec_cmd(["sudo", "rmmod", "ice"])
            self.exec_cmd(["sudo", "modprobe", "ice"])
        except CalledProcessError as e:
            self.log_print("ERROR: failed to reload ice module!")
            self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log_print(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization
            except CalledProcessError as e:
                self.log_print("ERROR: failed to configure NIC port using ethtool!")
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
                self.log_print("Please update your NIC driver and firmware versions and try again.")
                self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
                self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        """Stop services that interfere with the benchmark, recording prior state."""
        self.log_print("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            # "systemctl show" output is INI-like; wrap it in a section header
            # so configparser can digest it.
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log_print("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log_print("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        """Apply network/VM sysctl tuning, saving previous values for restore."""
        self.log_print("Tuning sysctl settings...")

        # busy_read polling helps ADQ in SPDK mode only.
        busy_read = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1

        sysctl_opts = {
            "net.core.busy_poll": 0,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
        }

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        """Activate the configured tuned-adm profile, saving the current one."""
        if not self.tuned_profile:
            self.log_print("WARNING: Tuned profile not set in configration file. Skipping configuration.")
            return

        self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        """Switch CPU frequency governor to 'performance', saving the old one."""
        self.log_print("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def configure_irq_affinity(self):
        """Run the vendor set_irq_affinity.sh script against each test NIC."""
        self.log_print("Setting NIC irq affinity for NICs...")

        irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh")
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            irq_cmd = ["sudo", irq_script_path, nic]
            # Fix: log the joined command string, consistent with the other
            # command logging in this class (previously printed the raw list).
            self.log_print(" ".join(irq_cmd))
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        """Return every touched service to its recorded pre-test state."""
        self.log_print("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        """Write back the sysctl values captured in configure_sysctl()."""
        self.log_print("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        """Revert tuned-adm to the profile recorded before the test."""
        self.log_print("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log_print("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        """Revert the CPU scaling governor to its recorded pre-test value."""
        self.log_print("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log_print("Reverted CPU governor to %s." % self.governor_restore)


class Target(Server):
    """The NVMe-oF target host: runs locally and collects measurements."""

    def __init__(self, name, general_config, target_config):
        super(Target, self).__init__(name, general_config, target_config)

        # Defaults
        self.enable_sar = False
        self.sar_delay = 0
        self.sar_interval = 0
        self.sar_count = 0
        self.enable_pcm = False
        self.pcm_dir = ""
        self.pcm_delay = 0
        self.pcm_interval = 0
        self.pcm_count = 0
        self.enable_bandwidth = 0
        self.bandwidth_count = 0
        self.enable_dpdk_memory = False
        self.dpdk_wait_time = 0
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []

        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "sar_settings" in target_config:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"]
        if "pcm_settings" in target_config:
            self.enable_pcm = True
354 self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"] 355 if "enable_bandwidth" in target_config: 356 self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"] 357 if "enable_dpdk_memory" in target_config: 358 self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"] 359 if "scheduler_settings" in target_config: 360 self.scheduler_name = target_config["scheduler_settings"] 361 if "zcopy_settings" in target_config: 362 self.enable_zcopy = target_config["zcopy_settings"] 363 if "results_dir" in target_config: 364 self.results_dir = target_config["results_dir"] 365 366 self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) 367 self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../")) 368 self.set_local_nic_info(self.set_local_nic_info_helper()) 369 370 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 371 self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip") 372 373 self.configure_system() 374 if self.enable_adq: 375 self.configure_adq() 376 self.sys_config() 377 378 def set_local_nic_info_helper(self): 379 return json.loads(self.exec_cmd(["lshw", "-json"])) 380 381 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 382 stderr_opt = None 383 if stderr_redirect: 384 stderr_opt = subprocess.STDOUT 385 if change_dir: 386 old_cwd = os.getcwd() 387 os.chdir(change_dir) 388 self.log_print("Changing directory to %s" % change_dir) 389 390 out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8") 391 392 if change_dir: 393 os.chdir(old_cwd) 394 self.log_print("Changing directory to %s" % old_cwd) 395 return out 396 397 def zip_spdk_sources(self, spdk_dir, dest_file): 398 self.log_print("Zipping SPDK source directory") 399 fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) 400 for root, directories, files in os.walk(spdk_dir, followlinks=True): 401 for file in files: 402 
fh.write(os.path.relpath(os.path.join(root, file))) 403 fh.close() 404 self.log_print("Done zipping") 405 406 def read_json_stats(self, file): 407 with open(file, "r") as json_data: 408 data = json.load(json_data) 409 job_pos = 0 # job_post = 0 because using aggregated results 410 411 # Check if latency is in nano or microseconds to choose correct dict key 412 def get_lat_unit(key_prefix, dict_section): 413 # key prefix - lat, clat or slat. 414 # dict section - portion of json containing latency bucket in question 415 # Return dict key to access the bucket and unit as string 416 for k, _ in dict_section.items(): 417 if k.startswith(key_prefix): 418 return k, k.split("_")[1] 419 420 def get_clat_percentiles(clat_dict_leaf): 421 if "percentile" in clat_dict_leaf: 422 p99_lat = float(clat_dict_leaf["percentile"]["99.000000"]) 423 p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"]) 424 p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"]) 425 p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"]) 426 427 return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat] 428 else: 429 # Latest fio versions do not provide "percentile" results if no 430 # measurements were done, so just return zeroes 431 return [0, 0, 0, 0] 432 433 read_iops = float(data["jobs"][job_pos]["read"]["iops"]) 434 read_bw = float(data["jobs"][job_pos]["read"]["bw"]) 435 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"]) 436 read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"]) 437 read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"]) 438 read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"]) 439 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"]) 440 read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles( 441 data["jobs"][job_pos]["read"][clat_key]) 442 443 if "ns" in lat_unit: 444 read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, 
read_min_lat, read_max_lat]] 445 if "ns" in clat_unit: 446 read_p99_lat = read_p99_lat / 1000 447 read_p99_9_lat = read_p99_9_lat / 1000 448 read_p99_99_lat = read_p99_99_lat / 1000 449 read_p99_999_lat = read_p99_999_lat / 1000 450 451 write_iops = float(data["jobs"][job_pos]["write"]["iops"]) 452 write_bw = float(data["jobs"][job_pos]["write"]["bw"]) 453 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"]) 454 write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"]) 455 write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"]) 456 write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"]) 457 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"]) 458 write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles( 459 data["jobs"][job_pos]["write"][clat_key]) 460 461 if "ns" in lat_unit: 462 write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]] 463 if "ns" in clat_unit: 464 write_p99_lat = write_p99_lat / 1000 465 write_p99_9_lat = write_p99_9_lat / 1000 466 write_p99_99_lat = write_p99_99_lat / 1000 467 write_p99_999_lat = write_p99_999_lat / 1000 468 469 return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat, 470 read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat, 471 write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat, 472 write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat] 473 474 def parse_results(self, results_dir, csv_file): 475 files = os.listdir(results_dir) 476 fio_files = filter(lambda x: ".fio" in x, files) 477 json_files = [x for x in files if ".json" in x] 478 479 headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us", 480 "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us", 481 "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us", 482 
"write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"] 483 484 aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us", 485 "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"] 486 487 header_line = ",".join(["Name", *headers]) 488 aggr_header_line = ",".join(["Name", *aggr_headers]) 489 490 # Create empty results file 491 with open(os.path.join(results_dir, csv_file), "w") as fh: 492 fh.write(aggr_header_line + "\n") 493 rows = set() 494 495 for fio_config in fio_files: 496 self.log_print("Getting FIO stats for %s" % fio_config) 497 job_name, _ = os.path.splitext(fio_config) 498 499 # Look in the filename for rwmixread value. Function arguments do 500 # not have that information. 501 # TODO: Improve this function by directly using workload params instead 502 # of regexing through filenames. 503 if "read" in job_name: 504 rw_mixread = 1 505 elif "write" in job_name: 506 rw_mixread = 0 507 else: 508 rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100 509 510 # If "_CPU" exists in name - ignore it 511 # Initiators for the same job could have diffrent num_cores parameter 512 job_name = re.sub(r"_\d+CPU", "", job_name) 513 job_result_files = [x for x in json_files if job_name in x] 514 self.log_print("Matching result files for current fio config:") 515 for j in job_result_files: 516 self.log_print("\t %s" % j) 517 518 # There may have been more than 1 initiator used in test, need to check that 519 # Result files are created so that string after last "_" separator is server name 520 inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files]) 521 inits_avg_results = [] 522 for i in inits_names: 523 self.log_print("\tGetting stats for initiator %s" % i) 524 # There may have been more than 1 test run for this job, calculate average results for initiator 525 i_results = [x for x in job_result_files if i in x] 526 i_results_filename = re.sub(r"run_\d+_", "", 
i_results[0].replace("json", "csv")) 527 528 separate_stats = [] 529 for r in i_results: 530 try: 531 stats = self.read_json_stats(os.path.join(results_dir, r)) 532 separate_stats.append(stats) 533 self.log_print(stats) 534 except JSONDecodeError as e: 535 self.log_print("ERROR: Failed to parse %s results! Results might be incomplete!") 536 537 init_results = [sum(x) for x in zip(*separate_stats)] 538 init_results = [x / len(separate_stats) for x in init_results] 539 inits_avg_results.append(init_results) 540 541 self.log_print("\tAverage results for initiator %s" % i) 542 self.log_print(init_results) 543 with open(os.path.join(results_dir, i_results_filename), "w") as fh: 544 fh.write(header_line + "\n") 545 fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n") 546 547 # Sum results of all initiators running this FIO job. 548 # Latency results are an average of latencies from accros all initiators. 549 inits_avg_results = [sum(x) for x in zip(*inits_avg_results)] 550 inits_avg_results = OrderedDict(zip(headers, inits_avg_results)) 551 for key in inits_avg_results: 552 if "lat" in key: 553 inits_avg_results[key] /= len(inits_names) 554 555 # Aggregate separate read/write values into common labels 556 # Take rw_mixread into consideration for mixed read/write workloads. 
557 aggregate_results = OrderedDict() 558 for h in aggr_headers: 559 read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key] 560 if "lat" in h: 561 _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat 562 else: 563 _ = read_stat + write_stat 564 aggregate_results[h] = "{0:.3f}".format(_) 565 566 rows.add(",".join([job_name, *aggregate_results.values()])) 567 568 # Save results to file 569 for row in rows: 570 with open(os.path.join(results_dir, csv_file), "a") as fh: 571 fh.write(row + "\n") 572 self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file)) 573 574 def measure_sar(self, results_dir, sar_file_name): 575 self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay) 576 time.sleep(self.sar_delay) 577 out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count]) 578 with open(os.path.join(results_dir, sar_file_name), "w") as fh: 579 for line in out.split("\n"): 580 if "Average" in line and "CPU" in line: 581 self.log_print("Summary CPU utilization from SAR:") 582 self.log_print(line) 583 if "Average" in line and "all" in line: 584 self.log_print(line) 585 fh.write(out) 586 587 def measure_pcm_memory(self, results_dir, pcm_file_name): 588 time.sleep(self.pcm_delay) 589 cmd = ["%s/pcm-memory.x" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)] 590 pcm_memory = subprocess.Popen(cmd) 591 time.sleep(self.pcm_count) 592 pcm_memory.terminate() 593 594 def measure_pcm(self, results_dir, pcm_file_name): 595 time.sleep(self.pcm_delay) 596 cmd = ["%s/pcm.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, "-csv=%s/%s" % (results_dir, pcm_file_name)] 597 subprocess.run(cmd) 598 df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1]) 599 df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x)) 600 skt = df.loc[:, 
df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})] 601 skt_pcm_file_name = "_".join(["skt", pcm_file_name]) 602 skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False) 603 604 def measure_pcm_power(self, results_dir, pcm_power_file_name): 605 time.sleep(self.pcm_delay) 606 out = self.exec_cmd(["%s/pcm-power.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count]) 607 with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh: 608 fh.write(out) 609 610 def measure_network_bandwidth(self, results_dir, bandwidth_file_name): 611 self.log_print("INFO: starting network bandwidth measure") 612 self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name), 613 "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)]) 614 615 def measure_dpdk_memory(self, results_dir): 616 self.log_print("INFO: waiting to generate DPDK memory usage") 617 time.sleep(self.dpdk_wait_time) 618 self.log_print("INFO: generating DPDK memory usage") 619 rpc.env.env_dpdk_get_mem_stats 620 os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir)) 621 622 def sys_config(self): 623 self.log_print("====Kernel release:====") 624 self.log_print(os.uname().release) 625 self.log_print("====Kernel command line:====") 626 with open('/proc/cmdline') as f: 627 cmdline = f.readlines() 628 self.log_print('\n'.join(self.get_uncommented_lines(cmdline))) 629 self.log_print("====sysctl conf:====") 630 with open('/etc/sysctl.conf') as f: 631 sysctl = f.readlines() 632 self.log_print('\n'.join(self.get_uncommented_lines(sysctl))) 633 self.log_print("====Cpu power info:====") 634 self.log_print(self.exec_cmd(["cpupower", "frequency-info"])) 635 self.log_print("====zcopy settings:====") 636 self.log_print("zcopy enabled: %s" % (self.enable_zcopy)) 637 self.log_print("====Scheduler settings:====") 638 self.log_print("SPDK scheduler: %s" % (self.scheduler_name)) 639 640 641class Initiator(Server): 642 def __init__(self, name, 
general_config, initiator_config): 643 super(Initiator, self).__init__(name, general_config, initiator_config) 644 645 # Required fields 646 self.ip = initiator_config["ip"] 647 self.target_nic_ips = initiator_config["target_nic_ips"] 648 649 # Defaults 650 self.cpus_allowed = None 651 self.cpus_allowed_policy = "shared" 652 self.spdk_dir = "/tmp/spdk" 653 self.fio_bin = "/usr/src/fio/fio" 654 self.nvmecli_bin = "nvme" 655 self.cpu_frequency = None 656 self.subsystem_info_list = [] 657 658 if "spdk_dir" in initiator_config: 659 self.spdk_dir = initiator_config["spdk_dir"] 660 if "fio_bin" in initiator_config: 661 self.fio_bin = initiator_config["fio_bin"] 662 if "nvmecli_bin" in initiator_config: 663 self.nvmecli_bin = initiator_config["nvmecli_bin"] 664 if "cpus_allowed" in initiator_config: 665 self.cpus_allowed = initiator_config["cpus_allowed"] 666 if "cpus_allowed_policy" in initiator_config: 667 self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"] 668 if "cpu_frequency" in initiator_config: 669 self.cpu_frequency = initiator_config["cpu_frequency"] 670 if os.getenv('SPDK_WORKSPACE'): 671 self.spdk_dir = os.getenv('SPDK_WORKSPACE') 672 673 self.ssh_connection = paramiko.SSHClient() 674 self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 675 self.ssh_connection.connect(self.ip, username=self.username, password=self.password) 676 self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir]) 677 self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir]) 678 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 679 680 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 681 self.copy_spdk("/tmp/spdk.zip") 682 self.set_local_nic_info(self.set_local_nic_info_helper()) 683 self.set_cpu_frequency() 684 self.configure_system() 685 if self.enable_adq: 686 self.configure_adq() 687 self.sys_config() 688 689 def set_local_nic_info_helper(self): 690 return 
json.loads(self.exec_cmd(["lshw", "-json"])) 691 692 def __del__(self): 693 self.ssh_connection.close() 694 695 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 696 if change_dir: 697 cmd = ["cd", change_dir, ";", *cmd] 698 699 # In case one of the command elements contains whitespace and is not 700 # already quoted, # (e.g. when calling sysctl) quote it again to prevent expansion 701 # when sending to remote system. 702 for i, c in enumerate(cmd): 703 if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")): 704 cmd[i] = '"%s"' % c 705 cmd = " ".join(cmd) 706 707 # Redirect stderr to stdout thanks using get_pty option if needed 708 _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect) 709 out = stdout.read().decode(encoding="utf-8") 710 711 # Check the return code 712 rc = stdout.channel.recv_exit_status() 713 if rc: 714 raise CalledProcessError(int(rc), cmd, out) 715 716 return out 717 718 def put_file(self, local, remote_dest): 719 ftp = self.ssh_connection.open_sftp() 720 ftp.put(local, remote_dest) 721 ftp.close() 722 723 def get_file(self, remote, local_dest): 724 ftp = self.ssh_connection.open_sftp() 725 ftp.get(remote, local_dest) 726 ftp.close() 727 728 def copy_spdk(self, local_spdk_zip): 729 self.log_print("Copying SPDK sources to initiator %s" % self.name) 730 self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip") 731 self.log_print("Copied sources zip from target") 732 self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir]) 733 self.log_print("Sources unpacked") 734 735 def copy_result_files(self, dest_dir): 736 self.log_print("Copying results") 737 738 if not os.path.exists(dest_dir): 739 os.mkdir(dest_dir) 740 741 # Get list of result files from initiator and copy them back to target 742 file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n") 743 744 for file in file_list: 745 self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file), 746 
os.path.join(dest_dir, file)) 747 self.log_print("Done copying results") 748 749 def discover_subsystems(self, address_list, subsys_no): 750 num_nvmes = range(0, subsys_no) 751 nvme_discover_output = "" 752 for ip, subsys_no in itertools.product(address_list, num_nvmes): 753 self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no)) 754 nvme_discover_cmd = ["sudo", 755 "%s" % self.nvmecli_bin, 756 "discover", "-t", "%s" % self.transport, 757 "-s", "%s" % (4420 + subsys_no), 758 "-a", "%s" % ip] 759 760 try: 761 stdout = self.exec_cmd(nvme_discover_cmd) 762 if stdout: 763 nvme_discover_output = nvme_discover_output + stdout 764 except CalledProcessError: 765 # Do nothing. In case of discovering remote subsystems of kernel target 766 # we expect "nvme discover" to fail a bunch of times because we basically 767 # scan ports. 768 pass 769 770 subsystems = re.findall(r'trsvcid:\s(\d+)\s+' # get svcid number 771 r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+' # get NQN id 772 r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', # get IP address 773 nvme_discover_output) # from nvme discovery output 774 subsystems = filter(lambda x: x[-1] in address_list, subsystems) 775 subsystems = list(set(subsystems)) 776 subsystems.sort(key=lambda x: x[1]) 777 self.log_print("Found matching subsystems on target side:") 778 for s in subsystems: 779 self.log_print(s) 780 self.subsystem_info_list = subsystems 781 782 def gen_fio_filename_conf(self, *args, **kwargs): 783 # Logic implemented in SPDKInitiator and KernelInitiator classes 784 pass 785 786 def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10, rate_iops=0): 787 fio_conf_template = """ 788[global] 789ioengine={ioengine} 790{spdk_conf} 791thread=1 792group_reporting=1 793direct=1 794percentile_list=50:90:99:99.5:99.9:99.99:99.999 795 796norandommap=1 797rw={rw} 798rwmixread={rwmixread} 799bs={block_size} 800time_based=1 801ramp_time={ramp_time} 802runtime={run_time} 
803rate_iops={rate_iops} 804""" 805 if "spdk" in self.mode: 806 bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list) 807 self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir]) 808 ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir 809 spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir 810 else: 811 ioengine = self.ioengine 812 spdk_conf = "" 813 out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'", 814 "|", "awk", "'{print $1}'"]) 815 subsystems = [x for x in out.split("\n") if "nvme" in x] 816 817 if self.cpus_allowed is not None: 818 self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed) 819 cpus_num = 0 820 cpus = self.cpus_allowed.split(",") 821 for cpu in cpus: 822 if "-" in cpu: 823 a, b = cpu.split("-") 824 a = int(a) 825 b = int(b) 826 cpus_num += len(range(a, b)) 827 else: 828 cpus_num += 1 829 self.num_cores = cpus_num 830 threads = range(0, self.num_cores) 831 elif hasattr(self, 'num_cores'): 832 self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores) 833 threads = range(0, int(self.num_cores)) 834 else: 835 self.num_cores = len(subsystems) 836 threads = range(0, len(subsystems)) 837 838 if "spdk" in self.mode: 839 filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs) 840 else: 841 filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs) 842 843 fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf, 844 rw=rw, rwmixread=rwmixread, block_size=block_size, 845 ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops) 846 847 # TODO: hipri disabled for now, as it causes fio errors: 848 # io_u error on file /dev/nvme2n1: Operation not supported 849 # See comment in KernelInitiator class, kernel_init_connect() function 850 if hasattr(self, "ioengine") and "io_uring" in self.ioengine: 851 fio_config = fio_config + """ 852fixedbufs=1 
853registerfiles=1 854#hipri=1 855""" 856 if num_jobs: 857 fio_config = fio_config + "numjobs=%s \n" % num_jobs 858 if self.cpus_allowed is not None: 859 fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed 860 fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy 861 fio_config = fio_config + filename_section 862 863 fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread) 864 if hasattr(self, "num_cores"): 865 fio_config_filename += "_%sCPU" % self.num_cores 866 fio_config_filename += ".fio" 867 868 self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir]) 869 self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)]) 870 self.log_print("Created FIO Config:") 871 self.log_print(fio_config) 872 873 return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename) 874 875 def set_cpu_frequency(self): 876 if self.cpu_frequency is not None: 877 try: 878 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True) 879 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True) 880 self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"])) 881 except Exception: 882 self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!") 883 sys.exit() 884 else: 885 self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.") 886 887 def run_fio(self, fio_config_file, run_num=None): 888 job_name, _ = os.path.splitext(fio_config_file) 889 self.log_print("Starting FIO run for job: %s" % job_name) 890 self.log_print("Using FIO: %s" % self.fio_bin) 891 892 if run_num: 893 for i in range(1, run_num + 1): 894 output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json" 895 try: 896 output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json", 897 "--output=%s" % output_filename, "--eta=never"], True) 898 
self.log_print(output) 899 except subprocess.CalledProcessError as e: 900 self.log_print("ERROR: Fio process failed!") 901 self.log_print(e.stdout) 902 else: 903 output_filename = job_name + "_" + self.name + ".json" 904 output = self.exec_cmd(["sudo", self.fio_bin, 905 fio_config_file, "--output-format=json", 906 "--output" % output_filename], True) 907 self.log_print(output) 908 self.log_print("FIO run finished. Results in: %s" % output_filename) 909 910 def sys_config(self): 911 self.log_print("====Kernel release:====") 912 self.log_print(self.exec_cmd(["uname", "-r"])) 913 self.log_print("====Kernel command line:====") 914 cmdline = self.exec_cmd(["cat", "/proc/cmdline"]) 915 self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines()))) 916 self.log_print("====sysctl conf:====") 917 sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"]) 918 self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines()))) 919 self.log_print("====Cpu power info:====") 920 self.log_print(self.exec_cmd(["cpupower", "frequency-info"])) 921 922 923class KernelTarget(Target): 924 def __init__(self, name, general_config, target_config): 925 super(KernelTarget, self).__init__(name, general_config, target_config) 926 # Defaults 927 self.nvmet_bin = "nvmetcli" 928 929 if "nvmet_bin" in target_config: 930 self.nvmet_bin = target_config["nvmet_bin"] 931 932 def __del__(self): 933 nvmet_command(self.nvmet_bin, "clear") 934 935 def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list): 936 937 nvmet_cfg = { 938 "ports": [], 939 "hosts": [], 940 "subsystems": [], 941 } 942 943 # Split disks between NIC IP's 944 disks_per_ip = int(len(nvme_list) / len(address_list)) 945 disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))] 946 947 subsys_no = 1 948 port_no = 0 949 for ip, chunk in zip(address_list, disk_chunks): 950 for disk in chunk: 951 nqn = "nqn.2018-09.io.spdk:cnode%s" % subsys_no 952 
nvmet_cfg["subsystems"].append({ 953 "allowed_hosts": [], 954 "attr": { 955 "allow_any_host": "1", 956 "serial": "SPDK00%s" % subsys_no, 957 "version": "1.3" 958 }, 959 "namespaces": [ 960 { 961 "device": { 962 "path": disk, 963 "uuid": "%s" % uuid.uuid4() 964 }, 965 "enable": 1, 966 "nsid": subsys_no 967 } 968 ], 969 "nqn": nqn 970 }) 971 972 nvmet_cfg["ports"].append({ 973 "addr": { 974 "adrfam": "ipv4", 975 "traddr": ip, 976 "trsvcid": "%s" % (4420 + port_no), 977 "trtype": "%s" % self.transport 978 }, 979 "portid": subsys_no, 980 "referrals": [], 981 "subsystems": [nqn] 982 }) 983 subsys_no += 1 984 port_no += 1 985 self.subsystem_info_list.append([port_no, nqn, ip]) 986 987 with open("kernel.conf", "w") as fh: 988 fh.write(json.dumps(nvmet_cfg, indent=2)) 989 pass 990 991 def tgt_start(self): 992 self.log_print("Configuring kernel NVMeOF Target") 993 994 if self.null_block: 995 print("Configuring with null block device.") 996 null_blk_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)] 997 self.kernel_tgt_gen_subsystem_conf(null_blk_list, self.nic_ips) 998 self.subsys_no = len(null_blk_list) 999 else: 1000 print("Configuring with NVMe drives.") 1001 nvme_list = get_nvme_devices() 1002 self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips) 1003 self.subsys_no = len(nvme_list) 1004 1005 nvmet_command(self.nvmet_bin, "clear") 1006 nvmet_command(self.nvmet_bin, "restore kernel.conf") 1007 1008 if self.enable_adq: 1009 self.adq_configure_tc() 1010 1011 self.log_print("Done configuring kernel NVMeOF Target") 1012 1013 1014class SPDKTarget(Target): 1015 def __init__(self, name, general_config, target_config): 1016 super(SPDKTarget, self).__init__(name, general_config, target_config) 1017 1018 # Required fields 1019 self.core_mask = target_config["core_mask"] 1020 self.num_cores = self.get_num_cores(self.core_mask) 1021 1022 # Defaults 1023 self.dif_insert_strip = False 1024 self.null_block_dif_type = 0 1025 self.num_shared_buffers = 4096 1026 
self.bpf_proc = None 1027 self.bpf_scripts = [] 1028 1029 if "num_shared_buffers" in target_config: 1030 self.num_shared_buffers = target_config["num_shared_buffers"] 1031 if "null_block_dif_type" in target_config: 1032 self.null_block_dif_type = target_config["null_block_dif_type"] 1033 if "dif_insert_strip" in target_config: 1034 self.dif_insert_strip = target_config["dif_insert_strip"] 1035 if "bpf_scripts" in target_config: 1036 self.bpf_scripts = target_config["bpf_scripts"] 1037 1038 def get_num_cores(self, core_mask): 1039 if "0x" in core_mask: 1040 return bin(int(core_mask, 16)).count("1") 1041 else: 1042 num_cores = 0 1043 core_mask = core_mask.replace("[", "") 1044 core_mask = core_mask.replace("]", "") 1045 for i in core_mask.split(","): 1046 if "-" in i: 1047 x, y = i.split("-") 1048 num_cores += len(range(int(x), int(y))) + 1 1049 else: 1050 num_cores += 1 1051 return num_cores 1052 1053 def spdk_tgt_configure(self): 1054 self.log_print("Configuring SPDK NVMeOF target via RPC") 1055 1056 # Create RDMA transport layer 1057 rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, 1058 num_shared_buffers=self.num_shared_buffers, 1059 dif_insert_or_strip=self.dif_insert_strip, 1060 sock_priority=self.adq_priority) 1061 self.log_print("SPDK NVMeOF transport layer:") 1062 rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client)) 1063 1064 if self.enable_adq: 1065 self.adq_configure_tc() 1066 self.log_print("Done configuring SPDK NVMeOF Target") 1067 1068 if self.null_block: 1069 self.spdk_tgt_add_nullblock(self.null_block) 1070 self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block) 1071 else: 1072 self.spdk_tgt_add_nvme_conf() 1073 self.spdk_tgt_add_subsystem_conf(self.nic_ips) 1074 1075 def spdk_tgt_add_nullblock(self, null_block_count): 1076 md_size = 0 1077 block_size = 4096 1078 if self.null_block_dif_type != 0: 1079 md_size = 128 1080 1081 self.log_print("Adding null block bdevices to config via RPC") 1082 for i in 
range(null_block_count): 1083 self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type) 1084 rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i), 1085 dif_type=self.null_block_dif_type, md_size=md_size) 1086 self.log_print("SPDK Bdevs configuration:") 1087 rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1088 1089 def spdk_tgt_add_nvme_conf(self, req_num_disks=None): 1090 self.log_print("Adding NVMe bdevs to config via RPC") 1091 1092 bdfs = get_nvme_devices_bdf() 1093 bdfs = [b.replace(":", ".") for b in bdfs] 1094 1095 if req_num_disks: 1096 if req_num_disks > len(bdfs): 1097 self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs)) 1098 sys.exit(1) 1099 else: 1100 bdfs = bdfs[0:req_num_disks] 1101 1102 for i, bdf in enumerate(bdfs): 1103 rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 1104 1105 self.log_print("SPDK Bdevs configuration:") 1106 rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1107 1108 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 1109 self.log_print("Adding subsystems to config") 1110 port = "4420" 1111 if not req_num_disks: 1112 req_num_disks = get_nvme_devices_count() 1113 1114 # Distribute bdevs between provided NICs 1115 num_disks = range(0, req_num_disks) 1116 if len(num_disks) == 1: 1117 disks_per_ip = 1 1118 else: 1119 disks_per_ip = int(len(num_disks) / len(ips)) 1120 disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))] 1121 1122 # Create subsystems, add bdevs to namespaces, add listeners 1123 for ip, chunk in zip(ips, disk_chunks): 1124 for c in chunk: 1125 nqn = "nqn.2018-09.io.spdk:cnode%s" % c 1126 serial = "SPDK00%s" % c 1127 bdev_name = "Nvme%sn1" % c 1128 rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial, 1129 allow_any_host=True, max_namespaces=8) 1130 rpc.nvmf.nvmf_subsystem_add_ns(self.client, 
nqn, bdev_name) 1131 1132 rpc.nvmf.nvmf_subsystem_add_listener(self.client, 1133 nqn=nqn, 1134 trtype=self.transport, 1135 traddr=ip, 1136 trsvcid=port, 1137 adrfam="ipv4") 1138 1139 self.subsystem_info_list.append([port, nqn, ip]) 1140 self.log_print("SPDK NVMeOF subsystem configuration:") 1141 rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client)) 1142 1143 def bpf_start(self): 1144 self.log_print("Starting BPF Trace scripts: %s" % self.bpf_scripts) 1145 bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh") 1146 bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts] 1147 results_path = os.path.join(self.results_dir, "bpf_traces.txt") 1148 1149 with open(self.pid, "r") as fh: 1150 nvmf_pid = str(fh.readline()) 1151 1152 cmd = [bpf_script, nvmf_pid, *bpf_traces] 1153 self.log_print(cmd) 1154 self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path}) 1155 1156 def tgt_start(self): 1157 if self.null_block: 1158 self.subsys_no = 1 1159 else: 1160 self.subsys_no = get_nvme_devices_count() 1161 self.log_print("Starting SPDK NVMeOF Target process") 1162 nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt") 1163 proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask]) 1164 self.pid = os.path.join(self.spdk_dir, "nvmf.pid") 1165 1166 with open(self.pid, "w") as fh: 1167 fh.write(str(proc.pid)) 1168 self.nvmf_proc = proc 1169 self.log_print("SPDK NVMeOF Target PID=%s" % self.pid) 1170 self.log_print("Waiting for spdk to initilize...") 1171 while True: 1172 if os.path.exists("/var/tmp/spdk.sock"): 1173 break 1174 time.sleep(1) 1175 self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock") 1176 1177 if self.enable_zcopy: 1178 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", 1179 enable_zerocopy_send_server=True) 1180 self.log_print("Target socket options:") 1181 rpc.client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix")) 1182 
1183 if self.enable_adq: 1184 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1) 1185 rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None, 1186 nvme_adminq_poll_period_us=100000, retry_count=4) 1187 rpc.nvmf.nvmf_set_config(self.client, acceptor_poll_rate=10000) 1188 1189 rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name) 1190 1191 rpc.framework_start_init(self.client) 1192 1193 if self.bpf_scripts: 1194 self.bpf_start() 1195 1196 self.spdk_tgt_configure() 1197 1198 def __del__(self): 1199 if self.bpf_proc: 1200 self.log_print("Stopping BPF Trace script") 1201 self.bpf_proc.terminate() 1202 self.bpf_proc.wait() 1203 1204 if hasattr(self, "nvmf_proc"): 1205 try: 1206 self.nvmf_proc.terminate() 1207 self.nvmf_proc.wait() 1208 except Exception as e: 1209 self.log_print(e) 1210 self.nvmf_proc.kill() 1211 self.nvmf_proc.communicate() 1212 1213 1214class KernelInitiator(Initiator): 1215 def __init__(self, name, general_config, initiator_config): 1216 super(KernelInitiator, self).__init__(name, general_config, initiator_config) 1217 1218 # Defaults 1219 self.extra_params = "" 1220 self.ioengine = "libaio" 1221 1222 if "extra_params" in initiator_config: 1223 self.extra_params = initiator_config["extra_params"] 1224 1225 if "kernel_engine" in initiator_config: 1226 self.ioengine = initiator_config["kernel_engine"] 1227 if "io_uring" in self.ioengine: 1228 self.extra_params = "--nr-poll-queues=8" 1229 1230 def __del__(self): 1231 self.ssh_connection.close() 1232 1233 def get_connected_nvme_list(self): 1234 json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"])) 1235 nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"] 1236 if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]] 1237 return nvme_list 1238 1239 def kernel_init_connect(self): 1240 self.log_print("Below connection attempts may result in error messages, this is expected!") 1241 
for subsystem in self.subsystem_info_list: 1242 self.log_print("Trying to connect %s %s %s" % subsystem) 1243 self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport, 1244 "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params]) 1245 time.sleep(2) 1246 1247 if "io_uring" in self.ioengine: 1248 self.log_print("Setting block layer settings for io_uring.") 1249 1250 # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because 1251 # apparently it's not possible for connected subsystems. 1252 # Results in "error: Invalid argument" 1253 block_sysfs_settings = { 1254 "iostats": "0", 1255 "rq_affinity": "0", 1256 "nomerges": "2" 1257 } 1258 1259 for disk in self.get_connected_nvme_list(): 1260 sysfs = os.path.join("/sys/block", disk, "queue") 1261 for k, v in block_sysfs_settings.items(): 1262 sysfs_opt_path = os.path.join(sysfs, k) 1263 try: 1264 self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True) 1265 except subprocess.CalledProcessError as e: 1266 self.log_print("Warning: command %s failed due to error %s. %s was not set!" 
% (e.cmd, e.output, v)) 1267 finally: 1268 _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)]) 1269 self.log_print("%s=%s" % (sysfs_opt_path, _)) 1270 1271 def kernel_init_disconnect(self): 1272 for subsystem in self.subsystem_info_list: 1273 self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]]) 1274 time.sleep(1) 1275 1276 def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1): 1277 nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()] 1278 1279 filename_section = "" 1280 nvme_per_split = int(len(nvme_list) / len(threads)) 1281 remainder = len(nvme_list) % len(threads) 1282 iterator = iter(nvme_list) 1283 result = [] 1284 for i in range(len(threads)): 1285 result.append([]) 1286 for _ in range(nvme_per_split): 1287 result[i].append(next(iterator)) 1288 if remainder: 1289 result[i].append(next(iterator)) 1290 remainder -= 1 1291 for i, r in enumerate(result): 1292 header = "[filename%s]" % i 1293 disks = "\n".join(["filename=%s" % x for x in r]) 1294 job_section_qd = round((io_depth * len(r)) / num_jobs) 1295 if job_section_qd == 0: 1296 job_section_qd = 1 1297 iodepth = "iodepth=%s" % job_section_qd 1298 filename_section = "\n".join([filename_section, header, disks, iodepth]) 1299 1300 return filename_section 1301 1302 1303class SPDKInitiator(Initiator): 1304 def __init__(self, name, general_config, initiator_config): 1305 super(SPDKInitiator, self).__init__(name, general_config, initiator_config) 1306 1307 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 1308 self.install_spdk() 1309 1310 # Required fields 1311 self.num_cores = initiator_config["num_cores"] 1312 1313 def install_spdk(self): 1314 self.log_print("Using fio binary %s" % self.fio_bin) 1315 self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"]) 1316 self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"]) 1317 self.exec_cmd(["cd", self.spdk_dir, "&&", 
"./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)]) 1318 self.exec_cmd(["make", "-C", self.spdk_dir, "clean"]) 1319 self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"]) 1320 1321 self.log_print("SPDK built") 1322 self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir]) 1323 1324 def gen_spdk_bdev_conf(self, remote_subsystem_list): 1325 bdev_cfg_section = { 1326 "subsystems": [ 1327 { 1328 "subsystem": "bdev", 1329 "config": [] 1330 } 1331 ] 1332 } 1333 1334 for i, subsys in enumerate(remote_subsystem_list): 1335 sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys) 1336 nvme_ctrl = { 1337 "method": "bdev_nvme_attach_controller", 1338 "params": { 1339 "name": "Nvme{}".format(i), 1340 "trtype": self.transport, 1341 "traddr": sub_addr, 1342 "trsvcid": sub_port, 1343 "subnqn": sub_nqn, 1344 "adrfam": "IPv4" 1345 } 1346 } 1347 1348 if self.enable_adq: 1349 nvme_ctrl["params"].update({"priority": "1"}) 1350 1351 bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl) 1352 1353 return json.dumps(bdev_cfg_section, indent=2) 1354 1355 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1): 1356 filename_section = "" 1357 if len(threads) >= len(subsystems): 1358 threads = range(0, len(subsystems)) 1359 filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))] 1360 nvme_per_split = int(len(subsystems) / len(threads)) 1361 remainder = len(subsystems) % len(threads) 1362 iterator = iter(filenames) 1363 result = [] 1364 for i in range(len(threads)): 1365 result.append([]) 1366 for _ in range(nvme_per_split): 1367 result[i].append(next(iterator)) 1368 if remainder: 1369 result[i].append(next(iterator)) 1370 remainder -= 1 1371 for i, r in enumerate(result): 1372 header = "[filename%s]" % i 1373 disks = "\n".join(["filename=%s" % x for x in r]) 1374 job_section_qd = round((io_depth * len(r)) / num_jobs) 1375 if job_section_qd == 0: 1376 job_section_qd = 1 1377 iodepth = "iodepth=%s" % 
job_section_qd 1378 filename_section = "\n".join([filename_section, header, disks, iodepth]) 1379 1380 return filename_section 1381 1382 1383if __name__ == "__main__": 1384 script_full_dir = os.path.dirname(os.path.realpath(__file__)) 1385 default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json")) 1386 1387 parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) 1388 parser.add_argument('-c', '--config', type=str, default=default_config_file_path, 1389 help='Configuration file.') 1390 parser.add_argument('-r', '--results', type=str, default='/tmp/results', 1391 help='Results directory.') 1392 parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv', 1393 help='CSV results filename.') 1394 1395 args = parser.parse_args() 1396 1397 print("Using config file: %s" % args.config) 1398 with open(args.config, "r") as config: 1399 data = json.load(config) 1400 1401 initiators = [] 1402 fio_cases = [] 1403 1404 general_config = data["general"] 1405 target_config = data["target"] 1406 initiator_configs = [data[x] for x in data.keys() if "initiator" in x] 1407 1408 for k, v in data.items(): 1409 if "target" in k: 1410 v.update({"results_dir": args.results}) 1411 if data[k]["mode"] == "spdk": 1412 target_obj = SPDKTarget(k, data["general"], v) 1413 elif data[k]["mode"] == "kernel": 1414 target_obj = KernelTarget(k, data["general"], v) 1415 pass 1416 elif "initiator" in k: 1417 if data[k]["mode"] == "spdk": 1418 init_obj = SPDKInitiator(k, data["general"], v) 1419 elif data[k]["mode"] == "kernel": 1420 init_obj = KernelInitiator(k, data["general"], v) 1421 initiators.append(init_obj) 1422 elif "fio" in k: 1423 fio_workloads = itertools.product(data[k]["bs"], 1424 data[k]["qd"], 1425 data[k]["rw"]) 1426 1427 fio_run_time = data[k]["run_time"] 1428 fio_ramp_time = data[k]["ramp_time"] 1429 fio_rw_mix_read = data[k]["rwmixread"] 1430 fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() 
else None 1431 fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None 1432 1433 fio_rate_iops = 0 1434 if "rate_iops" in data[k]: 1435 fio_rate_iops = data[k]["rate_iops"] 1436 else: 1437 continue 1438 1439 try: 1440 os.mkdir(args.results) 1441 except FileExistsError: 1442 pass 1443 1444 target_obj.tgt_start() 1445 1446 for i in initiators: 1447 i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no) 1448 if i.enable_adq: 1449 i.adq_configure_tc() 1450 1451 # Poor mans threading 1452 # Run FIO tests 1453 for block_size, io_depth, rw in fio_workloads: 1454 threads = [] 1455 configs = [] 1456 for i in initiators: 1457 if i.mode == "kernel": 1458 i.kernel_init_connect() 1459 1460 cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no, 1461 fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops) 1462 configs.append(cfg) 1463 1464 for i, cfg in zip(initiators, configs): 1465 t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num)) 1466 threads.append(t) 1467 if target_obj.enable_sar: 1468 sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"]) 1469 sar_file_name = ".".join([sar_file_name, "txt"]) 1470 t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_name)) 1471 threads.append(t) 1472 1473 if target_obj.enable_pcm: 1474 pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]] 1475 1476 pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],)) 1477 pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],)) 1478 pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],)) 1479 1480 threads.append(pcm_cpu_t) 1481 threads.append(pcm_mem_t) 1482 threads.append(pcm_pow_t) 1483 1484 if target_obj.enable_bandwidth: 1485 bandwidth_file_name = "_".join(["bandwidth", str(block_size), 
str(rw), str(io_depth)]) 1486 bandwidth_file_name = ".".join([bandwidth_file_name, "csv"]) 1487 t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,)) 1488 threads.append(t) 1489 1490 if target_obj.enable_dpdk_memory: 1491 t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results)) 1492 threads.append(t) 1493 1494 for t in threads: 1495 t.start() 1496 for t in threads: 1497 t.join() 1498 1499 for i in initiators: 1500 if i.mode == "kernel": 1501 i.kernel_init_disconnect() 1502 i.copy_result_files(args.results) 1503 1504 target_obj.restore_governor() 1505 target_obj.restore_tuned() 1506 target_obj.restore_services() 1507 target_obj.restore_sysctl() 1508 for i in initiators: 1509 i.restore_governor() 1510 i.restore_tuned() 1511 i.restore_services() 1512 i.restore_sysctl() 1513 target_obj.parse_results(args.results, args.csv_filename) 1514