#!/usr/bin/env python3
"""Host-side helpers for SPDK NVMe-oF (nvmf) performance testing.

``Server`` implements the system-tuning steps (services, sysctl, tuned,
CPU governor, IRQ affinity, optional ADQ) shared by the test target and
the traffic initiators.  ``Target`` specializes it for the machine that
runs the NVMe-oF target and executes all commands locally.

NOTE(review): ``check_output`` and ``CalledProcessError`` are referenced
unqualified below, so they are expected to arrive via
``from common import *`` -- confirm against common.py.
"""

import os
import re
import sys
import json
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid
from collections import OrderedDict

import paramiko
import pandas as pd

import rpc
import rpc.client
from common import *


class Server:
    """Base class for a machine taking part in the test.

    Subclasses override ``exec_cmd`` (local vs. SSH execution); this class
    builds the tune/restore steps on top of it.
    """

    def __init__(self, name, general_config, server_config):
        # Credentials and transport come from the "general" config section;
        # per-host settings come from this host's own section.
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]  # "kernel" or "spdk"

        # Default location of Mellanox set_irq_affinity.sh; configurable.
        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        self.local_nic_info = []
        self._nics_json_obj = {}
        # The *_restore_* members record pre-test state so that the
        # corresponding restore_*() methods can revert it afterwards.
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        self.enable_adq = False
        self.adq_priority = None
        if "adq_enable" in server_config and server_config["adq_enable"]:
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1

        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]

        # The name is embedded into result file names, so restrict its charset.
        if not re.match("^[A-Za-z0-9]*$", name):
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        """Print *msg* prefixed with this host's name, flushing immediately."""
        print("[%s] %s" % (self.name, msg), flush=True)

    def get_uncommented_lines(self, lines):
        """Return *lines* with empty lines and '#'-comment lines removed."""
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        """Return the interface name that owns *ip* (from ``ip -j address``).

        The parsed NIC list is fetched lazily and cached in ``_nics_json_obj``.
        Returns None implicitly when no interface matches.
        NOTE(review): ``ip in addr["local"]`` is a substring match, so e.g.
        "10.0.0.1" also matches "10.0.0.11" -- confirm this is intended.
        """
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            # Keep only interfaces that actually have addresses assigned.
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                if ip in addr["local"]:
                    return nic["ifname"]

    def set_local_nic_info_helper(self):
        # Overridden by subclasses to return "lshw -json" output.
        pass

    def set_local_nic_info(self, pci_info):
        """Collect all network-class nodes from an ``lshw -json`` tree into
        ``self.local_nic_info``."""
        def extract_network_elements(json_obj):
            # Recursively walk lists/dicts, descending into "children",
            # and gather every node whose "class" contains "network".
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        # Stub; subclasses execute the command locally or over SSH.
        return ""

    def configure_system(self):
        """Apply all system-tuning steps prior to the test run."""
        self.configure_services()
        self.configure_sysctl()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity()

    def configure_adq(self):
        """Load ADQ kernel modules and configure the NIC (SPDK mode only)."""
        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        """modprobe the traffic-control modules that ADQ depends on.

        Failures are logged but not fatal.
        """
        self.log_print("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log_print("%s loaded!" % module)
            except CalledProcessError as e:
                self.log_print("ERROR: failed to load module %s" % module)
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        """Create ADQ traffic classes and flower filters for each NIC IP.

        Requires ``self.num_cores``, ``self.subsystem_info_list`` and
        ``self.spdk_dir`` to be set by the subclass before being called.
        """
        self.log_print("Configuring ADQ Traffic classess and filters...")

        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        # Match on destination port on the target side, source port on initiators.
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        ports = set([p[0] for p in self.subsystem_info_list])
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            # Two TCs: TC0 (default traffic) and TC1 (ADQ), hardware channel mode.
            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log_print(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log_print(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            # Steer every subsystem port into TC1; skip_sw = hardware offload only.
            for port in ports:
                tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                                 "protocol", "ip", "ingress", "prio", "1", "flower",
                                 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                                 "skip_sw", "hw_tc", "1"]
                self.log_print(" ".join(tc_filter_cmd))
                self.exec_cmd(tc_filter_cmd)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            # NOTE(review): logs the raw list, unlike the " ".join() used above.
            self.log_print(xps_cmd)
            self.exec_cmd(xps_cmd)

    def adq_configure_nic(self):
        """Reload the ice driver and set the ethtool flags ADQ requires.

        On ethtool failure, dumps current offload/priv-flag state to help
        diagnose driver/firmware mismatches.
        """
        self.log_print("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        try:
            self.exec_cmd(["sudo", "rmmod", "ice"])
            self.exec_cmd(["sudo", "modprobe", "ice"])
        except CalledProcessError as e:
            self.log_print("ERROR: failed to reload ice module!")
            self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log_print(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization
            except CalledProcessError as e:
                self.log_print("ERROR: failed to configure NIC port using ethtool!")
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
                self.log_print("Please update your NIC driver and firmware versions and try again.")
                self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
                self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        """Stop services that interfere with the test, recording their state
        in ``svc_restore_dict`` for later restore."""
        self.log_print("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            # "systemctl show" output is Key=Value lines; prefix a section
            # header so configparser can parse it.
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            # Skip services that do not exist on this system.
            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log_print("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log_print("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        """Apply network/VM sysctl tuning, saving previous values in
        ``sysctl_restore_dict`` for later restore."""
        self.log_print("Tuning sysctl settings...")

        # busy_read polling is only enabled for the SPDK + ADQ combination.
        busy_read = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1

        sysctl_opts = {
            "net.core.busy_poll": 0,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
        }

        for opt, value in sysctl_opts.items():
            # Save the current value first, then write the new one.
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        """Switch tuned-adm to the configured profile, recording the current
        profile/mode so it can be restored. No-op without ``tuned_profile``."""
        if not self.tuned_profile:
            self.log_print("WARNING: Tuned profile not set in configration file. Skipping configuration.")
            return

        self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        # Same systemctl-show-as-INI trick as in configure_services().
        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        """Force the "performance" CPU frequency governor, saving the old one."""
        self.log_print("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def configure_irq_affinity(self):
        """Run set_irq_affinity.sh for every NIC used in the test."""
        self.log_print("Setting NIC irq affinity for NICs...")

        irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh")
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            irq_cmd = ["sudo", irq_script_path, nic]
            self.log_print(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        """Start/stop services back to the state recorded in svc_restore_dict."""
        self.log_print("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        """Write back the sysctl values recorded by configure_sysctl()."""
        self.log_print("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        """Revert tuned-adm to the profile recorded by configure_tuned()."""
        self.log_print("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log_print("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        """Revert the CPU governor recorded by configure_cpu_governor()."""
        self.log_print("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log_print("Reverted CPU governor to %s." % self.governor_restore)


class Target(Server):
    """The machine running the NVMe-oF target; executes commands locally."""

    def __init__(self, name, general_config, target_config):
        super(Target, self).__init__(name, general_config, target_config)

        # Defaults
        self.enable_sar = False
        self.sar_delay = 0
        self.sar_interval = 0
        self.sar_count = 0
        self.enable_pcm = False
        self.pcm_dir = ""
        self.pcm_delay = 0
        self.pcm_interval = 0
        self.pcm_count = 0
        self.enable_bandwidth = 0
        self.bandwidth_count = 0
        self.enable_dpdk_memory = False
        self.dpdk_wait_time = 0
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []

        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "sar_settings" in target_config:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"]
        if "pcm_settings" in target_config:
            self.enable_pcm = True
            self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"]
        if "enable_bandwidth" in target_config:
            self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"]
        if "enable_dpdk_memory" in target_config:
            self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"]
        if "scheduler_settings" in target_config:
            self.scheduler_name = target_config["scheduler_settings"]
        if "zcopy_settings" in target_config:
            self.enable_zcopy = target_config["zcopy_settings"]

        # This script lives in scripts/perf/nvmf, so the SPDK root is 3 levels up.
        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        """Return the local hardware tree from ``lshw -json``."""
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Run *cmd* locally, optionally from *change_dir*; return stdout text.

        Raises CalledProcessError on non-zero exit (via check_output).
        NOTE(review): ``check_output`` is unqualified; presumably imported
        by ``from common import *`` -- confirm.
        """
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log_print("Changing directory to %s" % change_dir)

        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")

        if change_dir:
            os.chdir(old_cwd)
            self.log_print("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        """Zip the SPDK source tree into *dest_file* for copying to initiators.

        NOTE(review): archive entry names are relative to the current working
        directory, not to *spdk_dir* -- this only produces sensible paths when
        the script runs from inside the SPDK tree; confirm.
        """
        self.log_print("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log_print("Done zipping")
    def read_json_stats(self, file):
        """Parse one fio JSON result *file*.

        Returns an 18-element list: read iops, bw, avg/min/max latency and
        p99/p99.9/p99.99/p99.999 latencies, followed by the same nine values
        for writes.  Latencies are converted to microseconds when fio
        reported them in nanoseconds.
        """
        with open(file, "r") as json_data:
            data = json.load(json_data)
            job_pos = 0  # job_pos = 0 because using aggregated results

            # Check if latency is in nano or microseconds to choose correct dict key
            def get_lat_unit(key_prefix, dict_section):
                # key prefix - lat, clat or slat.
                # dict section - portion of json containing latency bucket in question
                # Return dict key to access the bucket and unit as string
                for k, v in dict_section.items():
                    if k.startswith(key_prefix):
                        return k, k.split("_")[1]

            def get_clat_percentiles(clat_dict_leaf):
                # Pull the four p99* percentile values from a clat section.
                if "percentile" in clat_dict_leaf:
                    p99_lat = float(clat_dict_leaf["percentile"]["99.000000"])
                    p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"])
                    p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"])
                    p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"])

                    return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat]
                else:
                    # Latest fio versions do not provide "percentile" results if no
                    # measurements were done, so just return zeroes
                    return [0, 0, 0, 0]

            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
            read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["read"][clat_key])

            # Normalize nanoseconds to microseconds.
            if "ns" in lat_unit:
                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
            if "ns" in clat_unit:
                read_p99_lat = read_p99_lat / 1000
                read_p99_9_lat = read_p99_9_lat / 1000
                read_p99_99_lat = read_p99_99_lat / 1000
                read_p99_999_lat = read_p99_999_lat / 1000

            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
            write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["write"][clat_key])

            if "ns" in lat_unit:
                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
            if "ns" in clat_unit:
                write_p99_lat = write_p99_lat / 1000
                write_p99_9_lat = write_p99_9_lat / 1000
                write_p99_99_lat = write_p99_99_lat / 1000
                write_p99_999_lat = write_p99_999_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]

    def parse_results(self, results_dir, initiator_count=None, run_num=None):
        """Aggregate fio JSON results in *results_dir* into CSV files.

        Writes one per-initiator CSV per job plus an aggregated
        nvmf_results.csv.  IOPS/bandwidth are summed across initiators;
        latencies are averaged, with mixed workloads weighted by the
        rwmixread value recovered from the job file name.
        """
        files = os.listdir(results_dir)
        fio_files = filter(lambda x: ".fio" in x, files)
        json_files = [x for x in files if ".json" in x]

        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]

        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]

        header_line = ",".join(["Name", *headers])
        aggr_header_line = ",".join(["Name", *aggr_headers])

        # Create empty results file
        csv_file = "nvmf_results.csv"
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            fh.write(aggr_header_line + "\n")
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # Look in the filename for rwmixread value. Function arguments do
            # not have that information.
            # TODO: Improve this function by directly using workload params instead
            # of regexing through filenames.
            if "read" in job_name:
                rw_mixread = 1
            elif "write" in job_name:
                rw_mixread = 0
            else:
                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100

            # If "_CPU" exists in name - ignore it
            # Initiators for the same job could have different num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if job_name in x]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # There may have been more than 1 initiator used in test, need to check that
            # Result files are created so that string after last "_" separator is server name
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than 1 test run for this job, calculate average results for initiator
                i_results = [x for x in job_result_files if i in x]
                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))

                separate_stats = []
                for r in i_results:
                    stats = self.read_json_stats(os.path.join(results_dir, r))
                    separate_stats.append(stats)
                    self.log_print(stats)

                # Element-wise mean over this initiator's runs.
                init_results = [sum(x) for x in zip(*separate_stats)]
                init_results = [x / len(separate_stats) for x in init_results]
                inits_avg_results.append(init_results)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(init_results)
                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
                    fh.write(header_line + "\n")
                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")

            # Sum results of all initiators running this FIO job.
            # Latency results are an average of latencies from across all initiators.
            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
            for key in inits_avg_results:
                if "lat" in key:
                    inits_avg_results[key] /= len(inits_names)

            # Aggregate separate read/write values into common labels
            # Take rw_mixread into consideration for mixed read/write workloads.
            aggregate_results = OrderedDict()
            for h in aggr_headers:
                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
                if "lat" in h:
                    _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
                else:
                    _ = read_stat + write_stat
                aggregate_results[h] = "{0:.3f}".format(_)

            rows.add(",".join([job_name, *aggregate_results.values()]))

        # Save results to file
        for row in rows:
            with open(os.path.join(results_dir, csv_file), "a") as fh:
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))

    def measure_sar(self, results_dir, sar_file_name):
        """Collect CPU utilization with sar after ``sar_delay`` seconds and
        write the full output to *sar_file_name*, logging the summary lines."""
        self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay)
        time.sleep(self.sar_delay)
        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count])
        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
            for line in out.split("\n"):
                if "Average" in line and "CPU" in line:
                    self.log_print("Summary CPU utilization from SAR:")
                    self.log_print(line)
                if "Average" in line and "all" in line:
                    self.log_print(line)
            fh.write(out)

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        """Run pcm-memory.x in the background for ``pcm_count`` seconds,
        writing its CSV output into *results_dir*."""
        time.sleep(self.pcm_delay)
        cmd = ["%s/pcm-memory.x" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)]
        pcm_memory = subprocess.Popen(cmd)
        # NOTE(review): pcm_count is used as a duration (seconds) here, but as
        # an iteration count (-i) in measure_pcm() -- confirm intended.
        time.sleep(self.pcm_count)
        pcm_memory.terminate()

    def measure_pcm(self, results_dir, pcm_file_name):
        """Run pcm.x for ``pcm_count`` iterations and additionally extract the
        UPI columns into a separate "skt_"-prefixed CSV."""
        time.sleep(self.pcm_delay)
        cmd = ["%s/pcm.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        # Strip pandas' auto-generated "Unnamed:" labels from the header.
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_pcm_power(self, results_dir, pcm_power_file_name):
        """Run pcm-power.x and save its output to *pcm_power_file_name*."""
        time.sleep(self.pcm_delay)
        out = self.exec_cmd(["%s/pcm-power.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count])
        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
            fh.write(out)

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name):
        """Sample network bandwidth with bwm-ng into a CSV file."""
        self.log_print("INFO: starting network bandwidth measure")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)])

    def measure_dpdk_memory(self, results_dir):
        """Trigger a DPDK memory-stats dump and move it into *results_dir*."""
        self.log_print("INFO: waiting to generate DPDK memory usage")
        time.sleep(self.dpdk_wait_time)
        self.log_print("INFO: generating DPDK memory usage")
        # NOTE(review): this references the RPC method without calling it, so
        # it is a no-op as written; the rename below relies on the dump file
        # already existing. Looks like a missing call -- confirm.
        rpc.env.env_dpdk_get_mem_stats
        os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir))

    def sys_config(self):
        """Log kernel, cmdline, sysctl, cpupower and scheduler information."""
        self.log_print("====Kernel release:====")
        self.log_print(os.uname().release)
        self.log_print("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log_print('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log_print("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log_print('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log_print("====Cpu power info:====")
        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log_print("====zcopy settings:====")
        self.log_print("zcopy enabled: %s" % (self.enable_zcopy))
        self.log_print("====Scheduler settings:====")
        self.log_print("SPDK scheduler: %s" % (self.scheduler_name))


class Initiator(Server):
    """A traffic-generating host driven over SSH (paramiko)."""

    def __init__(self, name, general_config, initiator_config):
        super(Initiator, self).__init__(name, general_config, initiator_config)

        # Required fields
        self.ip = initiator_config["ip"]
        self.target_nic_ips = initiator_config["target_nic_ips"]

        # Defaults
        self.cpus_allowed = None
        self.cpus_allowed_policy = "shared"
        self.spdk_dir = "/tmp/spdk"
        self.fio_bin = "/usr/src/fio/fio"
        self.nvmecli_bin = "nvme"
        self.cpu_frequency = None
        self.subsystem_info_list = []

        if "spdk_dir" in initiator_config:
            self.spdk_dir = initiator_config["spdk_dir"]
        if "fio_bin" in initiator_config:
            self.fio_bin = initiator_config["fio_bin"]
        if "nvmecli_bin" in initiator_config:
            self.nvmecli_bin = initiator_config["nvmecli_bin"]
        if "cpus_allowed" in initiator_config:
            self.cpus_allowed = initiator_config["cpus_allowed"]
        if "cpus_allowed_policy" in initiator_config:
            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
        if "cpu_frequency" in initiator_config:
            self.cpu_frequency = initiator_config["cpu_frequency"]
        # Environment variable overrides the configured SPDK workspace.
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        # Start from a clean remote workspace.
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.copy_spdk("/tmp/spdk.zip")
        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        """Return the remote hardware tree from ``lshw -json``."""
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def __del__(self):
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Run *cmd* on the remote host over SSH; return its stdout text.

        Raises CalledProcessError on a non-zero remote exit status.
        """
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it again to prevent
        # expansion when sending to remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout by using the get_pty option if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        """Upload *local* to *remote_dest* over SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        """Download *remote* to *local_dest* over SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        """Ship the zipped SPDK sources to the initiator and unpack them."""
        self.log_print("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log_print("Sources unpacked")

    def copy_result_files(self, dest_dir):
        """Copy all files from the remote nvmf_perf directory into local
        *dest_dir* (created if missing)."""
        self.log_print("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log_print("Done copying results")

    def discover_subsystems(self, address_list, subsys_no):
        """Probe ports 4420..4420+subsys_no-1 on each address with
        ``nvme discover`` and store matching subsystems (sorted by NQN)
        in ``self.subsystem_info_list`` as (svcid, nqn, traddr) tuples.
        """
        num_nvmes = range(0, subsys_no)
        nvme_discover_output = ""
        # NOTE(review): the loop variable shadows the subsys_no parameter;
        # harmless after num_nvmes is built, but worth renaming.
        for ip, subsys_no in itertools.product(address_list, num_nvmes):
            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no))
            nvme_discover_cmd = ["sudo",
                                 "%s" % self.nvmecli_bin,
                                 "discover", "-t", "%s" % self.transport,
                                 "-s", "%s" % (4420 + subsys_no),
                                 "-a", "%s" % ip]

            try:
                stdout = self.exec_cmd(nvme_discover_cmd)
                if stdout:
                    nvme_discover_output = nvme_discover_output + stdout
            except CalledProcessError:
                # Do nothing. In case of discovering remote subsystems of kernel target
                # we expect "nvme discover" to fail a bunch of times because we basically
                # scan ports.
                pass

        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                                nvme_discover_output)  # from nvme discovery output
        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
        subsystems = list(set(subsystems))
        subsystems.sort(key=lambda x: x[1])
        self.log_print("Found matching subsystems on target side:")
        for s in subsystems:
            self.log_print(s)
        self.subsystem_info_list = subsystems

    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10):
        """Generate a fio config file on the initiator and return its remote
        path.

        Chooses the SPDK bdev ioengine or libaio depending on ``self.mode``,
        sizes the thread count from ``cpus_allowed`` / ``num_cores`` / the
        number of discovered subsystems, and appends the filename section
        produced by the subclass's ``gen_fio_filename_conf``.
        """
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
"""
        if "spdk" in self.mode:
            # SPDK mode: write a bdev JSON config and use the SPDK fio plugin.
            bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
            self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
            spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
        else:
            # Kernel mode: run fio against the connected /dev/nvme* devices.
            ioengine = "libaio"
            spdk_conf = ""
            out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'",
                                 "|", "awk", "'{print $1}'"])
            subsystems = [x for x in out.split("\n") if "nvme" in x]

        if self.cpus_allowed is not None:
            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    # NOTE(review): len(range(a, b)) == b - a, which excludes
                    # the range's upper endpoint (e.g. "0-3" counts 3 CPUs,
                    # not 4) -- looks like an off-by-one; confirm.
                    cpus_num += len(range(a, b))
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            self.num_cores = len(subsystems)
            threads = range(0, len(subsystems))

        if "spdk" in self.mode:
            filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs)
        else:
            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs)

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time)
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw,
rwmixread) 847 if hasattr(self, "num_cores"): 848 fio_config_filename += "_%sCPU" % self.num_cores 849 fio_config_filename += ".fio" 850 851 self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir]) 852 self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)]) 853 self.log_print("Created FIO Config:") 854 self.log_print(fio_config) 855 856 return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename) 857 858 def set_cpu_frequency(self): 859 if self.cpu_frequency is not None: 860 try: 861 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True) 862 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True) 863 self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"])) 864 except Exception: 865 self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!") 866 sys.exit() 867 else: 868 self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.") 869 870 def run_fio(self, fio_config_file, run_num=None): 871 job_name, _ = os.path.splitext(fio_config_file) 872 self.log_print("Starting FIO run for job: %s" % job_name) 873 self.log_print("Using FIO: %s" % self.fio_bin) 874 875 if run_num: 876 for i in range(1, run_num + 1): 877 output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json" 878 output = self.exec_cmd(["sudo", self.fio_bin, 879 fio_config_file, "--output-format=json", 880 "--output=%s" % output_filename], True) 881 self.log_print(output) 882 else: 883 output_filename = job_name + "_" + self.name + ".json" 884 output = self.exec_cmd(["sudo", self.fio_bin, 885 fio_config_file, "--output-format=json", 886 "--output" % output_filename], True) 887 self.log_print(output) 888 self.log_print("FIO run finished. 
Results in: %s" % output_filename) 889 890 def sys_config(self): 891 self.log_print("====Kernel release:====") 892 self.log_print(self.exec_cmd(["uname", "-r"])) 893 self.log_print("====Kernel command line:====") 894 cmdline = self.exec_cmd(["cat", "/proc/cmdline"]) 895 self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines()))) 896 self.log_print("====sysctl conf:====") 897 sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"]) 898 self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines()))) 899 self.log_print("====Cpu power info:====") 900 self.log_print(self.exec_cmd(["cpupower", "frequency-info"])) 901 902 903class KernelTarget(Target): 904 def __init__(self, name, general_config, target_config): 905 super(KernelTarget, self).__init__(name, general_config, target_config) 906 # Defaults 907 self.nvmet_bin = "nvmetcli" 908 909 if "nvmet_bin" in target_config: 910 self.nvmet_bin = target_config["nvmet_bin"] 911 912 def __del__(self): 913 nvmet_command(self.nvmet_bin, "clear") 914 915 def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list): 916 917 nvmet_cfg = { 918 "ports": [], 919 "hosts": [], 920 "subsystems": [], 921 } 922 923 # Split disks between NIC IP's 924 disks_per_ip = int(len(nvme_list) / len(address_list)) 925 disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))] 926 927 subsys_no = 1 928 port_no = 0 929 for ip, chunk in zip(address_list, disk_chunks): 930 for disk in chunk: 931 nqn = "nqn.2018-09.io.spdk:cnode%s" % subsys_no 932 nvmet_cfg["subsystems"].append({ 933 "allowed_hosts": [], 934 "attr": { 935 "allow_any_host": "1", 936 "serial": "SPDK00%s" % subsys_no, 937 "version": "1.3" 938 }, 939 "namespaces": [ 940 { 941 "device": { 942 "path": disk, 943 "uuid": "%s" % uuid.uuid4() 944 }, 945 "enable": 1, 946 "nsid": subsys_no 947 } 948 ], 949 "nqn": nqn 950 }) 951 952 nvmet_cfg["ports"].append({ 953 "addr": { 954 "adrfam": "ipv4", 955 "traddr": ip, 956 
"trsvcid": "%s" % (4420 + port_no), 957 "trtype": "%s" % self.transport 958 }, 959 "portid": subsys_no, 960 "referrals": [], 961 "subsystems": [nqn] 962 }) 963 subsys_no += 1 964 port_no += 1 965 self.subsystem_info_list.append([port_no, nqn, ip]) 966 967 with open("kernel.conf", "w") as fh: 968 fh.write(json.dumps(nvmet_cfg, indent=2)) 969 pass 970 971 def tgt_start(self): 972 self.log_print("Configuring kernel NVMeOF Target") 973 974 if self.null_block: 975 print("Configuring with null block device.") 976 null_blk_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)] 977 self.kernel_tgt_gen_subsystem_conf(null_blk_list, self.nic_ips) 978 self.subsys_no = len(null_blk_list) 979 else: 980 print("Configuring with NVMe drives.") 981 nvme_list = get_nvme_devices() 982 self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips) 983 self.subsys_no = len(nvme_list) 984 985 nvmet_command(self.nvmet_bin, "clear") 986 nvmet_command(self.nvmet_bin, "restore kernel.conf") 987 988 if self.enable_adq: 989 self.adq_configure_tc() 990 991 self.log_print("Done configuring kernel NVMeOF Target") 992 993 994class SPDKTarget(Target): 995 def __init__(self, name, general_config, target_config): 996 super(SPDKTarget, self).__init__(name, general_config, target_config) 997 998 # Required fields 999 self.core_mask = target_config["core_mask"] 1000 self.num_cores = self.get_num_cores(self.core_mask) 1001 1002 # Defaults 1003 self.dif_insert_strip = False 1004 self.null_block_dif_type = 0 1005 self.num_shared_buffers = 4096 1006 1007 if "num_shared_buffers" in target_config: 1008 self.num_shared_buffers = target_config["num_shared_buffers"] 1009 if "null_block_dif_type" in target_config: 1010 self.null_block_dif_type = target_config["null_block_dif_type"] 1011 if "dif_insert_strip" in target_config: 1012 self.dif_insert_strip = target_config["dif_insert_strip"] 1013 1014 def get_num_cores(self, core_mask): 1015 if "0x" in core_mask: 1016 return bin(int(core_mask, 
16)).count("1") 1017 else: 1018 num_cores = 0 1019 core_mask = core_mask.replace("[", "") 1020 core_mask = core_mask.replace("]", "") 1021 for i in core_mask.split(","): 1022 if "-" in i: 1023 x, y = i.split("-") 1024 num_cores += len(range(int(x), int(y))) + 1 1025 else: 1026 num_cores += 1 1027 return num_cores 1028 1029 def spdk_tgt_configure(self): 1030 self.log_print("Configuring SPDK NVMeOF target via RPC") 1031 numa_list = get_used_numa_nodes() 1032 1033 # Create RDMA transport layer 1034 rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, 1035 num_shared_buffers=self.num_shared_buffers, 1036 dif_insert_or_strip=self.dif_insert_strip, 1037 sock_priority=self.adq_priority) 1038 self.log_print("SPDK NVMeOF transport layer:") 1039 rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client)) 1040 1041 if self.null_block: 1042 self.spdk_tgt_add_nullblock(self.null_block) 1043 self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block) 1044 else: 1045 self.spdk_tgt_add_nvme_conf() 1046 self.spdk_tgt_add_subsystem_conf(self.nic_ips) 1047 1048 if self.enable_adq: 1049 self.adq_configure_tc() 1050 self.log_print("Done configuring SPDK NVMeOF Target") 1051 1052 def spdk_tgt_add_nullblock(self, null_block_count): 1053 md_size = 0 1054 block_size = 4096 1055 if self.null_block_dif_type != 0: 1056 md_size = 128 1057 1058 self.log_print("Adding null block bdevices to config via RPC") 1059 for i in range(null_block_count): 1060 self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type) 1061 rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i), 1062 dif_type=self.null_block_dif_type, md_size=md_size) 1063 self.log_print("SPDK Bdevs configuration:") 1064 rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1065 1066 def spdk_tgt_add_nvme_conf(self, req_num_disks=None): 1067 self.log_print("Adding NVMe bdevs to config via RPC") 1068 1069 bdfs = get_nvme_devices_bdf() 1070 bdfs = 
[b.replace(":", ".") for b in bdfs] 1071 1072 if req_num_disks: 1073 if req_num_disks > len(bdfs): 1074 self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs)) 1075 sys.exit(1) 1076 else: 1077 bdfs = bdfs[0:req_num_disks] 1078 1079 for i, bdf in enumerate(bdfs): 1080 rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 1081 1082 self.log_print("SPDK Bdevs configuration:") 1083 rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1084 1085 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 1086 self.log_print("Adding subsystems to config") 1087 port = "4420" 1088 if not req_num_disks: 1089 req_num_disks = get_nvme_devices_count() 1090 1091 # Distribute bdevs between provided NICs 1092 num_disks = range(0, req_num_disks) 1093 if len(num_disks) == 1: 1094 disks_per_ip = 1 1095 else: 1096 disks_per_ip = int(len(num_disks) / len(ips)) 1097 disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))] 1098 1099 # Create subsystems, add bdevs to namespaces, add listeners 1100 for ip, chunk in zip(ips, disk_chunks): 1101 for c in chunk: 1102 nqn = "nqn.2018-09.io.spdk:cnode%s" % c 1103 serial = "SPDK00%s" % c 1104 bdev_name = "Nvme%sn1" % c 1105 rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial, 1106 allow_any_host=True, max_namespaces=8) 1107 rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name) 1108 1109 rpc.nvmf.nvmf_subsystem_add_listener(self.client, nqn, 1110 trtype=self.transport, 1111 traddr=ip, 1112 trsvcid=port, 1113 adrfam="ipv4") 1114 1115 self.subsystem_info_list.append([port, nqn, ip]) 1116 self.log_print("SPDK NVMeOF subsystem configuration:") 1117 rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client)) 1118 1119 def tgt_start(self): 1120 if self.null_block: 1121 self.subsys_no = 1 1122 else: 1123 self.subsys_no = get_nvme_devices_count() 1124 self.log_print("Starting SPDK NVMeOF Target process") 
1125 nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt") 1126 proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask]) 1127 self.pid = os.path.join(self.spdk_dir, "nvmf.pid") 1128 1129 with open(self.pid, "w") as fh: 1130 fh.write(str(proc.pid)) 1131 self.nvmf_proc = proc 1132 self.log_print("SPDK NVMeOF Target PID=%s" % self.pid) 1133 self.log_print("Waiting for spdk to initilize...") 1134 while True: 1135 if os.path.exists("/var/tmp/spdk.sock"): 1136 break 1137 time.sleep(1) 1138 self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock") 1139 1140 if self.enable_zcopy: 1141 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", 1142 enable_zerocopy_send=True) 1143 self.log_print("Target socket options:") 1144 rpc.client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix")) 1145 1146 if self.enable_adq: 1147 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1) 1148 rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None, 1149 nvme_adminq_poll_period_us=100000, retry_count=4) 1150 rpc.nvmf.nvmf_set_config(self.client, acceptor_poll_rate=10000) 1151 1152 rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name) 1153 1154 rpc.framework_start_init(self.client) 1155 self.spdk_tgt_configure() 1156 1157 def __del__(self): 1158 if hasattr(self, "nvmf_proc"): 1159 try: 1160 self.nvmf_proc.terminate() 1161 self.nvmf_proc.wait() 1162 except Exception as e: 1163 self.log_print(e) 1164 self.nvmf_proc.kill() 1165 self.nvmf_proc.communicate() 1166 1167 1168class KernelInitiator(Initiator): 1169 def __init__(self, name, general_config, initiator_config): 1170 super(KernelInitiator, self).__init__(name, general_config, initiator_config) 1171 1172 # Defaults 1173 self.extra_params = "" 1174 1175 if "extra_params" in initiator_config: 1176 self.extra_params = initiator_config["extra_params"] 1177 1178 def __del__(self): 1179 
self.ssh_connection.close() 1180 1181 def kernel_init_connect(self, address_list, subsys_no): 1182 self.log_print("Below connection attempts may result in error messages, this is expected!") 1183 for subsystem in self.subsystem_info_list: 1184 self.log_print("Trying to connect %s %s %s" % subsystem) 1185 self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport, 1186 "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params]) 1187 time.sleep(2) 1188 1189 def kernel_init_disconnect(self, address_list, subsys_no): 1190 for subsystem in self.subsystem_info_list: 1191 self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]]) 1192 time.sleep(1) 1193 1194 def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1): 1195 out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'", 1196 "|", "awk", "'{print $1}'"]) 1197 nvme_list = [x for x in out.split("\n") if "nvme" in x] 1198 1199 filename_section = "" 1200 nvme_per_split = int(len(nvme_list) / len(threads)) 1201 remainder = len(nvme_list) % len(threads) 1202 iterator = iter(nvme_list) 1203 result = [] 1204 for i in range(len(threads)): 1205 result.append([]) 1206 for j in range(nvme_per_split): 1207 result[i].append(next(iterator)) 1208 if remainder: 1209 result[i].append(next(iterator)) 1210 remainder -= 1 1211 for i, r in enumerate(result): 1212 header = "[filename%s]" % i 1213 disks = "\n".join(["filename=%s" % x for x in r]) 1214 job_section_qd = round((io_depth * len(r)) / num_jobs) 1215 if job_section_qd == 0: 1216 job_section_qd = 1 1217 iodepth = "iodepth=%s" % job_section_qd 1218 filename_section = "\n".join([filename_section, header, disks, iodepth]) 1219 1220 return filename_section 1221 1222 1223class SPDKInitiator(Initiator): 1224 def __init__(self, name, general_config, initiator_config): 1225 super(SPDKInitiator, self).__init__(name, general_config, initiator_config) 1226 1227 if "skip_spdk_install" not in general_config or 
general_config["skip_spdk_install"] is False: 1228 self.install_spdk(self.spdk_dir) 1229 1230 # Required fields 1231 self.num_cores = initiator_config["num_cores"] 1232 1233 def install_spdk(self, local_spdk_zip): 1234 self.log_print("Using fio binary %s" % self.fio_bin) 1235 self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"]) 1236 self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"]) 1237 self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)]) 1238 self.exec_cmd(["make", "-C", self.spdk_dir, "clean"]) 1239 self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"]) 1240 1241 self.log_print("SPDK built") 1242 self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir]) 1243 1244 def gen_spdk_bdev_conf(self, remote_subsystem_list): 1245 bdev_cfg_section = { 1246 "subsystems": [ 1247 { 1248 "subsystem": "bdev", 1249 "config": [] 1250 } 1251 ] 1252 } 1253 1254 for i, subsys in enumerate(remote_subsystem_list): 1255 sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys) 1256 nvme_ctrl = { 1257 "method": "bdev_nvme_attach_controller", 1258 "params": { 1259 "name": "Nvme{}".format(i), 1260 "trtype": self.transport, 1261 "traddr": sub_addr, 1262 "trsvcid": sub_port, 1263 "subnqn": sub_nqn, 1264 "adrfam": "IPv4" 1265 } 1266 } 1267 1268 if self.enable_adq: 1269 nvme_ctrl["params"].update({"priority": "1"}) 1270 1271 bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl) 1272 1273 return json.dumps(bdev_cfg_section, indent=2) 1274 1275 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1): 1276 filename_section = "" 1277 if len(threads) >= len(subsystems): 1278 threads = range(0, len(subsystems)) 1279 filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))] 1280 nvme_per_split = int(len(subsystems) / len(threads)) 1281 remainder = len(subsystems) % len(threads) 1282 iterator = iter(filenames) 1283 result = [] 1284 for i in 
range(len(threads)): 1285 result.append([]) 1286 for j in range(nvme_per_split): 1287 result[i].append(next(iterator)) 1288 if remainder: 1289 result[i].append(next(iterator)) 1290 remainder -= 1 1291 for i, r in enumerate(result): 1292 header = "[filename%s]" % i 1293 disks = "\n".join(["filename=%s" % x for x in r]) 1294 job_section_qd = round((io_depth * len(r)) / num_jobs) 1295 if job_section_qd == 0: 1296 job_section_qd = 1 1297 iodepth = "iodepth=%s" % job_section_qd 1298 filename_section = "\n".join([filename_section, header, disks, iodepth]) 1299 1300 return filename_section 1301 1302 1303if __name__ == "__main__": 1304 spdk_zip_path = "/tmp/spdk.zip" 1305 target_results_dir = "/tmp/results" 1306 1307 if (len(sys.argv) > 1): 1308 config_file_path = sys.argv[1] 1309 else: 1310 script_full_dir = os.path.dirname(os.path.realpath(__file__)) 1311 config_file_path = os.path.join(script_full_dir, "config.json") 1312 1313 print("Using config file: %s" % config_file_path) 1314 with open(config_file_path, "r") as config: 1315 data = json.load(config) 1316 1317 initiators = [] 1318 fio_cases = [] 1319 1320 general_config = data["general"] 1321 target_config = data["target"] 1322 initiator_configs = [data[x] for x in data.keys() if "initiator" in x] 1323 1324 for k, v in data.items(): 1325 if "target" in k: 1326 if data[k]["mode"] == "spdk": 1327 target_obj = SPDKTarget(k, data["general"], v) 1328 elif data[k]["mode"] == "kernel": 1329 target_obj = KernelTarget(k, data["general"], v) 1330 pass 1331 elif "initiator" in k: 1332 if data[k]["mode"] == "spdk": 1333 init_obj = SPDKInitiator(k, data["general"], v) 1334 elif data[k]["mode"] == "kernel": 1335 init_obj = KernelInitiator(k, data["general"], v) 1336 initiators.append(init_obj) 1337 elif "fio" in k: 1338 fio_workloads = itertools.product(data[k]["bs"], 1339 data[k]["qd"], 1340 data[k]["rw"]) 1341 1342 fio_run_time = data[k]["run_time"] 1343 fio_ramp_time = data[k]["ramp_time"] 1344 fio_rw_mix_read = 
data[k]["rwmixread"] 1345 fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None 1346 fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None 1347 else: 1348 continue 1349 1350 target_obj.tgt_start() 1351 1352 try: 1353 os.mkdir(target_results_dir) 1354 except FileExistsError: 1355 pass 1356 1357 for i in initiators: 1358 i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no) 1359 if i.enable_adq: 1360 i.adq_configure_tc() 1361 1362 # Poor mans threading 1363 # Run FIO tests 1364 for block_size, io_depth, rw in fio_workloads: 1365 threads = [] 1366 configs = [] 1367 for i in initiators: 1368 if i.mode == "kernel": 1369 i.kernel_init_connect(i.target_nic_ips, target_obj.subsys_no) 1370 1371 cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no, 1372 fio_num_jobs, fio_ramp_time, fio_run_time) 1373 configs.append(cfg) 1374 1375 for i, cfg in zip(initiators, configs): 1376 t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num)) 1377 threads.append(t) 1378 if target_obj.enable_sar: 1379 sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"]) 1380 sar_file_name = ".".join([sar_file_name, "txt"]) 1381 t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name)) 1382 threads.append(t) 1383 1384 if target_obj.enable_pcm: 1385 pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]] 1386 1387 pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(target_results_dir, pcm_fnames[0],)) 1388 pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(target_results_dir, pcm_fnames[1],)) 1389 pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(target_results_dir, pcm_fnames[2],)) 1390 1391 threads.append(pcm_cpu_t) 1392 threads.append(pcm_mem_t) 1393 threads.append(pcm_pow_t) 1394 1395 if target_obj.enable_bandwidth: 1396 bandwidth_file_name = 
"_".join(["bandwidth", str(block_size), str(rw), str(io_depth)]) 1397 bandwidth_file_name = ".".join([bandwidth_file_name, "csv"]) 1398 t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(target_results_dir, bandwidth_file_name,)) 1399 threads.append(t) 1400 1401 if target_obj.enable_dpdk_memory: 1402 t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(target_results_dir)) 1403 threads.append(t) 1404 1405 for t in threads: 1406 t.start() 1407 for t in threads: 1408 t.join() 1409 1410 for i in initiators: 1411 if i.mode == "kernel": 1412 i.kernel_init_disconnect(i.target_nic_ips, target_obj.subsys_no) 1413 i.copy_result_files(target_results_dir) 1414 1415 target_obj.restore_governor() 1416 target_obj.restore_tuned() 1417 target_obj.restore_services() 1418 target_obj.restore_sysctl() 1419 for i in initiators: 1420 i.restore_governor() 1421 i.restore_tuned() 1422 i.restore_services() 1423 i.restore_sysctl() 1424 target_obj.parse_results(target_results_dir) 1425