#!/usr/bin/env python3

import os
import re
import sys
import json
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid
from collections import OrderedDict

import paramiko
import pandas as pd

import rpc
import rpc.client
from common import *


class Server:
    """Common system setup/teardown logic shared by NVMe-oF perf test hosts.

    Handles service, sysctl, tuned, CPU-governor and IRQ-affinity
    configuration (with matching restore steps), plus optional ADQ
    (Application Device Queues) NIC setup. Subclasses provide the actual
    command-execution transport via exec_cmd().
    """

    def __init__(self, name, general_config, server_config):
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]

        # Default location of the Mellanox IRQ affinity scripts; may be
        # overridden per-server in the config file.
        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        self.local_nic_info = []
        self._nics_json_obj = {}
        # Pre-test state captured here so it can be restored afterwards.
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        self.enable_adq = False
        self.adq_priority = None
        if "adq_enable" in server_config and server_config["adq_enable"]:
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1

        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]

        # Server name is embedded in result file names, so restrict it to
        # alphanumeric characters.
        if not re.match(r"^[A-Za-z0-9]*$", name):
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        """Print msg prefixed with this server's name, flushing immediately."""
        print("[%s] %s" % (self.name, msg), flush=True)

    def get_uncommented_lines(self, lines):
        """Return only non-empty lines that do not start with '#'."""
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        """Return the interface name holding the given IP address, or None.

        The output of "ip -j address show" is queried once and cached in
        self._nics_json_obj (interfaces without any address are dropped).
        """
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                # Bugfix: compare for equality instead of substring
                # containment - "10.0.0.1" used to also match an interface
                # holding "10.0.0.10".
                if ip == addr["local"]:
                    return nic["ifname"]

    def set_local_nic_info_helper(self):
        # Overridden by subclasses to return "lshw -json" output.
        pass

    def set_local_nic_info(self, pci_info):
        """Store all network-class elements found in an lshw JSON dump."""
        def extract_network_elements(json_obj):
            # Recursively walk the lshw tree and collect "network" nodes.
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        # Abstract hook: subclasses run cmd locally (Target) or over SSH
        # (Initiator) and return decoded stdout.
        return ""

    def configure_system(self):
        """Apply the full set of host tunings needed before a test run."""
        self.configure_services()
        self.configure_sysctl()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity()

    def configure_adq(self):
        """Set up ADQ; currently only supported in SPDK mode."""
        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        """Load kernel modules required for ADQ traffic classification."""
        self.log_print("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log_print("%s loaded!" % module)
            except CalledProcessError as e:
                # Best effort: report and continue with remaining modules.
                self.log_print("ERROR: failed to load module %s" % module)
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        """Create ADQ traffic classes, filters and coalescing settings.

        NOTE(review): relies on self.num_cores, self.subsystem_info_list and
        self.spdk_dir, which are set by subclasses - confirm callers.
        """
        self.log_print("Configuring ADQ Traffic classes and filters...")

        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        # Target filters on destination port; initiators on source port.
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        ports = set([p[0] for p in self.subsystem_info_list])
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log_print(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log_print(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            for port in ports:
                tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                                 "protocol", "ip", "ingress", "prio", "1", "flower",
                                 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                                 "skip_sw", "hw_tc", "1"]
                self.log_print(" ".join(tc_filter_cmd))
                self.exec_cmd(tc_filter_cmd)

            # Ethtool coalesce settings must be applied after configuring
            # traffic classes.
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            # Consistency: log the joined command line like the other commands
            # above instead of printing the raw list.
            self.log_print(" ".join(xps_cmd))
            self.exec_cmd(xps_cmd)

    def adq_configure_nic(self):
        """Reload the ice driver and flip the NIC flags ADQ requires."""
        self.log_print("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        try:
            self.exec_cmd(["sudo", "rmmod", "ice"])
            self.exec_cmd(["sudo", "modprobe", "ice"])
        except CalledProcessError as e:
            self.log_print("ERROR: failed to reload ice module!")
            self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log_print(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization
            except CalledProcessError as e:
                self.log_print("ERROR: failed to configure NIC port using ethtool!")
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
                self.log_print("Please update your NIC driver and firmware versions and try again.")
            self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        """Stop services that interfere with testing, remembering prior state."""
        self.log_print("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            # Prepend a section header so configparser can digest the
            # key=value dump produced by systemctl.
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log_print("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log_print("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        """Apply network/VM sysctl tunings, saving previous values for restore."""
        self.log_print("Tuning sysctl settings...")

        # busy_read polling is only beneficial with ADQ in SPDK mode.
        busy_read = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1

        sysctl_opts = {
            "net.core.busy_poll": 0,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
        }

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        """Activate the configured tuned-adm profile, saving the current one."""
        if not self.tuned_profile:
            self.log_print("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        """Switch the CPU frequency governor to 'performance'."""
        self.log_print("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def configure_irq_affinity(self):
        """Run the vendor set_irq_affinity script for each test NIC."""
        self.log_print("Setting NIC irq affinity for NICs...")

        irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh")
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            irq_cmd = ["sudo", irq_script_path, nic]
            # Consistency: log the joined command line instead of the raw list.
            self.log_print(" ".join(irq_cmd))
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        """Return services recorded in svc_restore_dict to their prior state."""
        self.log_print("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        """Write back the sysctl values captured before tuning."""
        self.log_print("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        """Revert tuned-adm to the profile/mode recorded before the test."""
        self.log_print("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log_print("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        """Revert the CPU governor captured in configure_cpu_governor()."""
        self.log_print("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log_print("Reverted CPU governor to %s." % self.governor_restore)


class Target(Server):
    """The NVMe-oF target host: runs commands locally and collects results."""

    def __init__(self, name, general_config, target_config):
        super(Target, self).__init__(name, general_config, target_config)

        # Defaults
        self.enable_sar = False
        self.sar_delay = 0
        self.sar_interval = 0
        self.sar_count = 0
        self.enable_pcm = False
        self.pcm_dir = ""
        self.pcm_delay = 0
        self.pcm_interval = 0
        self.pcm_count = 0
        self.enable_bandwidth = 0
        self.bandwidth_count = 0
        self.enable_dpdk_memory = False
        self.dpdk_wait_time = 0
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []

        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "sar_settings" in target_config:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"]
        if "pcm_settings" in target_config:
            self.enable_pcm = True
            self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"]
        if "enable_bandwidth" in target_config:
            self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"]
        if "enable_dpdk_memory" in target_config:
            self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"]
        if "scheduler_settings" in target_config:
            self.scheduler_name = target_config["scheduler_settings"]
        if "zcopy_settings" in target_config:
            self.enable_zcopy = target_config["zcopy_settings"]

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        """Return the local hardware inventory as parsed lshw JSON."""
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Run cmd locally and return its decoded stdout.

        stderr_redirect folds stderr into stdout; change_dir runs the command
        from another directory.
        """
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        old_cwd = None
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log_print("Changing directory to %s" % change_dir)

        try:
            # check_output comes from the "common" helper module.
            out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")
        finally:
            # Bugfix: restore the working directory even when the command
            # fails, otherwise a raised CalledProcessError leaked the chdir.
            if old_cwd:
                os.chdir(old_cwd)
                self.log_print("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        """Pack the SPDK source tree into a zip archive at dest_file."""
        self.log_print("Zipping SPDK source directory")
        # Context manager guarantees the archive is finalized/closed even if
        # one of the writes raises.
        with zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) as fh:
            for root, _directories, files in os.walk(spdk_dir, followlinks=True):
                for file in files:
                    fh.write(os.path.relpath(os.path.join(root, file)))
        self.log_print("Done zipping")
402 def read_json_stats(self, file): 403 with open(file, "r") as json_data: 404 data = json.load(json_data) 405 job_pos = 0 # job_post = 0 because using aggregated results 406 407 # Check if latency is in nano or microseconds to choose correct dict key 408 def get_lat_unit(key_prefix, dict_section): 409 # key prefix - lat, clat or slat. 410 # dict section - portion of json containing latency bucket in question 411 # Return dict key to access the bucket and unit as string 412 for k, v in dict_section.items(): 413 if k.startswith(key_prefix): 414 return k, k.split("_")[1] 415 416 def get_clat_percentiles(clat_dict_leaf): 417 if "percentile" in clat_dict_leaf: 418 p99_lat = float(clat_dict_leaf["percentile"]["99.000000"]) 419 p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"]) 420 p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"]) 421 p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"]) 422 423 return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat] 424 else: 425 # Latest fio versions do not provide "percentile" results if no 426 # measurements were done, so just return zeroes 427 return [0, 0, 0, 0] 428 429 read_iops = float(data["jobs"][job_pos]["read"]["iops"]) 430 read_bw = float(data["jobs"][job_pos]["read"]["bw"]) 431 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"]) 432 read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"]) 433 read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"]) 434 read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"]) 435 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"]) 436 read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles( 437 data["jobs"][job_pos]["read"][clat_key]) 438 439 if "ns" in lat_unit: 440 read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]] 441 if "ns" in clat_unit: 442 read_p99_lat = read_p99_lat / 1000 443 read_p99_9_lat = 
read_p99_9_lat / 1000 444 read_p99_99_lat = read_p99_99_lat / 1000 445 read_p99_999_lat = read_p99_999_lat / 1000 446 447 write_iops = float(data["jobs"][job_pos]["write"]["iops"]) 448 write_bw = float(data["jobs"][job_pos]["write"]["bw"]) 449 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"]) 450 write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"]) 451 write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"]) 452 write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"]) 453 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"]) 454 write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles( 455 data["jobs"][job_pos]["write"][clat_key]) 456 457 if "ns" in lat_unit: 458 write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]] 459 if "ns" in clat_unit: 460 write_p99_lat = write_p99_lat / 1000 461 write_p99_9_lat = write_p99_9_lat / 1000 462 write_p99_99_lat = write_p99_99_lat / 1000 463 write_p99_999_lat = write_p99_999_lat / 1000 464 465 return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat, 466 read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat, 467 write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat, 468 write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat] 469 470 def parse_results(self, results_dir, initiator_count=None, run_num=None): 471 files = os.listdir(results_dir) 472 fio_files = filter(lambda x: ".fio" in x, files) 473 json_files = [x for x in files if ".json" in x] 474 475 headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us", 476 "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us", 477 "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us", 478 "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"] 
479 480 aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us", 481 "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"] 482 483 header_line = ",".join(["Name", *headers]) 484 aggr_header_line = ",".join(["Name", *aggr_headers]) 485 486 # Create empty results file 487 csv_file = "nvmf_results.csv" 488 with open(os.path.join(results_dir, csv_file), "w") as fh: 489 fh.write(aggr_header_line + "\n") 490 rows = set() 491 492 for fio_config in fio_files: 493 self.log_print("Getting FIO stats for %s" % fio_config) 494 job_name, _ = os.path.splitext(fio_config) 495 496 # Look in the filename for rwmixread value. Function arguments do 497 # not have that information. 498 # TODO: Improve this function by directly using workload params instead 499 # of regexing through filenames. 500 if "read" in job_name: 501 rw_mixread = 1 502 elif "write" in job_name: 503 rw_mixread = 0 504 else: 505 rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100 506 507 # If "_CPU" exists in name - ignore it 508 # Initiators for the same job could have diffrent num_cores parameter 509 job_name = re.sub(r"_\d+CPU", "", job_name) 510 job_result_files = [x for x in json_files if job_name in x] 511 self.log_print("Matching result files for current fio config:") 512 for j in job_result_files: 513 self.log_print("\t %s" % j) 514 515 # There may have been more than 1 initiator used in test, need to check that 516 # Result files are created so that string after last "_" separator is server name 517 inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files]) 518 inits_avg_results = [] 519 for i in inits_names: 520 self.log_print("\tGetting stats for initiator %s" % i) 521 # There may have been more than 1 test run for this job, calculate average results for initiator 522 i_results = [x for x in job_result_files if i in x] 523 i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv")) 524 525 separate_stats = [] 526 for 
r in i_results: 527 stats = self.read_json_stats(os.path.join(results_dir, r)) 528 separate_stats.append(stats) 529 self.log_print(stats) 530 531 init_results = [sum(x) for x in zip(*separate_stats)] 532 init_results = [x / len(separate_stats) for x in init_results] 533 inits_avg_results.append(init_results) 534 535 self.log_print("\tAverage results for initiator %s" % i) 536 self.log_print(init_results) 537 with open(os.path.join(results_dir, i_results_filename), "w") as fh: 538 fh.write(header_line + "\n") 539 fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n") 540 541 # Sum results of all initiators running this FIO job. 542 # Latency results are an average of latencies from accros all initiators. 543 inits_avg_results = [sum(x) for x in zip(*inits_avg_results)] 544 inits_avg_results = OrderedDict(zip(headers, inits_avg_results)) 545 for key in inits_avg_results: 546 if "lat" in key: 547 inits_avg_results[key] /= len(inits_names) 548 549 # Aggregate separate read/write values into common labels 550 # Take rw_mixread into consideration for mixed read/write workloads. 
551 aggregate_results = OrderedDict() 552 for h in aggr_headers: 553 read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key] 554 if "lat" in h: 555 _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat 556 else: 557 _ = read_stat + write_stat 558 aggregate_results[h] = "{0:.3f}".format(_) 559 560 rows.add(",".join([job_name, *aggregate_results.values()])) 561 562 # Save results to file 563 for row in rows: 564 with open(os.path.join(results_dir, csv_file), "a") as fh: 565 fh.write(row + "\n") 566 self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file)) 567 568 def measure_sar(self, results_dir, sar_file_name): 569 self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay) 570 time.sleep(self.sar_delay) 571 out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count]) 572 with open(os.path.join(results_dir, sar_file_name), "w") as fh: 573 for line in out.split("\n"): 574 if "Average" in line and "CPU" in line: 575 self.log_print("Summary CPU utilization from SAR:") 576 self.log_print(line) 577 if "Average" in line and "all" in line: 578 self.log_print(line) 579 fh.write(out) 580 581 def measure_pcm_memory(self, results_dir, pcm_file_name): 582 time.sleep(self.pcm_delay) 583 cmd = ["%s/pcm-memory.x" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)] 584 pcm_memory = subprocess.Popen(cmd) 585 time.sleep(self.pcm_count) 586 pcm_memory.terminate() 587 588 def measure_pcm(self, results_dir, pcm_file_name): 589 time.sleep(self.pcm_delay) 590 cmd = ["%s/pcm.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, "-csv=%s/%s" % (results_dir, pcm_file_name)] 591 subprocess.run(cmd) 592 df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1]) 593 df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x)) 594 skt = df.loc[:, 
df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})] 595 skt_pcm_file_name = "_".join(["skt", pcm_file_name]) 596 skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False) 597 598 def measure_pcm_power(self, results_dir, pcm_power_file_name): 599 time.sleep(self.pcm_delay) 600 out = self.exec_cmd(["%s/pcm-power.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count]) 601 with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh: 602 fh.write(out) 603 604 def measure_network_bandwidth(self, results_dir, bandwidth_file_name): 605 self.log_print("INFO: starting network bandwidth measure") 606 self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name), 607 "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)]) 608 609 def measure_dpdk_memory(self, results_dir): 610 self.log_print("INFO: waiting to generate DPDK memory usage") 611 time.sleep(self.dpdk_wait_time) 612 self.log_print("INFO: generating DPDK memory usage") 613 rpc.env.env_dpdk_get_mem_stats 614 os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir)) 615 616 def sys_config(self): 617 self.log_print("====Kernel release:====") 618 self.log_print(os.uname().release) 619 self.log_print("====Kernel command line:====") 620 with open('/proc/cmdline') as f: 621 cmdline = f.readlines() 622 self.log_print('\n'.join(self.get_uncommented_lines(cmdline))) 623 self.log_print("====sysctl conf:====") 624 with open('/etc/sysctl.conf') as f: 625 sysctl = f.readlines() 626 self.log_print('\n'.join(self.get_uncommented_lines(sysctl))) 627 self.log_print("====Cpu power info:====") 628 self.log_print(self.exec_cmd(["cpupower", "frequency-info"])) 629 self.log_print("====zcopy settings:====") 630 self.log_print("zcopy enabled: %s" % (self.enable_zcopy)) 631 self.log_print("====Scheduler settings:====") 632 self.log_print("SPDK scheduler: %s" % (self.scheduler_name)) 633 634 635class Initiator(Server): 636 def __init__(self, name, 
general_config, initiator_config): 637 super(Initiator, self).__init__(name, general_config, initiator_config) 638 639 # Required fields 640 self.ip = initiator_config["ip"] 641 self.target_nic_ips = initiator_config["target_nic_ips"] 642 643 # Defaults 644 self.cpus_allowed = None 645 self.cpus_allowed_policy = "shared" 646 self.spdk_dir = "/tmp/spdk" 647 self.fio_bin = "/usr/src/fio/fio" 648 self.nvmecli_bin = "nvme" 649 self.cpu_frequency = None 650 self.subsystem_info_list = [] 651 652 if "spdk_dir" in initiator_config: 653 self.spdk_dir = initiator_config["spdk_dir"] 654 if "fio_bin" in initiator_config: 655 self.fio_bin = initiator_config["fio_bin"] 656 if "nvmecli_bin" in initiator_config: 657 self.nvmecli_bin = initiator_config["nvmecli_bin"] 658 if "cpus_allowed" in initiator_config: 659 self.cpus_allowed = initiator_config["cpus_allowed"] 660 if "cpus_allowed_policy" in initiator_config: 661 self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"] 662 if "cpu_frequency" in initiator_config: 663 self.cpu_frequency = initiator_config["cpu_frequency"] 664 if os.getenv('SPDK_WORKSPACE'): 665 self.spdk_dir = os.getenv('SPDK_WORKSPACE') 666 667 self.ssh_connection = paramiko.SSHClient() 668 self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 669 self.ssh_connection.connect(self.ip, username=self.username, password=self.password) 670 self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir]) 671 self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir]) 672 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 673 674 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 675 self.copy_spdk("/tmp/spdk.zip") 676 self.set_local_nic_info(self.set_local_nic_info_helper()) 677 self.set_cpu_frequency() 678 self.configure_system() 679 if self.enable_adq: 680 self.configure_adq() 681 self.sys_config() 682 683 def set_local_nic_info_helper(self): 684 return 
json.loads(self.exec_cmd(["lshw", "-json"])) 685 686 def __del__(self): 687 self.ssh_connection.close() 688 689 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 690 if change_dir: 691 cmd = ["cd", change_dir, ";", *cmd] 692 693 # In case one of the command elements contains whitespace and is not 694 # already quoted, # (e.g. when calling sysctl) quote it again to prevent expansion 695 # when sending to remote system. 696 for i, c in enumerate(cmd): 697 if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")): 698 cmd[i] = '"%s"' % c 699 cmd = " ".join(cmd) 700 701 # Redirect stderr to stdout thanks using get_pty option if needed 702 _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect) 703 out = stdout.read().decode(encoding="utf-8") 704 705 # Check the return code 706 rc = stdout.channel.recv_exit_status() 707 if rc: 708 raise CalledProcessError(int(rc), cmd, out) 709 710 return out 711 712 def put_file(self, local, remote_dest): 713 ftp = self.ssh_connection.open_sftp() 714 ftp.put(local, remote_dest) 715 ftp.close() 716 717 def get_file(self, remote, local_dest): 718 ftp = self.ssh_connection.open_sftp() 719 ftp.get(remote, local_dest) 720 ftp.close() 721 722 def copy_spdk(self, local_spdk_zip): 723 self.log_print("Copying SPDK sources to initiator %s" % self.name) 724 self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip") 725 self.log_print("Copied sources zip from target") 726 self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir]) 727 self.log_print("Sources unpacked") 728 729 def copy_result_files(self, dest_dir): 730 self.log_print("Copying results") 731 732 if not os.path.exists(dest_dir): 733 os.mkdir(dest_dir) 734 735 # Get list of result files from initiator and copy them back to target 736 file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n") 737 738 for file in file_list: 739 self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file), 740 
os.path.join(dest_dir, file)) 741 self.log_print("Done copying results") 742 743 def discover_subsystems(self, address_list, subsys_no): 744 num_nvmes = range(0, subsys_no) 745 nvme_discover_output = "" 746 for ip, subsys_no in itertools.product(address_list, num_nvmes): 747 self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no)) 748 nvme_discover_cmd = ["sudo", 749 "%s" % self.nvmecli_bin, 750 "discover", "-t", "%s" % self.transport, 751 "-s", "%s" % (4420 + subsys_no), 752 "-a", "%s" % ip] 753 754 try: 755 stdout = self.exec_cmd(nvme_discover_cmd) 756 if stdout: 757 nvme_discover_output = nvme_discover_output + stdout 758 except CalledProcessError: 759 # Do nothing. In case of discovering remote subsystems of kernel target 760 # we expect "nvme discover" to fail a bunch of times because we basically 761 # scan ports. 762 pass 763 764 subsystems = re.findall(r'trsvcid:\s(\d+)\s+' # get svcid number 765 r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+' # get NQN id 766 r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', # get IP address 767 nvme_discover_output) # from nvme discovery output 768 subsystems = filter(lambda x: x[-1] in address_list, subsystems) 769 subsystems = list(set(subsystems)) 770 subsystems.sort(key=lambda x: x[1]) 771 self.log_print("Found matching subsystems on target side:") 772 for s in subsystems: 773 self.log_print(s) 774 self.subsystem_info_list = subsystems 775 776 def gen_fio_filename_conf(self, *args, **kwargs): 777 # Logic implemented in SPDKInitiator and KernelInitiator classes 778 pass 779 780 def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10, rate_iops=0): 781 fio_conf_template = """ 782[global] 783ioengine={ioengine} 784{spdk_conf} 785thread=1 786group_reporting=1 787direct=1 788percentile_list=50:90:99:99.5:99.9:99.99:99.999 789 790norandommap=1 791rw={rw} 792rwmixread={rwmixread} 793bs={block_size} 794time_based=1 795ramp_time={ramp_time} 796runtime={run_time} 
797rate_iops={rate_iops} 798""" 799 if "spdk" in self.mode: 800 bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list) 801 self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir]) 802 ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir 803 spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir 804 else: 805 ioengine = "libaio" 806 spdk_conf = "" 807 out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'", 808 "|", "awk", "'{print $1}'"]) 809 subsystems = [x for x in out.split("\n") if "nvme" in x] 810 811 if self.cpus_allowed is not None: 812 self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed) 813 cpus_num = 0 814 cpus = self.cpus_allowed.split(",") 815 for cpu in cpus: 816 if "-" in cpu: 817 a, b = cpu.split("-") 818 a = int(a) 819 b = int(b) 820 cpus_num += len(range(a, b)) 821 else: 822 cpus_num += 1 823 self.num_cores = cpus_num 824 threads = range(0, self.num_cores) 825 elif hasattr(self, 'num_cores'): 826 self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores) 827 threads = range(0, int(self.num_cores)) 828 else: 829 self.num_cores = len(subsystems) 830 threads = range(0, len(subsystems)) 831 832 if "spdk" in self.mode: 833 filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs) 834 else: 835 filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs) 836 837 fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf, 838 rw=rw, rwmixread=rwmixread, block_size=block_size, 839 ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops) 840 if num_jobs: 841 fio_config = fio_config + "numjobs=%s \n" % num_jobs 842 if self.cpus_allowed is not None: 843 fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed 844 fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy 845 fio_config = fio_config + filename_section 846 847 
fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread) 848 if hasattr(self, "num_cores"): 849 fio_config_filename += "_%sCPU" % self.num_cores 850 fio_config_filename += ".fio" 851 852 self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir]) 853 self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)]) 854 self.log_print("Created FIO Config:") 855 self.log_print(fio_config) 856 857 return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename) 858 859 def set_cpu_frequency(self): 860 if self.cpu_frequency is not None: 861 try: 862 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True) 863 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True) 864 self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"])) 865 except Exception: 866 self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!") 867 sys.exit() 868 else: 869 self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.") 870 871 def run_fio(self, fio_config_file, run_num=None): 872 job_name, _ = os.path.splitext(fio_config_file) 873 self.log_print("Starting FIO run for job: %s" % job_name) 874 self.log_print("Using FIO: %s" % self.fio_bin) 875 876 if run_num: 877 for i in range(1, run_num + 1): 878 output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json" 879 output = self.exec_cmd(["sudo", self.fio_bin, 880 fio_config_file, "--output-format=json", 881 "--output=%s" % output_filename], True) 882 self.log_print(output) 883 else: 884 output_filename = job_name + "_" + self.name + ".json" 885 output = self.exec_cmd(["sudo", self.fio_bin, 886 fio_config_file, "--output-format=json", 887 "--output" % output_filename], True) 888 self.log_print(output) 889 self.log_print("FIO run finished. 
Results in: %s" % output_filename) 890 891 def sys_config(self): 892 self.log_print("====Kernel release:====") 893 self.log_print(self.exec_cmd(["uname", "-r"])) 894 self.log_print("====Kernel command line:====") 895 cmdline = self.exec_cmd(["cat", "/proc/cmdline"]) 896 self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines()))) 897 self.log_print("====sysctl conf:====") 898 sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"]) 899 self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines()))) 900 self.log_print("====Cpu power info:====") 901 self.log_print(self.exec_cmd(["cpupower", "frequency-info"])) 902 903 904class KernelTarget(Target): 905 def __init__(self, name, general_config, target_config): 906 super(KernelTarget, self).__init__(name, general_config, target_config) 907 # Defaults 908 self.nvmet_bin = "nvmetcli" 909 910 if "nvmet_bin" in target_config: 911 self.nvmet_bin = target_config["nvmet_bin"] 912 913 def __del__(self): 914 nvmet_command(self.nvmet_bin, "clear") 915 916 def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list): 917 918 nvmet_cfg = { 919 "ports": [], 920 "hosts": [], 921 "subsystems": [], 922 } 923 924 # Split disks between NIC IP's 925 disks_per_ip = int(len(nvme_list) / len(address_list)) 926 disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))] 927 928 subsys_no = 1 929 port_no = 0 930 for ip, chunk in zip(address_list, disk_chunks): 931 for disk in chunk: 932 nqn = "nqn.2018-09.io.spdk:cnode%s" % subsys_no 933 nvmet_cfg["subsystems"].append({ 934 "allowed_hosts": [], 935 "attr": { 936 "allow_any_host": "1", 937 "serial": "SPDK00%s" % subsys_no, 938 "version": "1.3" 939 }, 940 "namespaces": [ 941 { 942 "device": { 943 "path": disk, 944 "uuid": "%s" % uuid.uuid4() 945 }, 946 "enable": 1, 947 "nsid": subsys_no 948 } 949 ], 950 "nqn": nqn 951 }) 952 953 nvmet_cfg["ports"].append({ 954 "addr": { 955 "adrfam": "ipv4", 956 "traddr": ip, 957 
"trsvcid": "%s" % (4420 + port_no), 958 "trtype": "%s" % self.transport 959 }, 960 "portid": subsys_no, 961 "referrals": [], 962 "subsystems": [nqn] 963 }) 964 subsys_no += 1 965 port_no += 1 966 self.subsystem_info_list.append([port_no, nqn, ip]) 967 968 with open("kernel.conf", "w") as fh: 969 fh.write(json.dumps(nvmet_cfg, indent=2)) 970 pass 971 972 def tgt_start(self): 973 self.log_print("Configuring kernel NVMeOF Target") 974 975 if self.null_block: 976 print("Configuring with null block device.") 977 null_blk_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)] 978 self.kernel_tgt_gen_subsystem_conf(null_blk_list, self.nic_ips) 979 self.subsys_no = len(null_blk_list) 980 else: 981 print("Configuring with NVMe drives.") 982 nvme_list = get_nvme_devices() 983 self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips) 984 self.subsys_no = len(nvme_list) 985 986 nvmet_command(self.nvmet_bin, "clear") 987 nvmet_command(self.nvmet_bin, "restore kernel.conf") 988 989 if self.enable_adq: 990 self.adq_configure_tc() 991 992 self.log_print("Done configuring kernel NVMeOF Target") 993 994 995class SPDKTarget(Target): 996 def __init__(self, name, general_config, target_config): 997 super(SPDKTarget, self).__init__(name, general_config, target_config) 998 999 # Required fields 1000 self.core_mask = target_config["core_mask"] 1001 self.num_cores = self.get_num_cores(self.core_mask) 1002 1003 # Defaults 1004 self.dif_insert_strip = False 1005 self.null_block_dif_type = 0 1006 self.num_shared_buffers = 4096 1007 1008 if "num_shared_buffers" in target_config: 1009 self.num_shared_buffers = target_config["num_shared_buffers"] 1010 if "null_block_dif_type" in target_config: 1011 self.null_block_dif_type = target_config["null_block_dif_type"] 1012 if "dif_insert_strip" in target_config: 1013 self.dif_insert_strip = target_config["dif_insert_strip"] 1014 1015 def get_num_cores(self, core_mask): 1016 if "0x" in core_mask: 1017 return bin(int(core_mask, 
16)).count("1") 1018 else: 1019 num_cores = 0 1020 core_mask = core_mask.replace("[", "") 1021 core_mask = core_mask.replace("]", "") 1022 for i in core_mask.split(","): 1023 if "-" in i: 1024 x, y = i.split("-") 1025 num_cores += len(range(int(x), int(y))) + 1 1026 else: 1027 num_cores += 1 1028 return num_cores 1029 1030 def spdk_tgt_configure(self): 1031 self.log_print("Configuring SPDK NVMeOF target via RPC") 1032 numa_list = get_used_numa_nodes() 1033 1034 # Create RDMA transport layer 1035 rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, 1036 num_shared_buffers=self.num_shared_buffers, 1037 dif_insert_or_strip=self.dif_insert_strip, 1038 sock_priority=self.adq_priority) 1039 self.log_print("SPDK NVMeOF transport layer:") 1040 rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client)) 1041 1042 if self.null_block: 1043 self.spdk_tgt_add_nullblock(self.null_block) 1044 self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block) 1045 else: 1046 self.spdk_tgt_add_nvme_conf() 1047 self.spdk_tgt_add_subsystem_conf(self.nic_ips) 1048 1049 if self.enable_adq: 1050 self.adq_configure_tc() 1051 self.log_print("Done configuring SPDK NVMeOF Target") 1052 1053 def spdk_tgt_add_nullblock(self, null_block_count): 1054 md_size = 0 1055 block_size = 4096 1056 if self.null_block_dif_type != 0: 1057 md_size = 128 1058 1059 self.log_print("Adding null block bdevices to config via RPC") 1060 for i in range(null_block_count): 1061 self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type) 1062 rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i), 1063 dif_type=self.null_block_dif_type, md_size=md_size) 1064 self.log_print("SPDK Bdevs configuration:") 1065 rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1066 1067 def spdk_tgt_add_nvme_conf(self, req_num_disks=None): 1068 self.log_print("Adding NVMe bdevs to config via RPC") 1069 1070 bdfs = get_nvme_devices_bdf() 1071 bdfs = 
[b.replace(":", ".") for b in bdfs] 1072 1073 if req_num_disks: 1074 if req_num_disks > len(bdfs): 1075 self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs)) 1076 sys.exit(1) 1077 else: 1078 bdfs = bdfs[0:req_num_disks] 1079 1080 for i, bdf in enumerate(bdfs): 1081 rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 1082 1083 self.log_print("SPDK Bdevs configuration:") 1084 rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1085 1086 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 1087 self.log_print("Adding subsystems to config") 1088 port = "4420" 1089 if not req_num_disks: 1090 req_num_disks = get_nvme_devices_count() 1091 1092 # Distribute bdevs between provided NICs 1093 num_disks = range(0, req_num_disks) 1094 if len(num_disks) == 1: 1095 disks_per_ip = 1 1096 else: 1097 disks_per_ip = int(len(num_disks) / len(ips)) 1098 disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))] 1099 1100 # Create subsystems, add bdevs to namespaces, add listeners 1101 for ip, chunk in zip(ips, disk_chunks): 1102 for c in chunk: 1103 nqn = "nqn.2018-09.io.spdk:cnode%s" % c 1104 serial = "SPDK00%s" % c 1105 bdev_name = "Nvme%sn1" % c 1106 rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial, 1107 allow_any_host=True, max_namespaces=8) 1108 rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name) 1109 1110 rpc.nvmf.nvmf_subsystem_add_listener(self.client, 1111 nqn=nqn, 1112 trtype=self.transport, 1113 traddr=ip, 1114 trsvcid=port, 1115 adrfam="ipv4") 1116 1117 self.subsystem_info_list.append([port, nqn, ip]) 1118 self.log_print("SPDK NVMeOF subsystem configuration:") 1119 rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client)) 1120 1121 def tgt_start(self): 1122 if self.null_block: 1123 self.subsys_no = 1 1124 else: 1125 self.subsys_no = get_nvme_devices_count() 1126 self.log_print("Starting SPDK NVMeOF Target 
process") 1127 nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt") 1128 proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask]) 1129 self.pid = os.path.join(self.spdk_dir, "nvmf.pid") 1130 1131 with open(self.pid, "w") as fh: 1132 fh.write(str(proc.pid)) 1133 self.nvmf_proc = proc 1134 self.log_print("SPDK NVMeOF Target PID=%s" % self.pid) 1135 self.log_print("Waiting for spdk to initilize...") 1136 while True: 1137 if os.path.exists("/var/tmp/spdk.sock"): 1138 break 1139 time.sleep(1) 1140 self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock") 1141 1142 if self.enable_zcopy: 1143 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", 1144 enable_zerocopy_send=True) 1145 self.log_print("Target socket options:") 1146 rpc.client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix")) 1147 1148 if self.enable_adq: 1149 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1) 1150 rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None, 1151 nvme_adminq_poll_period_us=100000, retry_count=4) 1152 rpc.nvmf.nvmf_set_config(self.client, acceptor_poll_rate=10000) 1153 1154 rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name) 1155 1156 rpc.framework_start_init(self.client) 1157 self.spdk_tgt_configure() 1158 1159 def __del__(self): 1160 if hasattr(self, "nvmf_proc"): 1161 try: 1162 self.nvmf_proc.terminate() 1163 self.nvmf_proc.wait() 1164 except Exception as e: 1165 self.log_print(e) 1166 self.nvmf_proc.kill() 1167 self.nvmf_proc.communicate() 1168 1169 1170class KernelInitiator(Initiator): 1171 def __init__(self, name, general_config, initiator_config): 1172 super(KernelInitiator, self).__init__(name, general_config, initiator_config) 1173 1174 # Defaults 1175 self.extra_params = "" 1176 1177 if "extra_params" in initiator_config: 1178 self.extra_params = initiator_config["extra_params"] 1179 1180 def __del__(self): 1181 
self.ssh_connection.close() 1182 1183 def kernel_init_connect(self, address_list, subsys_no): 1184 self.log_print("Below connection attempts may result in error messages, this is expected!") 1185 for subsystem in self.subsystem_info_list: 1186 self.log_print("Trying to connect %s %s %s" % subsystem) 1187 self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport, 1188 "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params]) 1189 time.sleep(2) 1190 1191 def kernel_init_disconnect(self, address_list, subsys_no): 1192 for subsystem in self.subsystem_info_list: 1193 self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]]) 1194 time.sleep(1) 1195 1196 def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1): 1197 out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'", 1198 "|", "awk", "'{print $1}'"]) 1199 nvme_list = [x for x in out.split("\n") if "nvme" in x] 1200 1201 filename_section = "" 1202 nvme_per_split = int(len(nvme_list) / len(threads)) 1203 remainder = len(nvme_list) % len(threads) 1204 iterator = iter(nvme_list) 1205 result = [] 1206 for i in range(len(threads)): 1207 result.append([]) 1208 for j in range(nvme_per_split): 1209 result[i].append(next(iterator)) 1210 if remainder: 1211 result[i].append(next(iterator)) 1212 remainder -= 1 1213 for i, r in enumerate(result): 1214 header = "[filename%s]" % i 1215 disks = "\n".join(["filename=%s" % x for x in r]) 1216 job_section_qd = round((io_depth * len(r)) / num_jobs) 1217 if job_section_qd == 0: 1218 job_section_qd = 1 1219 iodepth = "iodepth=%s" % job_section_qd 1220 filename_section = "\n".join([filename_section, header, disks, iodepth]) 1221 1222 return filename_section 1223 1224 1225class SPDKInitiator(Initiator): 1226 def __init__(self, name, general_config, initiator_config): 1227 super(SPDKInitiator, self).__init__(name, general_config, initiator_config) 1228 1229 if "skip_spdk_install" not in general_config or 
general_config["skip_spdk_install"] is False: 1230 self.install_spdk(self.spdk_dir) 1231 1232 # Required fields 1233 self.num_cores = initiator_config["num_cores"] 1234 1235 def install_spdk(self, local_spdk_zip): 1236 self.log_print("Using fio binary %s" % self.fio_bin) 1237 self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"]) 1238 self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"]) 1239 self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)]) 1240 self.exec_cmd(["make", "-C", self.spdk_dir, "clean"]) 1241 self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"]) 1242 1243 self.log_print("SPDK built") 1244 self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir]) 1245 1246 def gen_spdk_bdev_conf(self, remote_subsystem_list): 1247 bdev_cfg_section = { 1248 "subsystems": [ 1249 { 1250 "subsystem": "bdev", 1251 "config": [] 1252 } 1253 ] 1254 } 1255 1256 for i, subsys in enumerate(remote_subsystem_list): 1257 sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys) 1258 nvme_ctrl = { 1259 "method": "bdev_nvme_attach_controller", 1260 "params": { 1261 "name": "Nvme{}".format(i), 1262 "trtype": self.transport, 1263 "traddr": sub_addr, 1264 "trsvcid": sub_port, 1265 "subnqn": sub_nqn, 1266 "adrfam": "IPv4" 1267 } 1268 } 1269 1270 if self.enable_adq: 1271 nvme_ctrl["params"].update({"priority": "1"}) 1272 1273 bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl) 1274 1275 return json.dumps(bdev_cfg_section, indent=2) 1276 1277 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1): 1278 filename_section = "" 1279 if len(threads) >= len(subsystems): 1280 threads = range(0, len(subsystems)) 1281 filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))] 1282 nvme_per_split = int(len(subsystems) / len(threads)) 1283 remainder = len(subsystems) % len(threads) 1284 iterator = iter(filenames) 1285 result = [] 1286 for i in 
range(len(threads)): 1287 result.append([]) 1288 for j in range(nvme_per_split): 1289 result[i].append(next(iterator)) 1290 if remainder: 1291 result[i].append(next(iterator)) 1292 remainder -= 1 1293 for i, r in enumerate(result): 1294 header = "[filename%s]" % i 1295 disks = "\n".join(["filename=%s" % x for x in r]) 1296 job_section_qd = round((io_depth * len(r)) / num_jobs) 1297 if job_section_qd == 0: 1298 job_section_qd = 1 1299 iodepth = "iodepth=%s" % job_section_qd 1300 filename_section = "\n".join([filename_section, header, disks, iodepth]) 1301 1302 return filename_section 1303 1304 1305if __name__ == "__main__": 1306 spdk_zip_path = "/tmp/spdk.zip" 1307 target_results_dir = "/tmp/results" 1308 1309 if (len(sys.argv) > 1): 1310 config_file_path = sys.argv[1] 1311 else: 1312 script_full_dir = os.path.dirname(os.path.realpath(__file__)) 1313 config_file_path = os.path.join(script_full_dir, "config.json") 1314 1315 print("Using config file: %s" % config_file_path) 1316 with open(config_file_path, "r") as config: 1317 data = json.load(config) 1318 1319 initiators = [] 1320 fio_cases = [] 1321 1322 general_config = data["general"] 1323 target_config = data["target"] 1324 initiator_configs = [data[x] for x in data.keys() if "initiator" in x] 1325 1326 for k, v in data.items(): 1327 if "target" in k: 1328 if data[k]["mode"] == "spdk": 1329 target_obj = SPDKTarget(k, data["general"], v) 1330 elif data[k]["mode"] == "kernel": 1331 target_obj = KernelTarget(k, data["general"], v) 1332 pass 1333 elif "initiator" in k: 1334 if data[k]["mode"] == "spdk": 1335 init_obj = SPDKInitiator(k, data["general"], v) 1336 elif data[k]["mode"] == "kernel": 1337 init_obj = KernelInitiator(k, data["general"], v) 1338 initiators.append(init_obj) 1339 elif "fio" in k: 1340 fio_workloads = itertools.product(data[k]["bs"], 1341 data[k]["qd"], 1342 data[k]["rw"]) 1343 1344 fio_run_time = data[k]["run_time"] 1345 fio_ramp_time = data[k]["ramp_time"] 1346 fio_rw_mix_read = 
data[k]["rwmixread"] 1347 fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None 1348 fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None 1349 1350 fio_rate_iops = 0 1351 if "rate_iops" in data[k]: 1352 fio_rate_iops = data[k]["rate_iops"] 1353 else: 1354 continue 1355 1356 target_obj.tgt_start() 1357 1358 try: 1359 os.mkdir(target_results_dir) 1360 except FileExistsError: 1361 pass 1362 1363 for i in initiators: 1364 i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no) 1365 if i.enable_adq: 1366 i.adq_configure_tc() 1367 1368 # Poor mans threading 1369 # Run FIO tests 1370 for block_size, io_depth, rw in fio_workloads: 1371 threads = [] 1372 configs = [] 1373 for i in initiators: 1374 if i.mode == "kernel": 1375 i.kernel_init_connect(i.target_nic_ips, target_obj.subsys_no) 1376 1377 cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no, 1378 fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops) 1379 configs.append(cfg) 1380 1381 for i, cfg in zip(initiators, configs): 1382 t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num)) 1383 threads.append(t) 1384 if target_obj.enable_sar: 1385 sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"]) 1386 sar_file_name = ".".join([sar_file_name, "txt"]) 1387 t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name)) 1388 threads.append(t) 1389 1390 if target_obj.enable_pcm: 1391 pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]] 1392 1393 pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(target_results_dir, pcm_fnames[0],)) 1394 pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(target_results_dir, pcm_fnames[1],)) 1395 pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(target_results_dir, pcm_fnames[2],)) 1396 1397 threads.append(pcm_cpu_t) 1398 
threads.append(pcm_mem_t) 1399 threads.append(pcm_pow_t) 1400 1401 if target_obj.enable_bandwidth: 1402 bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)]) 1403 bandwidth_file_name = ".".join([bandwidth_file_name, "csv"]) 1404 t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(target_results_dir, bandwidth_file_name,)) 1405 threads.append(t) 1406 1407 if target_obj.enable_dpdk_memory: 1408 t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(target_results_dir)) 1409 threads.append(t) 1410 1411 for t in threads: 1412 t.start() 1413 for t in threads: 1414 t.join() 1415 1416 for i in initiators: 1417 if i.mode == "kernel": 1418 i.kernel_init_disconnect(i.target_nic_ips, target_obj.subsys_no) 1419 i.copy_result_files(target_results_dir) 1420 1421 target_obj.restore_governor() 1422 target_obj.restore_tuned() 1423 target_obj.restore_services() 1424 target_obj.restore_sysctl() 1425 for i in initiators: 1426 i.restore_governor() 1427 i.restore_tuned() 1428 i.restore_services() 1429 i.restore_sysctl() 1430 target_obj.parse_results(target_results_dir) 1431