1#!/usr/bin/env python3 2 3from json.decoder import JSONDecodeError 4import os 5import re 6import sys 7import argparse 8import json 9import zipfile 10import threading 11import subprocess 12import itertools 13import configparser 14import time 15import uuid 16from collections import OrderedDict 17 18import paramiko 19import pandas as pd 20 21import rpc 22import rpc.client 23from common import * 24 25 26class Server: 27 def __init__(self, name, general_config, server_config): 28 self.name = name 29 self.username = general_config["username"] 30 self.password = general_config["password"] 31 self.transport = general_config["transport"].lower() 32 self.nic_ips = server_config["nic_ips"] 33 self.mode = server_config["mode"] 34 35 self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts" 36 if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]: 37 self.irq_scripts_dir = server_config["irq_scripts_dir"] 38 39 self.local_nic_info = [] 40 self._nics_json_obj = {} 41 self.svc_restore_dict = {} 42 self.sysctl_restore_dict = {} 43 self.tuned_restore_dict = {} 44 self.governor_restore = "" 45 self.tuned_profile = "" 46 47 self.enable_adq = False 48 self.adq_priority = None 49 if "adq_enable" in server_config and server_config["adq_enable"]: 50 self.enable_adq = server_config["adq_enable"] 51 self.adq_priority = 1 52 53 if "tuned_profile" in server_config: 54 self.tuned_profile = server_config["tuned_profile"] 55 56 if not re.match("^[A-Za-z0-9]*$", name): 57 self.log_print("Please use a name which contains only letters or numbers") 58 sys.exit(1) 59 60 def log_print(self, msg): 61 print("[%s] %s" % (self.name, msg), flush=True) 62 63 def get_uncommented_lines(self, lines): 64 return [line for line in lines if line and not line.startswith('#')] 65 66 def get_nic_name_by_ip(self, ip): 67 if not self._nics_json_obj: 68 nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"]) 69 self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj))) 70 for nic in self._nics_json_obj: 71 for addr in nic["addr_info"]: 72 if ip in addr["local"]: 73 return nic["ifname"] 74 75 def set_local_nic_info_helper(self): 76 pass 77 78 def set_local_nic_info(self, pci_info): 79 def extract_network_elements(json_obj): 80 nic_list = [] 81 if isinstance(json_obj, list): 82 for x in json_obj: 83 nic_list.extend(extract_network_elements(x)) 84 elif isinstance(json_obj, dict): 85 if "children" in json_obj: 86 nic_list.extend(extract_network_elements(json_obj["children"])) 87 if "class" in json_obj.keys() and "network" in json_obj["class"]: 88 nic_list.append(json_obj) 89 return nic_list 90 91 self.local_nic_info = extract_network_elements(pci_info) 92 93 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 94 return "" 95 96 def configure_system(self): 97 self.configure_services() 98 self.configure_sysctl() 99 self.configure_tuned() 100 self.configure_cpu_governor() 101 self.configure_irq_affinity() 102 103 def configure_adq(self): 104 if self.mode == "kernel": 105 self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 106 return 107 self.adq_load_modules() 108 self.adq_configure_nic() 109 110 def adq_load_modules(self): 111 self.log_print("Modprobing ADQ-related Linux modules...") 112 adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"] 113 for module in adq_module_deps: 114 try: 115 self.exec_cmd(["sudo", "modprobe", module]) 116 self.log_print("%s loaded!" % module) 117 except CalledProcessError as e: 118 self.log_print("ERROR: failed to load module %s" % module) 119 self.log_print("%s resulted in error: %s" % (e.cmd, e.output)) 120 121 def adq_configure_tc(self): 122 self.log_print("Configuring ADQ Traffic classess and filters...") 123 124 if self.mode == "kernel": 125 self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 126 return 127 128 num_queues_tc0 = 2 # 2 is minimum number of queues for TC0 129 num_queues_tc1 = self.num_cores 130 port_param = "dst_port" if isinstance(self, Target) else "src_port" 131 port = "4420" 132 xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs") 133 134 for nic_ip in self.nic_ips: 135 nic_name = self.get_nic_name_by_ip(nic_ip) 136 tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, 137 "root", "mqprio", "num_tc", "2", "map", "0", "1", 138 "queues", "%s@0" % num_queues_tc0, 139 "%s@%s" % (num_queues_tc1, num_queues_tc0), 140 "hw", "1", "mode", "channel"] 141 self.log_print(" ".join(tc_qdisc_map_cmd)) 142 self.exec_cmd(tc_qdisc_map_cmd) 143 144 tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"] 145 self.log_print(" ".join(tc_qdisc_ingress_cmd)) 146 self.exec_cmd(tc_qdisc_ingress_cmd) 147 148 tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name, 149 "protocol", "ip", "ingress", "prio", "1", "flower", 150 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port, 151 "skip_sw", "hw_tc", "1"] 152 self.log_print(" ".join(tc_filter_cmd)) 153 self.exec_cmd(tc_filter_cmd) 154 155 # Ethtool coalese settings must be applied after configuring traffic classes 156 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"]) 157 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"]) 158 159 self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name) 160 xps_cmd = ["sudo", xps_script_path, nic_name] 161 self.log_print(xps_cmd) 162 self.exec_cmd(xps_cmd) 163 164 def adq_configure_nic(self): 165 self.log_print("Configuring NIC port settings for ADQ testing...") 166 167 # Reload the driver first, to make sure any previous settings are re-set. 168 try: 169 self.exec_cmd(["sudo", "rmmod", "ice"]) 170 self.exec_cmd(["sudo", "modprobe", "ice"]) 171 except CalledProcessError as e: 172 self.log_print("ERROR: failed to reload ice module!") 173 self.log_print("%s resulted in error: %s" % (e.cmd, e.output)) 174 175 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 176 for nic in nic_names: 177 self.log_print(nic) 178 try: 179 self.exec_cmd(["sudo", "ethtool", "-K", nic, 180 "hw-tc-offload", "on"]) # Enable hardware TC offload 181 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 182 "channel-inline-flow-director", "on"]) # Enable Intel Flow Director 183 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"]) # Disable LLDP 184 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 185 "channel-pkt-inspect-optimize", "off"]) # Disable channel packet inspection optimization 186 except CalledProcessError as e: 187 self.log_print("ERROR: failed to configure NIC port using ethtool!") 188 self.log_print("%s resulted in error: %s" % (e.cmd, e.output)) 189 self.log_print("Please update your NIC driver and firmware versions and try again.") 190 self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic])) 191 self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic])) 192 193 def configure_services(self): 194 self.log_print("Configuring active services...") 195 svc_config = configparser.ConfigParser(strict=False) 196 197 # Below list is valid only for RHEL / Fedora systems and might not 198 # contain valid names for other distributions. 199 svc_target_state = { 200 "firewalld": "inactive", 201 "irqbalance": "inactive", 202 "lldpad.service": "inactive", 203 "lldpad.socket": "inactive" 204 } 205 206 for service in svc_target_state: 207 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 208 out = "\n".join(["[%s]" % service, out]) 209 svc_config.read_string(out) 210 211 if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]: 212 continue 213 214 service_state = svc_config[service]["ActiveState"] 215 self.log_print("Current state of %s service is %s" % (service, service_state)) 216 self.svc_restore_dict.update({service: service_state}) 217 if service_state != "inactive": 218 self.log_print("Disabling %s. It will be restored after the test has finished." % service) 219 self.exec_cmd(["sudo", "systemctl", "stop", service]) 220 221 def configure_sysctl(self): 222 self.log_print("Tuning sysctl settings...") 223 224 busy_read = 0 225 if self.enable_adq and self.mode == "spdk": 226 busy_read = 1 227 228 sysctl_opts = { 229 "net.core.busy_poll": 0, 230 "net.core.busy_read": busy_read, 231 "net.core.somaxconn": 4096, 232 "net.core.netdev_max_backlog": 8192, 233 "net.ipv4.tcp_max_syn_backlog": 16384, 234 "net.core.rmem_max": 268435456, 235 "net.core.wmem_max": 268435456, 236 "net.ipv4.tcp_mem": "268435456 268435456 268435456", 237 "net.ipv4.tcp_rmem": "8192 1048576 33554432", 238 "net.ipv4.tcp_wmem": "8192 1048576 33554432", 239 "net.ipv4.route.flush": 1, 240 "vm.overcommit_memory": 1, 241 } 242 243 for opt, value in sysctl_opts.items(): 244 self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()}) 245 self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 246 247 def configure_tuned(self): 248 if not self.tuned_profile: 249 self.log_print("WARNING: Tuned profile not set in configration file. Skipping configuration.") 250 return 251 252 self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile) 253 service = "tuned" 254 tuned_config = configparser.ConfigParser(strict=False) 255 256 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 257 out = "\n".join(["[%s]" % service, out]) 258 tuned_config.read_string(out) 259 tuned_state = tuned_config[service]["ActiveState"] 260 self.svc_restore_dict.update({service: tuned_state}) 261 262 if tuned_state != "inactive": 263 profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip() 264 profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip() 265 266 self.tuned_restore_dict = { 267 "profile": profile, 268 "mode": profile_mode 269 } 270 271 self.exec_cmd(["sudo", "systemctl", "start", service]) 272 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile]) 273 self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"])) 274 275 def configure_cpu_governor(self): 276 self.log_print("Setting CPU governor to performance...") 277 278 # This assumes that there is the same CPU scaling governor on each CPU 279 self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip() 280 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"]) 281 282 def configure_irq_affinity(self): 283 self.log_print("Setting NIC irq affinity for NICs...") 284 285 irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh") 286 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 287 for nic in nic_names: 288 irq_cmd = ["sudo", irq_script_path, nic] 289 self.log_print(irq_cmd) 290 self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir) 291 292 def restore_services(self): 293 self.log_print("Restoring services...") 294 for service, state in self.svc_restore_dict.items(): 295 cmd = "stop" if state == "inactive" else "start" 296 self.exec_cmd(["sudo", "systemctl", cmd, service]) 297 298 def restore_sysctl(self): 299 self.log_print("Restoring sysctl settings...") 300 for opt, value in self.sysctl_restore_dict.items(): 301 self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 302 303 def restore_tuned(self): 304 self.log_print("Restoring tuned-adm settings...") 305 306 if not self.tuned_restore_dict: 307 return 308 309 if self.tuned_restore_dict["mode"] == "auto": 310 self.exec_cmd(["sudo", "tuned-adm", "auto_profile"]) 311 self.log_print("Reverted tuned-adm to auto_profile.") 312 else: 313 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]]) 314 self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"]) 315 316 def restore_governor(self): 317 self.log_print("Restoring CPU governor setting...") 318 if self.governor_restore: 319 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore]) 320 self.log_print("Reverted CPU governor to %s." % self.governor_restore) 321 322 323class Target(Server): 324 def __init__(self, name, general_config, target_config): 325 super(Target, self).__init__(name, general_config, target_config) 326 327 # Defaults 328 self.enable_sar = False 329 self.sar_delay = 0 330 self.sar_interval = 0 331 self.sar_count = 0 332 self.enable_pcm = False 333 self.pcm_dir = "" 334 self.pcm_delay = 0 335 self.pcm_interval = 0 336 self.pcm_count = 0 337 self.enable_bandwidth = 0 338 self.bandwidth_count = 0 339 self.enable_dpdk_memory = False 340 self.dpdk_wait_time = 0 341 self.enable_zcopy = False 342 self.scheduler_name = "static" 343 self.null_block = 0 344 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 345 self.subsystem_info_list = [] 346 347 if "null_block_devices" in target_config: 348 self.null_block = target_config["null_block_devices"] 349 if "sar_settings" in target_config: 350 self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"] 351 if "pcm_settings" in target_config: 352 self.enable_pcm = True 353 self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"] 354 if "enable_bandwidth" in target_config: 355 self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"] 356 if "enable_dpdk_memory" in target_config: 357 self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"] 358 if "scheduler_settings" in target_config: 359 self.scheduler_name = target_config["scheduler_settings"] 360 if "zcopy_settings" in target_config: 361 self.enable_zcopy = target_config["zcopy_settings"] 362 if "results_dir" in target_config: 363 self.results_dir = target_config["results_dir"] 364 365 self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) 366 self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../")) 367 self.set_local_nic_info(self.set_local_nic_info_helper()) 368 369 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 370 self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip") 371 372 self.configure_system() 373 if self.enable_adq: 374 self.configure_adq() 375 self.sys_config() 376 377 def set_local_nic_info_helper(self): 378 return json.loads(self.exec_cmd(["lshw", "-json"])) 379 380 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 381 stderr_opt = None 382 if stderr_redirect: 383 stderr_opt = subprocess.STDOUT 384 if change_dir: 385 old_cwd = os.getcwd() 386 os.chdir(change_dir) 387 self.log_print("Changing directory to %s" % change_dir) 388 389 out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8") 390 391 if change_dir: 392 os.chdir(old_cwd) 393 self.log_print("Changing directory to %s" % old_cwd) 394 return out 395 396 def zip_spdk_sources(self, spdk_dir, dest_file): 397 self.log_print("Zipping SPDK source directory") 398 fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) 399 for root, directories, files in os.walk(spdk_dir, followlinks=True): 400 for file in files: 401 fh.write(os.path.relpath(os.path.join(root, file))) 402 fh.close() 403 self.log_print("Done zipping") 404 405 def read_json_stats(self, file): 406 with open(file, "r") as json_data: 407 data = json.load(json_data) 408 job_pos = 0 # job_post = 0 because using aggregated results 409 410 # Check if latency is in nano or microseconds to choose correct dict key 411 def get_lat_unit(key_prefix, dict_section): 412 # key prefix - lat, clat or slat. 413 # dict section - portion of json containing latency bucket in question 414 # Return dict key to access the bucket and unit as string 415 for k, _ in dict_section.items(): 416 if k.startswith(key_prefix): 417 return k, k.split("_")[1] 418 419 def get_clat_percentiles(clat_dict_leaf): 420 if "percentile" in clat_dict_leaf: 421 p99_lat = float(clat_dict_leaf["percentile"]["99.000000"]) 422 p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"]) 423 p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"]) 424 p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"]) 425 426 return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat] 427 else: 428 # Latest fio versions do not provide "percentile" results if no 429 # measurements were done, so just return zeroes 430 return [0, 0, 0, 0] 431 432 read_iops = float(data["jobs"][job_pos]["read"]["iops"]) 433 read_bw = float(data["jobs"][job_pos]["read"]["bw"]) 434 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"]) 435 read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"]) 436 read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"]) 437 read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"]) 438 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"]) 439 read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles( 440 data["jobs"][job_pos]["read"][clat_key]) 441 442 if "ns" in lat_unit: 443 read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]] 444 if "ns" in clat_unit: 445 read_p99_lat = read_p99_lat / 1000 446 read_p99_9_lat = read_p99_9_lat / 1000 447 read_p99_99_lat = read_p99_99_lat / 1000 448 read_p99_999_lat = read_p99_999_lat / 1000 449 450 write_iops = float(data["jobs"][job_pos]["write"]["iops"]) 451 write_bw = float(data["jobs"][job_pos]["write"]["bw"]) 452 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"]) 453 write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"]) 454 write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"]) 455 write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"]) 456 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"]) 457 write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles( 458 data["jobs"][job_pos]["write"][clat_key]) 459 460 if "ns" in lat_unit: 461 write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]] 462 if "ns" in clat_unit: 463 write_p99_lat = write_p99_lat / 1000 464 write_p99_9_lat = write_p99_9_lat / 1000 465 write_p99_99_lat = write_p99_99_lat / 1000 466 write_p99_999_lat = write_p99_999_lat / 1000 467 468 return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat, 469 read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat, 470 write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat, 471 write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat] 472 473 def parse_results(self, results_dir, csv_file): 474 files = os.listdir(results_dir) 475 fio_files = filter(lambda x: ".fio" in x, files) 476 json_files = [x for x in files if ".json" in x] 477 478 headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us", 479 "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us", 480 "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us", 481 "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"] 482 483 aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us", 484 "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"] 485 486 header_line = ",".join(["Name", *headers]) 487 aggr_header_line = ",".join(["Name", *aggr_headers]) 488 489 # Create empty results file 490 with open(os.path.join(results_dir, csv_file), "w") as fh: 491 fh.write(aggr_header_line + "\n") 492 rows = set() 493 494 for fio_config in fio_files: 495 self.log_print("Getting FIO stats for %s" % fio_config) 496 job_name, _ = os.path.splitext(fio_config) 497 498 # Look in the filename for rwmixread value. Function arguments do 499 # not have that information. 500 # TODO: Improve this function by directly using workload params instead 501 # of regexing through filenames. 502 if "read" in job_name: 503 rw_mixread = 1 504 elif "write" in job_name: 505 rw_mixread = 0 506 else: 507 rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100 508 509 # If "_CPU" exists in name - ignore it 510 # Initiators for the same job could have diffrent num_cores parameter 511 job_name = re.sub(r"_\d+CPU", "", job_name) 512 job_result_files = [x for x in json_files if job_name in x] 513 self.log_print("Matching result files for current fio config:") 514 for j in job_result_files: 515 self.log_print("\t %s" % j) 516 517 # There may have been more than 1 initiator used in test, need to check that 518 # Result files are created so that string after last "_" separator is server name 519 inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files]) 520 inits_avg_results = [] 521 for i in inits_names: 522 self.log_print("\tGetting stats for initiator %s" % i) 523 # There may have been more than 1 test run for this job, calculate average results for initiator 524 i_results = [x for x in job_result_files if i in x] 525 i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv")) 526 527 separate_stats = [] 528 for r in i_results: 529 try: 530 stats = self.read_json_stats(os.path.join(results_dir, r)) 531 separate_stats.append(stats) 532 self.log_print(stats) 533 except JSONDecodeError as e: 534 self.log_print("ERROR: Failed to parse %s results! Results might be incomplete!") 535 536 init_results = [sum(x) for x in zip(*separate_stats)] 537 init_results = [x / len(separate_stats) for x in init_results] 538 inits_avg_results.append(init_results) 539 540 self.log_print("\tAverage results for initiator %s" % i) 541 self.log_print(init_results) 542 with open(os.path.join(results_dir, i_results_filename), "w") as fh: 543 fh.write(header_line + "\n") 544 fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n") 545 546 # Sum results of all initiators running this FIO job. 547 # Latency results are an average of latencies from accros all initiators. 548 inits_avg_results = [sum(x) for x in zip(*inits_avg_results)] 549 inits_avg_results = OrderedDict(zip(headers, inits_avg_results)) 550 for key in inits_avg_results: 551 if "lat" in key: 552 inits_avg_results[key] /= len(inits_names) 553 554 # Aggregate separate read/write values into common labels 555 # Take rw_mixread into consideration for mixed read/write workloads. 556 aggregate_results = OrderedDict() 557 for h in aggr_headers: 558 read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key] 559 if "lat" in h: 560 _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat 561 else: 562 _ = read_stat + write_stat 563 aggregate_results[h] = "{0:.3f}".format(_) 564 565 rows.add(",".join([job_name, *aggregate_results.values()])) 566 567 # Save results to file 568 for row in rows: 569 with open(os.path.join(results_dir, csv_file), "a") as fh: 570 fh.write(row + "\n") 571 self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file)) 572 573 def measure_sar(self, results_dir, sar_file_name): 574 self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay) 575 cpu_number = os.cpu_count() 576 sar_idle_sum = 0 577 time.sleep(self.sar_delay) 578 out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count]) 579 with open(os.path.join(results_dir, sar_file_name), "w") as fh: 580 for line in out.split("\n"): 581 if "Average" in line: 582 if "CPU" in line: 583 self.log_print("Summary CPU utilization from SAR:") 584 self.log_print(line) 585 elif "all" in line: 586 self.log_print(line) 587 else: 588 sar_idle_sum += line.split()[7] 589 fh.write(out) 590 sar_cpu_usage = cpu_number * 100 - sar_idle_sum 591 with open(os.path.join(results_dir, sar_file_name), "a") as f: 592 f.write("Total CPU used: ".join(sar_cpu_usage)) 593 594 def measure_pcm_memory(self, results_dir, pcm_file_name): 595 time.sleep(self.pcm_delay) 596 cmd = ["%s/pcm-memory.x" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)] 597 pcm_memory = subprocess.Popen(cmd) 598 time.sleep(self.pcm_count) 599 pcm_memory.terminate() 600 601 def measure_pcm(self, results_dir, pcm_file_name): 602 time.sleep(self.pcm_delay) 603 cmd = ["%s/pcm.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, "-csv=%s/%s" % (results_dir, pcm_file_name)] 604 subprocess.run(cmd) 605 df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1]) 606 df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x)) 607 skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})] 608 skt_pcm_file_name = "_".join(["skt", pcm_file_name]) 609 skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False) 610 611 def measure_pcm_power(self, results_dir, pcm_power_file_name): 612 time.sleep(self.pcm_delay) 613 out = self.exec_cmd(["%s/pcm-power.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count]) 614 with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh: 615 fh.write(out) 616 617 def measure_network_bandwidth(self, results_dir, bandwidth_file_name): 618 self.log_print("INFO: starting network bandwidth measure") 619 self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name), 620 "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)]) 621 622 def measure_dpdk_memory(self, results_dir): 623 self.log_print("INFO: waiting to generate DPDK memory usage") 624 time.sleep(self.dpdk_wait_time) 625 self.log_print("INFO: generating DPDK memory usage") 626 rpc.env.env_dpdk_get_mem_stats 627 os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir)) 628 629 def sys_config(self): 630 self.log_print("====Kernel release:====") 631 self.log_print(os.uname().release) 632 self.log_print("====Kernel command line:====") 633 with open('/proc/cmdline') as f: 634 cmdline = f.readlines() 635 self.log_print('\n'.join(self.get_uncommented_lines(cmdline))) 636 self.log_print("====sysctl conf:====") 637 with open('/etc/sysctl.conf') as f: 638 sysctl = f.readlines() 639 self.log_print('\n'.join(self.get_uncommented_lines(sysctl))) 640 self.log_print("====Cpu power info:====") 641 self.log_print(self.exec_cmd(["cpupower", "frequency-info"])) 642 self.log_print("====zcopy settings:====") 643 self.log_print("zcopy enabled: %s" % (self.enable_zcopy)) 644 self.log_print("====Scheduler settings:====") 645 self.log_print("SPDK scheduler: %s" % (self.scheduler_name)) 646 647 648class Initiator(Server): 649 def __init__(self, name, general_config, initiator_config): 650 super(Initiator, self).__init__(name, general_config, initiator_config) 651 652 # Required fields 653 self.ip = initiator_config["ip"] 654 self.target_nic_ips = initiator_config["target_nic_ips"] 655 656 # Defaults 657 self.cpus_allowed = None 658 self.cpus_allowed_policy = "shared" 659 self.spdk_dir = "/tmp/spdk" 660 self.fio_bin = "/usr/src/fio/fio" 661 self.nvmecli_bin = "nvme" 662 self.cpu_frequency = None 663 self.subsystem_info_list = [] 664 665 if "spdk_dir" in initiator_config: 666 self.spdk_dir = initiator_config["spdk_dir"] 667 if "fio_bin" in initiator_config: 668 self.fio_bin = initiator_config["fio_bin"] 669 if "nvmecli_bin" in initiator_config: 670 self.nvmecli_bin = initiator_config["nvmecli_bin"] 671 if "cpus_allowed" in initiator_config: 672 self.cpus_allowed = initiator_config["cpus_allowed"] 673 if "cpus_allowed_policy" in initiator_config: 674 self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"] 675 if "cpu_frequency" in initiator_config: 676 self.cpu_frequency = initiator_config["cpu_frequency"] 677 if os.getenv('SPDK_WORKSPACE'): 678 self.spdk_dir = os.getenv('SPDK_WORKSPACE') 679 680 self.ssh_connection = paramiko.SSHClient() 681 self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 682 self.ssh_connection.connect(self.ip, username=self.username, password=self.password) 683 self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir]) 684 self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir]) 685 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 686 687 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 688 self.copy_spdk("/tmp/spdk.zip") 689 self.set_local_nic_info(self.set_local_nic_info_helper()) 690 self.set_cpu_frequency() 691 self.configure_system() 692 if self.enable_adq: 693 self.configure_adq() 694 self.sys_config() 695 696 def set_local_nic_info_helper(self): 697 return json.loads(self.exec_cmd(["lshw", "-json"])) 698 699 def __del__(self): 700 self.ssh_connection.close() 701 702 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 703 if change_dir: 704 cmd = ["cd", change_dir, ";", *cmd] 705 706 # In case one of the command elements contains whitespace and is not 707 # already quoted, # (e.g. when calling sysctl) quote it again to prevent expansion 708 # when sending to remote system. 709 for i, c in enumerate(cmd): 710 if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")): 711 cmd[i] = '"%s"' % c 712 cmd = " ".join(cmd) 713 714 # Redirect stderr to stdout thanks using get_pty option if needed 715 _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect) 716 out = stdout.read().decode(encoding="utf-8") 717 718 # Check the return code 719 rc = stdout.channel.recv_exit_status() 720 if rc: 721 raise CalledProcessError(int(rc), cmd, out) 722 723 return out 724 725 def put_file(self, local, remote_dest): 726 ftp = self.ssh_connection.open_sftp() 727 ftp.put(local, remote_dest) 728 ftp.close() 729 730 def get_file(self, remote, local_dest): 731 ftp = self.ssh_connection.open_sftp() 732 ftp.get(remote, local_dest) 733 ftp.close() 734 735 def copy_spdk(self, local_spdk_zip): 736 self.log_print("Copying SPDK sources to initiator %s" % self.name) 737 self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip") 738 self.log_print("Copied sources zip from target") 739 self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir]) 740 self.log_print("Sources unpacked") 741 742 def copy_result_files(self, dest_dir): 743 self.log_print("Copying results") 744 745 if not os.path.exists(dest_dir): 746 os.mkdir(dest_dir) 747 748 # Get list of result files from initiator and copy them back to target 749 file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n") 750 751 for file in file_list: 752 self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file), 753 os.path.join(dest_dir, file)) 754 self.log_print("Done copying results") 755 756 def discover_subsystems(self, address_list, subsys_no): 757 num_nvmes = range(0, subsys_no) 758 nvme_discover_output = "" 759 for ip, subsys_no in itertools.product(address_list, num_nvmes): 760 self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no)) 761 nvme_discover_cmd = ["sudo", 762 "%s" % self.nvmecli_bin, 763 "discover", "-t", "%s" % self.transport, 764 "-s", "%s" % (4420 + subsys_no), 765 "-a", "%s" % ip] 766 767 try: 768 stdout = self.exec_cmd(nvme_discover_cmd) 769 if stdout: 770 nvme_discover_output = nvme_discover_output + stdout 771 except CalledProcessError: 772 # Do nothing. In case of discovering remote subsystems of kernel target 773 # we expect "nvme discover" to fail a bunch of times because we basically 774 # scan ports. 775 pass 776 777 subsystems = re.findall(r'trsvcid:\s(\d+)\s+' # get svcid number 778 r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+' # get NQN id 779 r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', # get IP address 780 nvme_discover_output) # from nvme discovery output 781 subsystems = filter(lambda x: x[-1] in address_list, subsystems) 782 subsystems = list(set(subsystems)) 783 subsystems.sort(key=lambda x: x[1]) 784 self.log_print("Found matching subsystems on target side:") 785 for s in subsystems: 786 self.log_print(s) 787 self.subsystem_info_list = subsystems 788 789 def gen_fio_filename_conf(self, *args, **kwargs): 790 # Logic implemented in SPDKInitiator and KernelInitiator classes 791 pass 792 793 def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10, rate_iops=0): 794 fio_conf_template = """ 795[global] 796ioengine={ioengine} 797{spdk_conf} 798thread=1 799group_reporting=1 800direct=1 801percentile_list=50:90:99:99.5:99.9:99.99:99.999 802 803norandommap=1 804rw={rw} 805rwmixread={rwmixread} 806bs={block_size} 807time_based=1 808ramp_time={ramp_time} 809runtime={run_time} 810rate_iops={rate_iops} 811""" 812 if "spdk" in self.mode: 813 bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list) 814 self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir]) 815 ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir 816 spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir 817 else: 818 ioengine = self.ioengine 819 spdk_conf = "" 820 out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'", 821 "|", "awk", "'{print $1}'"]) 822 subsystems = [x for x in out.split("\n") if "nvme" in x] 823 824 if self.cpus_allowed is not None: 825 self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed) 826 cpus_num = 0 827 cpus = self.cpus_allowed.split(",") 828 for cpu in cpus: 829 if "-" in cpu: 830 a, b = cpu.split("-") 831 a = int(a) 832 b = int(b) 833 cpus_num += len(range(a, b)) 834 else: 835 cpus_num += 1 836 self.num_cores = cpus_num 837 threads = range(0, self.num_cores) 838 elif hasattr(self, 'num_cores'): 839 self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores) 840 threads = range(0, int(self.num_cores)) 841 else: 842 self.num_cores = len(subsystems) 843 threads = range(0, len(subsystems)) 844 845 if "spdk" in self.mode: 846 filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs) 847 else: 848 filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs) 849 850 fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf, 851 rw=rw, rwmixread=rwmixread, block_size=block_size, 852 ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops) 853 854 # TODO: hipri disabled for now, as it causes fio errors: 855 # io_u error on file /dev/nvme2n1: Operation not supported 856 # See comment in KernelInitiator class, kernel_init_connect() function 857 if hasattr(self, "ioengine") and "io_uring" in self.ioengine: 858 fio_config = fio_config + """ 859fixedbufs=1 860registerfiles=1 861#hipri=1 862""" 863 if num_jobs: 864 fio_config = fio_config + "numjobs=%s \n" % num_jobs 865 if self.cpus_allowed is not None: 866 fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed 867 fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy 868 fio_config = fio_config + filename_section 869 870 fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread) 871 if hasattr(self, "num_cores"): 872 fio_config_filename += "_%sCPU" % self.num_cores 873 fio_config_filename += ".fio" 874 875 self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir]) 876 self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)]) 877 self.log_print("Created FIO Config:") 878 self.log_print(fio_config) 879 880 return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename) 881 882 def set_cpu_frequency(self): 883 if self.cpu_frequency is not None: 884 try: 885 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True) 886 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True) 887 self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"])) 888 except Exception: 889 self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!") 890 sys.exit() 891 else: 892 self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.") 893 894 def run_fio(self, fio_config_file, run_num=None): 895 job_name, _ = os.path.splitext(fio_config_file) 896 self.log_print("Starting FIO run for job: %s" % job_name) 897 self.log_print("Using FIO: %s" % self.fio_bin) 898 899 if run_num: 900 for i in range(1, run_num + 1): 901 output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json" 902 try: 903 output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json", 904 "--output=%s" % output_filename, "--eta=never"], True) 905 self.log_print(output) 906 except subprocess.CalledProcessError as e: 907 self.log_print("ERROR: Fio process failed!") 908 self.log_print(e.stdout) 909 else: 910 output_filename = job_name + "_" + self.name + ".json" 911 output = self.exec_cmd(["sudo", self.fio_bin, 912 fio_config_file, "--output-format=json", 913 "--output" % output_filename], True) 914 self.log_print(output) 915 self.log_print("FIO run finished. Results in: %s" % output_filename) 916 917 def sys_config(self): 918 self.log_print("====Kernel release:====") 919 self.log_print(self.exec_cmd(["uname", "-r"])) 920 self.log_print("====Kernel command line:====") 921 cmdline = self.exec_cmd(["cat", "/proc/cmdline"]) 922 self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines()))) 923 self.log_print("====sysctl conf:====") 924 sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"]) 925 self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines()))) 926 self.log_print("====Cpu power info:====") 927 self.log_print(self.exec_cmd(["cpupower", "frequency-info"])) 928 929 930class KernelTarget(Target): 931 def __init__(self, name, general_config, target_config): 932 super(KernelTarget, self).__init__(name, general_config, target_config) 933 # Defaults 934 self.nvmet_bin = "nvmetcli" 935 936 if "nvmet_bin" in target_config: 937 self.nvmet_bin = target_config["nvmet_bin"] 938 939 def __del__(self): 940 nvmet_command(self.nvmet_bin, "clear") 941 942 def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list): 943 944 nvmet_cfg = { 945 "ports": [], 946 "hosts": [], 947 "subsystems": [], 948 } 949 950 # Split disks between NIC IP's 951 disks_per_ip = int(len(nvme_list) / len(address_list)) 952 disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))] 953 954 subsys_no = 1 955 port_no = 0 956 for ip, chunk in zip(address_list, disk_chunks): 957 for disk in chunk: 958 nqn = "nqn.2018-09.io.spdk:cnode%s" % subsys_no 959 nvmet_cfg["subsystems"].append({ 960 "allowed_hosts": [], 961 "attr": { 962 "allow_any_host": "1", 963 "serial": "SPDK00%s" % subsys_no, 964 "version": "1.3" 965 }, 966 "namespaces": [ 967 { 968 "device": { 969 "path": disk, 970 "uuid": "%s" % uuid.uuid4() 971 }, 972 "enable": 1, 973 "nsid": subsys_no 974 } 975 ], 976 "nqn": nqn 977 }) 978 979 nvmet_cfg["ports"].append({ 980 "addr": { 981 "adrfam": "ipv4", 982 "traddr": ip, 983 "trsvcid": "%s" % (4420 + port_no), 984 "trtype": "%s" % self.transport 985 }, 986 "portid": subsys_no, 987 "referrals": [], 988 "subsystems": [nqn] 989 }) 990 subsys_no += 1 991 port_no += 1 992 self.subsystem_info_list.append([port_no, nqn, ip]) 993 994 with open("kernel.conf", "w") as fh: 995 fh.write(json.dumps(nvmet_cfg, indent=2)) 996 pass 997 998 def tgt_start(self): 999 self.log_print("Configuring kernel NVMeOF Target") 1000 1001 if self.null_block: 1002 print("Configuring with null block device.") 1003 null_blk_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)] 1004 self.kernel_tgt_gen_subsystem_conf(null_blk_list, self.nic_ips) 1005 self.subsys_no = len(null_blk_list) 1006 else: 1007 print("Configuring with NVMe drives.") 1008 nvme_list = get_nvme_devices() 1009 self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips) 1010 self.subsys_no = len(nvme_list) 1011 1012 nvmet_command(self.nvmet_bin, "clear") 1013 nvmet_command(self.nvmet_bin, "restore kernel.conf") 1014 1015 if self.enable_adq: 1016 self.adq_configure_tc() 1017 1018 self.log_print("Done configuring kernel NVMeOF Target") 1019 1020 1021class SPDKTarget(Target): 1022 def __init__(self, name, general_config, target_config): 1023 super(SPDKTarget, self).__init__(name, general_config, target_config) 1024 1025 # Required fields 1026 self.core_mask = target_config["core_mask"] 1027 self.num_cores = self.get_num_cores(self.core_mask) 1028 1029 # Defaults 1030 self.dif_insert_strip = False 1031 self.null_block_dif_type = 0 1032 self.num_shared_buffers = 4096 1033 self.bpf_proc = None 1034 self.bpf_scripts = [] 1035 1036 if "num_shared_buffers" in target_config: 1037 self.num_shared_buffers = target_config["num_shared_buffers"] 1038 if "null_block_dif_type" in target_config: 1039 self.null_block_dif_type = target_config["null_block_dif_type"] 1040 if "dif_insert_strip" in target_config: 1041 self.dif_insert_strip = target_config["dif_insert_strip"] 1042 if "bpf_scripts" in target_config: 1043 self.bpf_scripts = target_config["bpf_scripts"] 1044 1045 def get_num_cores(self, core_mask): 1046 if "0x" in core_mask: 1047 return bin(int(core_mask, 16)).count("1") 1048 else: 1049 num_cores = 0 1050 core_mask = core_mask.replace("[", "") 1051 core_mask = core_mask.replace("]", "") 1052 for i in core_mask.split(","): 1053 if "-" in i: 1054 x, y = i.split("-") 1055 num_cores += len(range(int(x), int(y))) + 1 1056 else: 1057 num_cores += 1 1058 return num_cores 1059 1060 def spdk_tgt_configure(self): 1061 self.log_print("Configuring SPDK NVMeOF target via RPC") 1062 1063 # Create RDMA transport layer 1064 rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, 1065 num_shared_buffers=self.num_shared_buffers, 1066 dif_insert_or_strip=self.dif_insert_strip, 1067 sock_priority=self.adq_priority) 1068 self.log_print("SPDK NVMeOF transport layer:") 1069 rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client)) 1070 1071 if self.enable_adq: 1072 self.adq_configure_tc() 1073 self.log_print("Done configuring SPDK NVMeOF Target") 1074 1075 if self.null_block: 1076 self.spdk_tgt_add_nullblock(self.null_block) 1077 self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block) 1078 else: 1079 self.spdk_tgt_add_nvme_conf() 1080 self.spdk_tgt_add_subsystem_conf(self.nic_ips) 1081 1082 def spdk_tgt_add_nullblock(self, null_block_count): 1083 md_size = 0 1084 block_size = 4096 1085 if self.null_block_dif_type != 0: 1086 md_size = 128 1087 1088 self.log_print("Adding null block bdevices to config via RPC") 1089 for i in range(null_block_count): 1090 self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type) 1091 rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i), 1092 dif_type=self.null_block_dif_type, md_size=md_size) 1093 self.log_print("SPDK Bdevs configuration:") 1094 rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1095 1096 def spdk_tgt_add_nvme_conf(self, req_num_disks=None): 1097 self.log_print("Adding NVMe bdevs to config via RPC") 1098 1099 bdfs = get_nvme_devices_bdf() 1100 bdfs = [b.replace(":", ".") for b in bdfs] 1101 1102 if req_num_disks: 1103 if req_num_disks > len(bdfs): 1104 self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs)) 1105 sys.exit(1) 1106 else: 1107 bdfs = bdfs[0:req_num_disks] 1108 1109 for i, bdf in enumerate(bdfs): 1110 rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 1111 1112 self.log_print("SPDK Bdevs configuration:") 1113 rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1114 1115 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 1116 self.log_print("Adding subsystems to config") 1117 port = "4420" 1118 if not req_num_disks: 1119 req_num_disks = get_nvme_devices_count() 1120 1121 # Distribute bdevs between provided NICs 1122 num_disks = range(0, req_num_disks) 1123 if len(num_disks) == 1: 1124 disks_per_ip = 1 1125 else: 1126 disks_per_ip = int(len(num_disks) / len(ips)) 1127 disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))] 1128 1129 # Create subsystems, add bdevs to namespaces, add listeners 1130 for ip, chunk in zip(ips, disk_chunks): 1131 for c in chunk: 1132 nqn = "nqn.2018-09.io.spdk:cnode%s" % c 1133 serial = "SPDK00%s" % c 1134 bdev_name = "Nvme%sn1" % c 1135 rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial, 1136 allow_any_host=True, max_namespaces=8) 1137 rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name) 1138 1139 rpc.nvmf.nvmf_subsystem_add_listener(self.client, 1140 nqn=nqn, 1141 trtype=self.transport, 1142 traddr=ip, 1143 trsvcid=port, 1144 adrfam="ipv4") 1145 1146 self.subsystem_info_list.append([port, nqn, ip]) 1147 self.log_print("SPDK NVMeOF subsystem configuration:") 1148 rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client)) 1149 1150 def bpf_start(self): 1151 self.log_print("Starting BPF Trace scripts: %s" % self.bpf_scripts) 1152 bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh") 1153 bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts] 1154 results_path = os.path.join(self.results_dir, "bpf_traces.txt") 1155 1156 with open(self.pid, "r") as fh: 1157 nvmf_pid = str(fh.readline()) 1158 1159 cmd = [bpf_script, nvmf_pid, *bpf_traces] 1160 self.log_print(cmd) 1161 self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path}) 1162 1163 def tgt_start(self): 1164 if self.null_block: 1165 self.subsys_no = 1 1166 else: 1167 self.subsys_no = get_nvme_devices_count() 1168 self.log_print("Starting SPDK NVMeOF Target process") 1169 nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt") 1170 proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask]) 1171 self.pid = os.path.join(self.spdk_dir, "nvmf.pid") 1172 1173 with open(self.pid, "w") as fh: 1174 fh.write(str(proc.pid)) 1175 self.nvmf_proc = proc 1176 self.log_print("SPDK NVMeOF Target PID=%s" % self.pid) 1177 self.log_print("Waiting for spdk to initilize...") 1178 while True: 1179 if os.path.exists("/var/tmp/spdk.sock"): 1180 break 1181 time.sleep(1) 1182 self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock") 1183 1184 if self.enable_zcopy: 1185 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", 1186 enable_zerocopy_send_server=True) 1187 self.log_print("Target socket options:") 1188 rpc.client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix")) 1189 1190 if self.enable_adq: 1191 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1) 1192 rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None, 1193 nvme_adminq_poll_period_us=100000, retry_count=4) 1194 rpc.nvmf.nvmf_set_config(self.client, acceptor_poll_rate=10000) 1195 1196 rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name) 1197 1198 rpc.framework_start_init(self.client) 1199 1200 if self.bpf_scripts: 1201 self.bpf_start() 1202 1203 self.spdk_tgt_configure() 1204 1205 def __del__(self): 1206 if self.bpf_proc: 1207 self.log_print("Stopping BPF Trace script") 1208 self.bpf_proc.terminate() 1209 self.bpf_proc.wait() 1210 1211 if hasattr(self, "nvmf_proc"): 1212 try: 1213 self.nvmf_proc.terminate() 1214 self.nvmf_proc.wait() 1215 except Exception as e: 1216 self.log_print(e) 1217 self.nvmf_proc.kill() 1218 self.nvmf_proc.communicate() 1219 1220 1221class KernelInitiator(Initiator): 1222 def __init__(self, name, general_config, initiator_config): 1223 super(KernelInitiator, self).__init__(name, general_config, initiator_config) 1224 1225 # Defaults 1226 self.extra_params = "" 1227 self.ioengine = "libaio" 1228 1229 if "extra_params" in initiator_config: 1230 self.extra_params = initiator_config["extra_params"] 1231 1232 if "kernel_engine" in initiator_config: 1233 self.ioengine = initiator_config["kernel_engine"] 1234 if "io_uring" in self.ioengine: 1235 self.extra_params = "--nr-poll-queues=8" 1236 1237 def __del__(self): 1238 self.ssh_connection.close() 1239 1240 def get_connected_nvme_list(self): 1241 json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"])) 1242 nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"] 1243 if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]] 1244 return nvme_list 1245 1246 def kernel_init_connect(self): 1247 self.log_print("Below connection attempts may result in error messages, this is expected!") 1248 for subsystem in self.subsystem_info_list: 1249 self.log_print("Trying to connect %s %s %s" % subsystem) 1250 self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport, 1251 "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params]) 1252 time.sleep(2) 1253 1254 if "io_uring" in self.ioengine: 1255 self.log_print("Setting block layer settings for io_uring.") 1256 1257 # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because 1258 # apparently it's not possible for connected subsystems. 1259 # Results in "error: Invalid argument" 1260 block_sysfs_settings = { 1261 "iostats": "0", 1262 "rq_affinity": "0", 1263 "nomerges": "2" 1264 } 1265 1266 for disk in self.get_connected_nvme_list(): 1267 sysfs = os.path.join("/sys/block", disk, "queue") 1268 for k, v in block_sysfs_settings.items(): 1269 sysfs_opt_path = os.path.join(sysfs, k) 1270 try: 1271 self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True) 1272 except subprocess.CalledProcessError as e: 1273 self.log_print("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v)) 1274 finally: 1275 _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)]) 1276 self.log_print("%s=%s" % (sysfs_opt_path, _)) 1277 1278 def kernel_init_disconnect(self): 1279 for subsystem in self.subsystem_info_list: 1280 self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]]) 1281 time.sleep(1) 1282 1283 def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1): 1284 nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()] 1285 1286 filename_section = "" 1287 nvme_per_split = int(len(nvme_list) / len(threads)) 1288 remainder = len(nvme_list) % len(threads) 1289 iterator = iter(nvme_list) 1290 result = [] 1291 for i in range(len(threads)): 1292 result.append([]) 1293 for _ in range(nvme_per_split): 1294 result[i].append(next(iterator)) 1295 if remainder: 1296 result[i].append(next(iterator)) 1297 remainder -= 1 1298 for i, r in enumerate(result): 1299 header = "[filename%s]" % i 1300 disks = "\n".join(["filename=%s" % x for x in r]) 1301 job_section_qd = round((io_depth * len(r)) / num_jobs) 1302 if job_section_qd == 0: 1303 job_section_qd = 1 1304 iodepth = "iodepth=%s" % job_section_qd 1305 filename_section = "\n".join([filename_section, header, disks, iodepth]) 1306 1307 return filename_section 1308 1309 1310class SPDKInitiator(Initiator): 1311 def __init__(self, name, general_config, initiator_config): 1312 super(SPDKInitiator, self).__init__(name, general_config, initiator_config) 1313 1314 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 1315 self.install_spdk() 1316 1317 # Required fields 1318 self.num_cores = initiator_config["num_cores"] 1319 1320 def install_spdk(self): 1321 self.log_print("Using fio binary %s" % self.fio_bin) 1322 self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"]) 1323 self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"]) 1324 self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)]) 1325 self.exec_cmd(["make", "-C", self.spdk_dir, "clean"]) 1326 self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"]) 1327 1328 self.log_print("SPDK built") 1329 self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir]) 1330 1331 def gen_spdk_bdev_conf(self, remote_subsystem_list): 1332 bdev_cfg_section = { 1333 "subsystems": [ 1334 { 1335 "subsystem": "bdev", 1336 "config": [] 1337 } 1338 ] 1339 } 1340 1341 for i, subsys in enumerate(remote_subsystem_list): 1342 sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys) 1343 nvme_ctrl = { 1344 "method": "bdev_nvme_attach_controller", 1345 "params": { 1346 "name": "Nvme{}".format(i), 1347 "trtype": self.transport, 1348 "traddr": sub_addr, 1349 "trsvcid": sub_port, 1350 "subnqn": sub_nqn, 1351 "adrfam": "IPv4" 1352 } 1353 } 1354 1355 if self.enable_adq: 1356 nvme_ctrl["params"].update({"priority": "1"}) 1357 1358 bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl) 1359 1360 return json.dumps(bdev_cfg_section, indent=2) 1361 1362 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1): 1363 filename_section = "" 1364 if len(threads) >= len(subsystems): 1365 threads = range(0, len(subsystems)) 1366 filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))] 1367 nvme_per_split = int(len(subsystems) / len(threads)) 1368 remainder = len(subsystems) % len(threads) 1369 iterator = iter(filenames) 1370 result = [] 1371 for i in range(len(threads)): 1372 result.append([]) 1373 for _ in range(nvme_per_split): 1374 result[i].append(next(iterator)) 1375 if remainder: 1376 result[i].append(next(iterator)) 1377 remainder -= 1 1378 for i, r in enumerate(result): 1379 header = "[filename%s]" % i 1380 disks = "\n".join(["filename=%s" % x for x in r]) 1381 job_section_qd = round((io_depth * len(r)) / num_jobs) 1382 if job_section_qd == 0: 1383 job_section_qd = 1 1384 iodepth = "iodepth=%s" % job_section_qd 1385 filename_section = "\n".join([filename_section, header, disks, iodepth]) 1386 1387 return filename_section 1388 1389 1390if __name__ == "__main__": 1391 script_full_dir = os.path.dirname(os.path.realpath(__file__)) 1392 default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json")) 1393 1394 parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) 1395 parser.add_argument('-c', '--config', type=str, default=default_config_file_path, 1396 help='Configuration file.') 1397 parser.add_argument('-r', '--results', type=str, default='/tmp/results', 1398 help='Results directory.') 1399 parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv', 1400 help='CSV results filename.') 1401 1402 args = parser.parse_args() 1403 1404 print("Using config file: %s" % args.config) 1405 with open(args.config, "r") as config: 1406 data = json.load(config) 1407 1408 initiators = [] 1409 fio_cases = [] 1410 1411 general_config = data["general"] 1412 target_config = data["target"] 1413 initiator_configs = [data[x] for x in data.keys() if "initiator" in x] 1414 1415 for k, v in data.items(): 1416 if "target" in k: 1417 v.update({"results_dir": args.results}) 1418 if data[k]["mode"] == "spdk": 1419 target_obj = SPDKTarget(k, data["general"], v) 1420 elif data[k]["mode"] == "kernel": 1421 target_obj = KernelTarget(k, data["general"], v) 1422 pass 1423 elif "initiator" in k: 1424 if data[k]["mode"] == "spdk": 1425 init_obj = SPDKInitiator(k, data["general"], v) 1426 elif data[k]["mode"] == "kernel": 1427 init_obj = KernelInitiator(k, data["general"], v) 1428 initiators.append(init_obj) 1429 elif "fio" in k: 1430 fio_workloads = itertools.product(data[k]["bs"], 1431 data[k]["qd"], 1432 data[k]["rw"]) 1433 1434 fio_run_time = data[k]["run_time"] 1435 fio_ramp_time = data[k]["ramp_time"] 1436 fio_rw_mix_read = data[k]["rwmixread"] 1437 fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None 1438 fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None 1439 1440 fio_rate_iops = 0 1441 if "rate_iops" in data[k]: 1442 fio_rate_iops = data[k]["rate_iops"] 1443 else: 1444 continue 1445 1446 try: 1447 os.mkdir(args.results) 1448 except FileExistsError: 1449 pass 1450 1451 target_obj.tgt_start() 1452 1453 for i in initiators: 1454 i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no) 1455 if i.enable_adq: 1456 i.adq_configure_tc() 1457 1458 # Poor mans threading 1459 # Run FIO tests 1460 for block_size, io_depth, rw in fio_workloads: 1461 threads = [] 1462 configs = [] 1463 for i in initiators: 1464 if i.mode == "kernel": 1465 i.kernel_init_connect() 1466 1467 cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no, 1468 fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops) 1469 configs.append(cfg) 1470 1471 for i, cfg in zip(initiators, configs): 1472 t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num)) 1473 threads.append(t) 1474 if target_obj.enable_sar: 1475 sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"]) 1476 sar_file_name = ".".join([sar_file_name, "txt"]) 1477 t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_name)) 1478 threads.append(t) 1479 1480 if target_obj.enable_pcm: 1481 pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]] 1482 1483 pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],)) 1484 pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],)) 1485 pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],)) 1486 1487 threads.append(pcm_cpu_t) 1488 threads.append(pcm_mem_t) 1489 threads.append(pcm_pow_t) 1490 1491 if target_obj.enable_bandwidth: 1492 bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)]) 1493 bandwidth_file_name = ".".join([bandwidth_file_name, "csv"]) 1494 t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,)) 1495 threads.append(t) 1496 1497 if target_obj.enable_dpdk_memory: 1498 t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results)) 1499 threads.append(t) 1500 1501 for t in threads: 1502 t.start() 1503 for t in threads: 1504 t.join() 1505 1506 for i in initiators: 1507 if i.mode == "kernel": 1508 i.kernel_init_disconnect() 1509 i.copy_result_files(args.results) 1510 1511 target_obj.restore_governor() 1512 target_obj.restore_tuned() 1513 target_obj.restore_services() 1514 target_obj.restore_sysctl() 1515 for i in initiators: 1516 i.restore_governor() 1517 i.restore_tuned() 1518 i.restore_services() 1519 i.restore_sysctl() 1520 target_obj.parse_results(args.results, args.csv_filename) 1521