#!/usr/bin/env python3
"""SPDK NVMe-oF performance test helpers.

Defines Server (common system/ADQ configuration with save/restore logic)
and Target (the machine running the NVMe-oF target, which also drives the
test and aggregates results).
"""

import os
import re
import sys
import argparse
import json
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid
from collections import OrderedDict

import paramiko
import pandas as pd

import rpc
import rpc.client
from common import *


class Server:
    """Base class for any machine taking part in the test.

    Performs common system tuning (services, sysctl, tuned profile, CPU
    governor, IRQ affinity) and optional Intel ADQ configuration, saving
    the previous settings so they can be restored once testing is done.
    """

    def __init__(self, name, general_config, server_config):
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]

        # Directory containing Mellanox OFED helper scripts (set_irq_affinity.sh).
        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        self.local_nic_info = []
        self._nics_json_obj = {}
        # Original system state, captured before tuning so it can be restored.
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        self.enable_adq = False
        self.adq_priority = None
        if "adq_enable" in server_config and server_config["adq_enable"]:
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1

        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]

        # The name ends up embedded in result file names, so keep it alphanumeric.
        if not re.match("^[A-Za-z0-9]*$", name):
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        """Print a message prefixed with this server's name, unbuffered."""
        print("[%s] %s" % (self.name, msg), flush=True)

    def get_uncommented_lines(self, lines):
        """Return only non-empty lines that do not start with '#'."""
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        """Return the interface name owning the given IP address, or None.

        The "ip -j address show" output is fetched lazily and cached in
        self._nics_json_obj, keeping only interfaces with addresses assigned.
        """
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                # Exact comparison: the previous substring test would wrongly
                # match e.g. "10.0.0.1" against an interface with "10.0.0.11".
                if ip == addr["local"]:
                    return nic["ifname"]

    def set_local_nic_info_helper(self):
        # Overridden by subclasses to return "lshw -json" output.
        pass

    def set_local_nic_info(self, pci_info):
        """Extract network controller entries from lshw JSON output."""
        def extract_network_elements(json_obj):
            # Recursively walk the lshw tree collecting "network"-class nodes.
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        # Stub; subclasses execute the command locally (Target) or over SSH
        # (Initiator) and return its stdout as a string.
        return ""

    def configure_system(self):
        """Apply all system-level tuning steps."""
        self.configure_services()
        self.configure_sysctl()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity()

    def configure_adq(self):
        """Set up Intel ADQ (modules + NIC flags); SPDK mode only."""
        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        """Load kernel modules required by ADQ traffic classes and filters."""
        self.log_print("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log_print("%s loaded!" % module)
            except CalledProcessError as e:
                # Best effort: report and continue with remaining modules.
                self.log_print("ERROR: failed to load module %s" % module)
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        """Configure ADQ traffic classes and flower filters on each test NIC."""
        self.log_print("Configuring ADQ Traffic classes and filters...")

        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        # The target filters on destination port; initiators on source port.
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        ports = set([p[0] for p in self.subsystem_info_list])
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log_print(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log_print(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            for port in ports:
                tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                                 "protocol", "ip", "ingress", "prio", "1", "flower",
                                 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                                 "skip_sw", "hw_tc", "1"]
                self.log_print(" ".join(tc_filter_cmd))
                self.exec_cmd(tc_filter_cmd)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log_print(" ".join(xps_cmd))
            self.exec_cmd(xps_cmd)

    def adq_configure_nic(self):
        """Reload the ice driver and enable ADQ-related NIC port flags."""
        self.log_print("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        try:
            self.exec_cmd(["sudo", "rmmod", "ice"])
            self.exec_cmd(["sudo", "modprobe", "ice"])
        except CalledProcessError as e:
            self.log_print("ERROR: failed to reload ice module!")
            self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log_print(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization
            except CalledProcessError as e:
                self.log_print("ERROR: failed to configure NIC port using ethtool!")
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
                self.log_print("Please update your NIC driver and firmware versions and try again.")
                self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
                self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        """Stop services that interfere with the test; remember their state."""
        self.log_print("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            # Parse "systemctl show" key=value output as an INI section.
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log_print("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log_print("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        """Apply network/memory sysctl tuning; remember previous values."""
        self.log_print("Tuning sysctl settings...")

        busy_read = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1

        sysctl_opts = {
            "net.core.busy_poll": 0,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
        }

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        """Activate the configured tuned-adm profile; remember the old one."""
        if not self.tuned_profile:
            self.log_print("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        """Switch all CPUs to the performance governor; remember the old one."""
        self.log_print("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def configure_irq_affinity(self):
        """Pin NIC IRQs using the vendor set_irq_affinity.sh script."""
        self.log_print("Setting NIC irq affinity for NICs...")

        irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh")
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            irq_cmd = ["sudo", irq_script_path, nic]
            self.log_print(" ".join(irq_cmd))
            # Script may depend on helpers in its own directory; run from there.
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        """Restore services to the state recorded in configure_services()."""
        self.log_print("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        """Restore sysctl options recorded in configure_sysctl()."""
        self.log_print("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        """Restore the tuned-adm profile recorded in configure_tuned()."""
        self.log_print("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log_print("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        """Restore the CPU governor recorded in configure_cpu_governor()."""
        self.log_print("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log_print("Reverted CPU governor to %s." % self.governor_restore)


class Target(Server):
    """The machine running the NVMe-oF target.

    Drives the test: zips SPDK sources for initiators, configures the local
    system/ADQ, collects measurements and aggregates fio results.
    """

    def __init__(self, name, general_config, target_config):
        super(Target, self).__init__(name, general_config, target_config)

        # Defaults for optional measurement settings.
        self.enable_sar = False
        self.sar_delay = 0
        self.sar_interval = 0
        self.sar_count = 0
        self.enable_pcm = False
        self.pcm_dir = ""
        self.pcm_delay = 0
        self.pcm_interval = 0
        self.pcm_count = 0
        self.enable_bandwidth = 0
        self.bandwidth_count = 0
        self.enable_dpdk_memory = False
        self.dpdk_wait_time = 0
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []

        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "sar_settings" in target_config:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"]
        if "pcm_settings" in target_config:
            self.enable_pcm = True
            self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"]
        if "enable_bandwidth" in target_config:
            self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"]
        if "enable_dpdk_memory" in target_config:
            self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"]
        if "scheduler_settings" in target_config:
            self.scheduler_name = target_config["scheduler_settings"]
        if "zcopy_settings" in target_config:
            self.enable_zcopy = target_config["zcopy_settings"]

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        """Return local hardware info as parsed "lshw -json" output."""
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Run a command locally and return its decoded stdout.

        Raises CalledProcessError on a non-zero exit status. If change_dir
        is given, the command runs from that directory and the previous
        working directory is restored afterwards (even on failure).
        """
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log_print("Changing directory to %s" % change_dir)

        try:
            out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")
        finally:
            # Restore cwd even if the command failed, so one failing command
            # does not leave the whole process in an unexpected directory.
            if change_dir:
                os.chdir(old_cwd)
                self.log_print("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        """Pack the SPDK source tree into a zip archive for the initiators."""
        self.log_print("Zipping SPDK source directory")
        # Context manager guarantees the archive is finalized/closed on error.
        with zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) as fh:
            for root, directories, files in os.walk(spdk_dir, followlinks=True):
                for file in files:
                    # NOTE(review): paths stored relative to the current working
                    # directory, not spdk_dir -- assumes cwd is inside spdk_dir.
                    fh.write(os.path.relpath(os.path.join(root, file)))
        self.log_print("Done zipping")
403 def read_json_stats(self, file): 404 with open(file, "r") as json_data: 405 data = json.load(json_data) 406 job_pos = 0 # job_post = 0 because using aggregated results 407 408 # Check if latency is in nano or microseconds to choose correct dict key 409 def get_lat_unit(key_prefix, dict_section): 410 # key prefix - lat, clat or slat. 411 # dict section - portion of json containing latency bucket in question 412 # Return dict key to access the bucket and unit as string 413 for k, _ in dict_section.items(): 414 if k.startswith(key_prefix): 415 return k, k.split("_")[1] 416 417 def get_clat_percentiles(clat_dict_leaf): 418 if "percentile" in clat_dict_leaf: 419 p99_lat = float(clat_dict_leaf["percentile"]["99.000000"]) 420 p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"]) 421 p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"]) 422 p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"]) 423 424 return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat] 425 else: 426 # Latest fio versions do not provide "percentile" results if no 427 # measurements were done, so just return zeroes 428 return [0, 0, 0, 0] 429 430 read_iops = float(data["jobs"][job_pos]["read"]["iops"]) 431 read_bw = float(data["jobs"][job_pos]["read"]["bw"]) 432 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"]) 433 read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"]) 434 read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"]) 435 read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"]) 436 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"]) 437 read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles( 438 data["jobs"][job_pos]["read"][clat_key]) 439 440 if "ns" in lat_unit: 441 read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]] 442 if "ns" in clat_unit: 443 read_p99_lat = read_p99_lat / 1000 444 read_p99_9_lat = 
read_p99_9_lat / 1000 445 read_p99_99_lat = read_p99_99_lat / 1000 446 read_p99_999_lat = read_p99_999_lat / 1000 447 448 write_iops = float(data["jobs"][job_pos]["write"]["iops"]) 449 write_bw = float(data["jobs"][job_pos]["write"]["bw"]) 450 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"]) 451 write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"]) 452 write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"]) 453 write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"]) 454 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"]) 455 write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles( 456 data["jobs"][job_pos]["write"][clat_key]) 457 458 if "ns" in lat_unit: 459 write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]] 460 if "ns" in clat_unit: 461 write_p99_lat = write_p99_lat / 1000 462 write_p99_9_lat = write_p99_9_lat / 1000 463 write_p99_99_lat = write_p99_99_lat / 1000 464 write_p99_999_lat = write_p99_999_lat / 1000 465 466 return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat, 467 read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat, 468 write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat, 469 write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat] 470 471 def parse_results(self, results_dir, csv_file): 472 files = os.listdir(results_dir) 473 fio_files = filter(lambda x: ".fio" in x, files) 474 json_files = [x for x in files if ".json" in x] 475 476 headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us", 477 "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us", 478 "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us", 479 "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"] 480 481 aggr_headers = 
["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us", 482 "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"] 483 484 header_line = ",".join(["Name", *headers]) 485 aggr_header_line = ",".join(["Name", *aggr_headers]) 486 487 # Create empty results file 488 with open(os.path.join(results_dir, csv_file), "w") as fh: 489 fh.write(aggr_header_line + "\n") 490 rows = set() 491 492 for fio_config in fio_files: 493 self.log_print("Getting FIO stats for %s" % fio_config) 494 job_name, _ = os.path.splitext(fio_config) 495 496 # Look in the filename for rwmixread value. Function arguments do 497 # not have that information. 498 # TODO: Improve this function by directly using workload params instead 499 # of regexing through filenames. 500 if "read" in job_name: 501 rw_mixread = 1 502 elif "write" in job_name: 503 rw_mixread = 0 504 else: 505 rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100 506 507 # If "_CPU" exists in name - ignore it 508 # Initiators for the same job could have diffrent num_cores parameter 509 job_name = re.sub(r"_\d+CPU", "", job_name) 510 job_result_files = [x for x in json_files if job_name in x] 511 self.log_print("Matching result files for current fio config:") 512 for j in job_result_files: 513 self.log_print("\t %s" % j) 514 515 # There may have been more than 1 initiator used in test, need to check that 516 # Result files are created so that string after last "_" separator is server name 517 inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files]) 518 inits_avg_results = [] 519 for i in inits_names: 520 self.log_print("\tGetting stats for initiator %s" % i) 521 # There may have been more than 1 test run for this job, calculate average results for initiator 522 i_results = [x for x in job_result_files if i in x] 523 i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv")) 524 525 separate_stats = [] 526 for r in i_results: 527 stats = 
self.read_json_stats(os.path.join(results_dir, r)) 528 separate_stats.append(stats) 529 self.log_print(stats) 530 531 init_results = [sum(x) for x in zip(*separate_stats)] 532 init_results = [x / len(separate_stats) for x in init_results] 533 inits_avg_results.append(init_results) 534 535 self.log_print("\tAverage results for initiator %s" % i) 536 self.log_print(init_results) 537 with open(os.path.join(results_dir, i_results_filename), "w") as fh: 538 fh.write(header_line + "\n") 539 fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n") 540 541 # Sum results of all initiators running this FIO job. 542 # Latency results are an average of latencies from accros all initiators. 543 inits_avg_results = [sum(x) for x in zip(*inits_avg_results)] 544 inits_avg_results = OrderedDict(zip(headers, inits_avg_results)) 545 for key in inits_avg_results: 546 if "lat" in key: 547 inits_avg_results[key] /= len(inits_names) 548 549 # Aggregate separate read/write values into common labels 550 # Take rw_mixread into consideration for mixed read/write workloads. 
551 aggregate_results = OrderedDict() 552 for h in aggr_headers: 553 read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key] 554 if "lat" in h: 555 _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat 556 else: 557 _ = read_stat + write_stat 558 aggregate_results[h] = "{0:.3f}".format(_) 559 560 rows.add(",".join([job_name, *aggregate_results.values()])) 561 562 # Save results to file 563 for row in rows: 564 with open(os.path.join(results_dir, csv_file), "a") as fh: 565 fh.write(row + "\n") 566 self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file)) 567 568 def measure_sar(self, results_dir, sar_file_name): 569 self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay) 570 time.sleep(self.sar_delay) 571 out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count]) 572 with open(os.path.join(results_dir, sar_file_name), "w") as fh: 573 for line in out.split("\n"): 574 if "Average" in line and "CPU" in line: 575 self.log_print("Summary CPU utilization from SAR:") 576 self.log_print(line) 577 if "Average" in line and "all" in line: 578 self.log_print(line) 579 fh.write(out) 580 581 def measure_pcm_memory(self, results_dir, pcm_file_name): 582 time.sleep(self.pcm_delay) 583 cmd = ["%s/pcm-memory.x" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)] 584 pcm_memory = subprocess.Popen(cmd) 585 time.sleep(self.pcm_count) 586 pcm_memory.terminate() 587 588 def measure_pcm(self, results_dir, pcm_file_name): 589 time.sleep(self.pcm_delay) 590 cmd = ["%s/pcm.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, "-csv=%s/%s" % (results_dir, pcm_file_name)] 591 subprocess.run(cmd) 592 df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1]) 593 df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x)) 594 skt = df.loc[:, 
df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})] 595 skt_pcm_file_name = "_".join(["skt", pcm_file_name]) 596 skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False) 597 598 def measure_pcm_power(self, results_dir, pcm_power_file_name): 599 time.sleep(self.pcm_delay) 600 out = self.exec_cmd(["%s/pcm-power.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count]) 601 with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh: 602 fh.write(out) 603 604 def measure_network_bandwidth(self, results_dir, bandwidth_file_name): 605 self.log_print("INFO: starting network bandwidth measure") 606 self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name), 607 "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)]) 608 609 def measure_dpdk_memory(self, results_dir): 610 self.log_print("INFO: waiting to generate DPDK memory usage") 611 time.sleep(self.dpdk_wait_time) 612 self.log_print("INFO: generating DPDK memory usage") 613 rpc.env.env_dpdk_get_mem_stats 614 os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir)) 615 616 def sys_config(self): 617 self.log_print("====Kernel release:====") 618 self.log_print(os.uname().release) 619 self.log_print("====Kernel command line:====") 620 with open('/proc/cmdline') as f: 621 cmdline = f.readlines() 622 self.log_print('\n'.join(self.get_uncommented_lines(cmdline))) 623 self.log_print("====sysctl conf:====") 624 with open('/etc/sysctl.conf') as f: 625 sysctl = f.readlines() 626 self.log_print('\n'.join(self.get_uncommented_lines(sysctl))) 627 self.log_print("====Cpu power info:====") 628 self.log_print(self.exec_cmd(["cpupower", "frequency-info"])) 629 self.log_print("====zcopy settings:====") 630 self.log_print("zcopy enabled: %s" % (self.enable_zcopy)) 631 self.log_print("====Scheduler settings:====") 632 self.log_print("SPDK scheduler: %s" % (self.scheduler_name)) 633 634 635class Initiator(Server): 636 def __init__(self, name, 
general_config, initiator_config): 637 super(Initiator, self).__init__(name, general_config, initiator_config) 638 639 # Required fields 640 self.ip = initiator_config["ip"] 641 self.target_nic_ips = initiator_config["target_nic_ips"] 642 643 # Defaults 644 self.cpus_allowed = None 645 self.cpus_allowed_policy = "shared" 646 self.spdk_dir = "/tmp/spdk" 647 self.fio_bin = "/usr/src/fio/fio" 648 self.nvmecli_bin = "nvme" 649 self.cpu_frequency = None 650 self.subsystem_info_list = [] 651 652 if "spdk_dir" in initiator_config: 653 self.spdk_dir = initiator_config["spdk_dir"] 654 if "fio_bin" in initiator_config: 655 self.fio_bin = initiator_config["fio_bin"] 656 if "nvmecli_bin" in initiator_config: 657 self.nvmecli_bin = initiator_config["nvmecli_bin"] 658 if "cpus_allowed" in initiator_config: 659 self.cpus_allowed = initiator_config["cpus_allowed"] 660 if "cpus_allowed_policy" in initiator_config: 661 self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"] 662 if "cpu_frequency" in initiator_config: 663 self.cpu_frequency = initiator_config["cpu_frequency"] 664 if os.getenv('SPDK_WORKSPACE'): 665 self.spdk_dir = os.getenv('SPDK_WORKSPACE') 666 667 self.ssh_connection = paramiko.SSHClient() 668 self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 669 self.ssh_connection.connect(self.ip, username=self.username, password=self.password) 670 self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir]) 671 self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir]) 672 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 673 674 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 675 self.copy_spdk("/tmp/spdk.zip") 676 self.set_local_nic_info(self.set_local_nic_info_helper()) 677 self.set_cpu_frequency() 678 self.configure_system() 679 if self.enable_adq: 680 self.configure_adq() 681 self.sys_config() 682 683 def set_local_nic_info_helper(self): 684 return 
json.loads(self.exec_cmd(["lshw", "-json"])) 685 686 def __del__(self): 687 self.ssh_connection.close() 688 689 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 690 if change_dir: 691 cmd = ["cd", change_dir, ";", *cmd] 692 693 # In case one of the command elements contains whitespace and is not 694 # already quoted, # (e.g. when calling sysctl) quote it again to prevent expansion 695 # when sending to remote system. 696 for i, c in enumerate(cmd): 697 if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")): 698 cmd[i] = '"%s"' % c 699 cmd = " ".join(cmd) 700 701 # Redirect stderr to stdout thanks using get_pty option if needed 702 _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect) 703 out = stdout.read().decode(encoding="utf-8") 704 705 # Check the return code 706 rc = stdout.channel.recv_exit_status() 707 if rc: 708 raise CalledProcessError(int(rc), cmd, out) 709 710 return out 711 712 def put_file(self, local, remote_dest): 713 ftp = self.ssh_connection.open_sftp() 714 ftp.put(local, remote_dest) 715 ftp.close() 716 717 def get_file(self, remote, local_dest): 718 ftp = self.ssh_connection.open_sftp() 719 ftp.get(remote, local_dest) 720 ftp.close() 721 722 def copy_spdk(self, local_spdk_zip): 723 self.log_print("Copying SPDK sources to initiator %s" % self.name) 724 self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip") 725 self.log_print("Copied sources zip from target") 726 self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir]) 727 self.log_print("Sources unpacked") 728 729 def copy_result_files(self, dest_dir): 730 self.log_print("Copying results") 731 732 if not os.path.exists(dest_dir): 733 os.mkdir(dest_dir) 734 735 # Get list of result files from initiator and copy them back to target 736 file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n") 737 738 for file in file_list: 739 self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file), 740 
os.path.join(dest_dir, file)) 741 self.log_print("Done copying results") 742 743 def discover_subsystems(self, address_list, subsys_no): 744 num_nvmes = range(0, subsys_no) 745 nvme_discover_output = "" 746 for ip, subsys_no in itertools.product(address_list, num_nvmes): 747 self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no)) 748 nvme_discover_cmd = ["sudo", 749 "%s" % self.nvmecli_bin, 750 "discover", "-t", "%s" % self.transport, 751 "-s", "%s" % (4420 + subsys_no), 752 "-a", "%s" % ip] 753 754 try: 755 stdout = self.exec_cmd(nvme_discover_cmd) 756 if stdout: 757 nvme_discover_output = nvme_discover_output + stdout 758 except CalledProcessError: 759 # Do nothing. In case of discovering remote subsystems of kernel target 760 # we expect "nvme discover" to fail a bunch of times because we basically 761 # scan ports. 762 pass 763 764 subsystems = re.findall(r'trsvcid:\s(\d+)\s+' # get svcid number 765 r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+' # get NQN id 766 r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', # get IP address 767 nvme_discover_output) # from nvme discovery output 768 subsystems = filter(lambda x: x[-1] in address_list, subsystems) 769 subsystems = list(set(subsystems)) 770 subsystems.sort(key=lambda x: x[1]) 771 self.log_print("Found matching subsystems on target side:") 772 for s in subsystems: 773 self.log_print(s) 774 self.subsystem_info_list = subsystems 775 776 def gen_fio_filename_conf(self, *args, **kwargs): 777 # Logic implemented in SPDKInitiator and KernelInitiator classes 778 pass 779 780 def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10, rate_iops=0): 781 fio_conf_template = """ 782[global] 783ioengine={ioengine} 784{spdk_conf} 785thread=1 786group_reporting=1 787direct=1 788percentile_list=50:90:99:99.5:99.9:99.99:99.999 789 790norandommap=1 791rw={rw} 792rwmixread={rwmixread} 793bs={block_size} 794time_based=1 795ramp_time={ramp_time} 796runtime={run_time} 
797rate_iops={rate_iops} 798""" 799 if "spdk" in self.mode: 800 bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list) 801 self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir]) 802 ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir 803 spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir 804 else: 805 ioengine = "libaio" 806 spdk_conf = "" 807 out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'", 808 "|", "awk", "'{print $1}'"]) 809 subsystems = [x for x in out.split("\n") if "nvme" in x] 810 811 if self.cpus_allowed is not None: 812 self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed) 813 cpus_num = 0 814 cpus = self.cpus_allowed.split(",") 815 for cpu in cpus: 816 if "-" in cpu: 817 a, b = cpu.split("-") 818 a = int(a) 819 b = int(b) 820 cpus_num += len(range(a, b)) 821 else: 822 cpus_num += 1 823 self.num_cores = cpus_num 824 threads = range(0, self.num_cores) 825 elif hasattr(self, 'num_cores'): 826 self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores) 827 threads = range(0, int(self.num_cores)) 828 else: 829 self.num_cores = len(subsystems) 830 threads = range(0, len(subsystems)) 831 832 if "spdk" in self.mode: 833 filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs) 834 else: 835 filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs) 836 837 fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf, 838 rw=rw, rwmixread=rwmixread, block_size=block_size, 839 ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops) 840 if num_jobs: 841 fio_config = fio_config + "numjobs=%s \n" % num_jobs 842 if self.cpus_allowed is not None: 843 fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed 844 fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy 845 fio_config = fio_config + filename_section 846 847 
fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread) 848 if hasattr(self, "num_cores"): 849 fio_config_filename += "_%sCPU" % self.num_cores 850 fio_config_filename += ".fio" 851 852 self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir]) 853 self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)]) 854 self.log_print("Created FIO Config:") 855 self.log_print(fio_config) 856 857 return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename) 858 859 def set_cpu_frequency(self): 860 if self.cpu_frequency is not None: 861 try: 862 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True) 863 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True) 864 self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"])) 865 except Exception: 866 self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!") 867 sys.exit() 868 else: 869 self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.") 870 871 def run_fio(self, fio_config_file, run_num=None): 872 job_name, _ = os.path.splitext(fio_config_file) 873 self.log_print("Starting FIO run for job: %s" % job_name) 874 self.log_print("Using FIO: %s" % self.fio_bin) 875 876 if run_num: 877 for i in range(1, run_num + 1): 878 output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json" 879 output = self.exec_cmd(["sudo", self.fio_bin, 880 fio_config_file, "--output-format=json", 881 "--output=%s" % output_filename], True) 882 self.log_print(output) 883 else: 884 output_filename = job_name + "_" + self.name + ".json" 885 output = self.exec_cmd(["sudo", self.fio_bin, 886 fio_config_file, "--output-format=json", 887 "--output" % output_filename], True) 888 self.log_print(output) 889 self.log_print("FIO run finished. 
Results in: %s" % output_filename) 890 891 def sys_config(self): 892 self.log_print("====Kernel release:====") 893 self.log_print(self.exec_cmd(["uname", "-r"])) 894 self.log_print("====Kernel command line:====") 895 cmdline = self.exec_cmd(["cat", "/proc/cmdline"]) 896 self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines()))) 897 self.log_print("====sysctl conf:====") 898 sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"]) 899 self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines()))) 900 self.log_print("====Cpu power info:====") 901 self.log_print(self.exec_cmd(["cpupower", "frequency-info"])) 902 903 904class KernelTarget(Target): 905 def __init__(self, name, general_config, target_config): 906 super(KernelTarget, self).__init__(name, general_config, target_config) 907 # Defaults 908 self.nvmet_bin = "nvmetcli" 909 910 if "nvmet_bin" in target_config: 911 self.nvmet_bin = target_config["nvmet_bin"] 912 913 def __del__(self): 914 nvmet_command(self.nvmet_bin, "clear") 915 916 def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list): 917 918 nvmet_cfg = { 919 "ports": [], 920 "hosts": [], 921 "subsystems": [], 922 } 923 924 # Split disks between NIC IP's 925 disks_per_ip = int(len(nvme_list) / len(address_list)) 926 disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))] 927 928 subsys_no = 1 929 port_no = 0 930 for ip, chunk in zip(address_list, disk_chunks): 931 for disk in chunk: 932 nqn = "nqn.2018-09.io.spdk:cnode%s" % subsys_no 933 nvmet_cfg["subsystems"].append({ 934 "allowed_hosts": [], 935 "attr": { 936 "allow_any_host": "1", 937 "serial": "SPDK00%s" % subsys_no, 938 "version": "1.3" 939 }, 940 "namespaces": [ 941 { 942 "device": { 943 "path": disk, 944 "uuid": "%s" % uuid.uuid4() 945 }, 946 "enable": 1, 947 "nsid": subsys_no 948 } 949 ], 950 "nqn": nqn 951 }) 952 953 nvmet_cfg["ports"].append({ 954 "addr": { 955 "adrfam": "ipv4", 956 "traddr": ip, 957 
"trsvcid": "%s" % (4420 + port_no), 958 "trtype": "%s" % self.transport 959 }, 960 "portid": subsys_no, 961 "referrals": [], 962 "subsystems": [nqn] 963 }) 964 subsys_no += 1 965 port_no += 1 966 self.subsystem_info_list.append([port_no, nqn, ip]) 967 968 with open("kernel.conf", "w") as fh: 969 fh.write(json.dumps(nvmet_cfg, indent=2)) 970 pass 971 972 def tgt_start(self): 973 self.log_print("Configuring kernel NVMeOF Target") 974 975 if self.null_block: 976 print("Configuring with null block device.") 977 null_blk_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)] 978 self.kernel_tgt_gen_subsystem_conf(null_blk_list, self.nic_ips) 979 self.subsys_no = len(null_blk_list) 980 else: 981 print("Configuring with NVMe drives.") 982 nvme_list = get_nvme_devices() 983 self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips) 984 self.subsys_no = len(nvme_list) 985 986 nvmet_command(self.nvmet_bin, "clear") 987 nvmet_command(self.nvmet_bin, "restore kernel.conf") 988 989 if self.enable_adq: 990 self.adq_configure_tc() 991 992 self.log_print("Done configuring kernel NVMeOF Target") 993 994 995class SPDKTarget(Target): 996 def __init__(self, name, general_config, target_config): 997 super(SPDKTarget, self).__init__(name, general_config, target_config) 998 999 # Required fields 1000 self.core_mask = target_config["core_mask"] 1001 self.num_cores = self.get_num_cores(self.core_mask) 1002 1003 # Defaults 1004 self.dif_insert_strip = False 1005 self.null_block_dif_type = 0 1006 self.num_shared_buffers = 4096 1007 1008 if "num_shared_buffers" in target_config: 1009 self.num_shared_buffers = target_config["num_shared_buffers"] 1010 if "null_block_dif_type" in target_config: 1011 self.null_block_dif_type = target_config["null_block_dif_type"] 1012 if "dif_insert_strip" in target_config: 1013 self.dif_insert_strip = target_config["dif_insert_strip"] 1014 1015 def get_num_cores(self, core_mask): 1016 if "0x" in core_mask: 1017 return bin(int(core_mask, 
16)).count("1") 1018 else: 1019 num_cores = 0 1020 core_mask = core_mask.replace("[", "") 1021 core_mask = core_mask.replace("]", "") 1022 for i in core_mask.split(","): 1023 if "-" in i: 1024 x, y = i.split("-") 1025 num_cores += len(range(int(x), int(y))) + 1 1026 else: 1027 num_cores += 1 1028 return num_cores 1029 1030 def spdk_tgt_configure(self): 1031 self.log_print("Configuring SPDK NVMeOF target via RPC") 1032 1033 # Create RDMA transport layer 1034 rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, 1035 num_shared_buffers=self.num_shared_buffers, 1036 dif_insert_or_strip=self.dif_insert_strip, 1037 sock_priority=self.adq_priority) 1038 self.log_print("SPDK NVMeOF transport layer:") 1039 rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client)) 1040 1041 if self.null_block: 1042 self.spdk_tgt_add_nullblock(self.null_block) 1043 self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block) 1044 else: 1045 self.spdk_tgt_add_nvme_conf() 1046 self.spdk_tgt_add_subsystem_conf(self.nic_ips) 1047 1048 if self.enable_adq: 1049 self.adq_configure_tc() 1050 self.log_print("Done configuring SPDK NVMeOF Target") 1051 1052 def spdk_tgt_add_nullblock(self, null_block_count): 1053 md_size = 0 1054 block_size = 4096 1055 if self.null_block_dif_type != 0: 1056 md_size = 128 1057 1058 self.log_print("Adding null block bdevices to config via RPC") 1059 for i in range(null_block_count): 1060 self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type) 1061 rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i), 1062 dif_type=self.null_block_dif_type, md_size=md_size) 1063 self.log_print("SPDK Bdevs configuration:") 1064 rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1065 1066 def spdk_tgt_add_nvme_conf(self, req_num_disks=None): 1067 self.log_print("Adding NVMe bdevs to config via RPC") 1068 1069 bdfs = get_nvme_devices_bdf() 1070 bdfs = [b.replace(":", ".") for b in bdfs] 1071 1072 if 
req_num_disks: 1073 if req_num_disks > len(bdfs): 1074 self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs)) 1075 sys.exit(1) 1076 else: 1077 bdfs = bdfs[0:req_num_disks] 1078 1079 for i, bdf in enumerate(bdfs): 1080 rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 1081 1082 self.log_print("SPDK Bdevs configuration:") 1083 rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1084 1085 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 1086 self.log_print("Adding subsystems to config") 1087 port = "4420" 1088 if not req_num_disks: 1089 req_num_disks = get_nvme_devices_count() 1090 1091 # Distribute bdevs between provided NICs 1092 num_disks = range(0, req_num_disks) 1093 if len(num_disks) == 1: 1094 disks_per_ip = 1 1095 else: 1096 disks_per_ip = int(len(num_disks) / len(ips)) 1097 disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))] 1098 1099 # Create subsystems, add bdevs to namespaces, add listeners 1100 for ip, chunk in zip(ips, disk_chunks): 1101 for c in chunk: 1102 nqn = "nqn.2018-09.io.spdk:cnode%s" % c 1103 serial = "SPDK00%s" % c 1104 bdev_name = "Nvme%sn1" % c 1105 rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial, 1106 allow_any_host=True, max_namespaces=8) 1107 rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name) 1108 1109 rpc.nvmf.nvmf_subsystem_add_listener(self.client, 1110 nqn=nqn, 1111 trtype=self.transport, 1112 traddr=ip, 1113 trsvcid=port, 1114 adrfam="ipv4") 1115 1116 self.subsystem_info_list.append([port, nqn, ip]) 1117 self.log_print("SPDK NVMeOF subsystem configuration:") 1118 rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client)) 1119 1120 def tgt_start(self): 1121 if self.null_block: 1122 self.subsys_no = 1 1123 else: 1124 self.subsys_no = get_nvme_devices_count() 1125 self.log_print("Starting SPDK NVMeOF Target process") 1126 nvmf_app_path = 
os.path.join(self.spdk_dir, "build/bin/nvmf_tgt") 1127 proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask]) 1128 self.pid = os.path.join(self.spdk_dir, "nvmf.pid") 1129 1130 with open(self.pid, "w") as fh: 1131 fh.write(str(proc.pid)) 1132 self.nvmf_proc = proc 1133 self.log_print("SPDK NVMeOF Target PID=%s" % self.pid) 1134 self.log_print("Waiting for spdk to initilize...") 1135 while True: 1136 if os.path.exists("/var/tmp/spdk.sock"): 1137 break 1138 time.sleep(1) 1139 self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock") 1140 1141 if self.enable_zcopy: 1142 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", 1143 enable_zerocopy_send=True) 1144 self.log_print("Target socket options:") 1145 rpc.client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix")) 1146 1147 if self.enable_adq: 1148 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1) 1149 rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None, 1150 nvme_adminq_poll_period_us=100000, retry_count=4) 1151 rpc.nvmf.nvmf_set_config(self.client, acceptor_poll_rate=10000) 1152 1153 rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name) 1154 1155 rpc.framework_start_init(self.client) 1156 self.spdk_tgt_configure() 1157 1158 def __del__(self): 1159 if hasattr(self, "nvmf_proc"): 1160 try: 1161 self.nvmf_proc.terminate() 1162 self.nvmf_proc.wait() 1163 except Exception as e: 1164 self.log_print(e) 1165 self.nvmf_proc.kill() 1166 self.nvmf_proc.communicate() 1167 1168 1169class KernelInitiator(Initiator): 1170 def __init__(self, name, general_config, initiator_config): 1171 super(KernelInitiator, self).__init__(name, general_config, initiator_config) 1172 1173 # Defaults 1174 self.extra_params = "" 1175 1176 if "extra_params" in initiator_config: 1177 self.extra_params = initiator_config["extra_params"] 1178 1179 def __del__(self): 1180 self.ssh_connection.close() 1181 1182 def 
kernel_init_connect(self): 1183 self.log_print("Below connection attempts may result in error messages, this is expected!") 1184 for subsystem in self.subsystem_info_list: 1185 self.log_print("Trying to connect %s %s %s" % subsystem) 1186 self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport, 1187 "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params]) 1188 time.sleep(2) 1189 1190 def kernel_init_disconnect(self): 1191 for subsystem in self.subsystem_info_list: 1192 self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]]) 1193 time.sleep(1) 1194 1195 def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1): 1196 out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'", 1197 "|", "awk", "'{print $1}'"]) 1198 nvme_list = [x for x in out.split("\n") if "nvme" in x] 1199 1200 filename_section = "" 1201 nvme_per_split = int(len(nvme_list) / len(threads)) 1202 remainder = len(nvme_list) % len(threads) 1203 iterator = iter(nvme_list) 1204 result = [] 1205 for i in range(len(threads)): 1206 result.append([]) 1207 for _ in range(nvme_per_split): 1208 result[i].append(next(iterator)) 1209 if remainder: 1210 result[i].append(next(iterator)) 1211 remainder -= 1 1212 for i, r in enumerate(result): 1213 header = "[filename%s]" % i 1214 disks = "\n".join(["filename=%s" % x for x in r]) 1215 job_section_qd = round((io_depth * len(r)) / num_jobs) 1216 if job_section_qd == 0: 1217 job_section_qd = 1 1218 iodepth = "iodepth=%s" % job_section_qd 1219 filename_section = "\n".join([filename_section, header, disks, iodepth]) 1220 1221 return filename_section 1222 1223 1224class SPDKInitiator(Initiator): 1225 def __init__(self, name, general_config, initiator_config): 1226 super(SPDKInitiator, self).__init__(name, general_config, initiator_config) 1227 1228 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 1229 self.install_spdk() 1230 1231 # Required 
fields 1232 self.num_cores = initiator_config["num_cores"] 1233 1234 def install_spdk(self): 1235 self.log_print("Using fio binary %s" % self.fio_bin) 1236 self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"]) 1237 self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"]) 1238 self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)]) 1239 self.exec_cmd(["make", "-C", self.spdk_dir, "clean"]) 1240 self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"]) 1241 1242 self.log_print("SPDK built") 1243 self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir]) 1244 1245 def gen_spdk_bdev_conf(self, remote_subsystem_list): 1246 bdev_cfg_section = { 1247 "subsystems": [ 1248 { 1249 "subsystem": "bdev", 1250 "config": [] 1251 } 1252 ] 1253 } 1254 1255 for i, subsys in enumerate(remote_subsystem_list): 1256 sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys) 1257 nvme_ctrl = { 1258 "method": "bdev_nvme_attach_controller", 1259 "params": { 1260 "name": "Nvme{}".format(i), 1261 "trtype": self.transport, 1262 "traddr": sub_addr, 1263 "trsvcid": sub_port, 1264 "subnqn": sub_nqn, 1265 "adrfam": "IPv4" 1266 } 1267 } 1268 1269 if self.enable_adq: 1270 nvme_ctrl["params"].update({"priority": "1"}) 1271 1272 bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl) 1273 1274 return json.dumps(bdev_cfg_section, indent=2) 1275 1276 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1): 1277 filename_section = "" 1278 if len(threads) >= len(subsystems): 1279 threads = range(0, len(subsystems)) 1280 filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))] 1281 nvme_per_split = int(len(subsystems) / len(threads)) 1282 remainder = len(subsystems) % len(threads) 1283 iterator = iter(filenames) 1284 result = [] 1285 for i in range(len(threads)): 1286 result.append([]) 1287 for _ in range(nvme_per_split): 1288 result[i].append(next(iterator)) 1289 if 
remainder: 1290 result[i].append(next(iterator)) 1291 remainder -= 1 1292 for i, r in enumerate(result): 1293 header = "[filename%s]" % i 1294 disks = "\n".join(["filename=%s" % x for x in r]) 1295 job_section_qd = round((io_depth * len(r)) / num_jobs) 1296 if job_section_qd == 0: 1297 job_section_qd = 1 1298 iodepth = "iodepth=%s" % job_section_qd 1299 filename_section = "\n".join([filename_section, header, disks, iodepth]) 1300 1301 return filename_section 1302 1303 1304if __name__ == "__main__": 1305 script_full_dir = os.path.dirname(os.path.realpath(__file__)) 1306 default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json")) 1307 1308 parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) 1309 parser.add_argument('-c', '--config', type=str, default=default_config_file_path, 1310 help='Configuration file.') 1311 parser.add_argument('-r', '--results', type=str, default='/tmp/results', 1312 help='Results directory.') 1313 parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv', 1314 help='CSV results filename.') 1315 1316 args = parser.parse_args() 1317 1318 print("Using config file: %s" % args.config) 1319 with open(args.config, "r") as config: 1320 data = json.load(config) 1321 1322 initiators = [] 1323 fio_cases = [] 1324 1325 general_config = data["general"] 1326 target_config = data["target"] 1327 initiator_configs = [data[x] for x in data.keys() if "initiator" in x] 1328 1329 for k, v in data.items(): 1330 if "target" in k: 1331 if data[k]["mode"] == "spdk": 1332 target_obj = SPDKTarget(k, data["general"], v) 1333 elif data[k]["mode"] == "kernel": 1334 target_obj = KernelTarget(k, data["general"], v) 1335 pass 1336 elif "initiator" in k: 1337 if data[k]["mode"] == "spdk": 1338 init_obj = SPDKInitiator(k, data["general"], v) 1339 elif data[k]["mode"] == "kernel": 1340 init_obj = KernelInitiator(k, data["general"], v) 1341 initiators.append(init_obj) 1342 elif "fio" in k: 
1343 fio_workloads = itertools.product(data[k]["bs"], 1344 data[k]["qd"], 1345 data[k]["rw"]) 1346 1347 fio_run_time = data[k]["run_time"] 1348 fio_ramp_time = data[k]["ramp_time"] 1349 fio_rw_mix_read = data[k]["rwmixread"] 1350 fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None 1351 fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None 1352 1353 fio_rate_iops = 0 1354 if "rate_iops" in data[k]: 1355 fio_rate_iops = data[k]["rate_iops"] 1356 else: 1357 continue 1358 1359 target_obj.tgt_start() 1360 1361 try: 1362 os.mkdir(args.results) 1363 except FileExistsError: 1364 pass 1365 1366 for i in initiators: 1367 i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no) 1368 if i.enable_adq: 1369 i.adq_configure_tc() 1370 1371 # Poor mans threading 1372 # Run FIO tests 1373 for block_size, io_depth, rw in fio_workloads: 1374 threads = [] 1375 configs = [] 1376 for i in initiators: 1377 if i.mode == "kernel": 1378 i.kernel_init_connect() 1379 1380 cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no, 1381 fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops) 1382 configs.append(cfg) 1383 1384 for i, cfg in zip(initiators, configs): 1385 t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num)) 1386 threads.append(t) 1387 if target_obj.enable_sar: 1388 sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"]) 1389 sar_file_name = ".".join([sar_file_name, "txt"]) 1390 t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_name)) 1391 threads.append(t) 1392 1393 if target_obj.enable_pcm: 1394 pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]] 1395 1396 pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],)) 1397 pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],)) 1398 pcm_pow_t = 
threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],)) 1399 1400 threads.append(pcm_cpu_t) 1401 threads.append(pcm_mem_t) 1402 threads.append(pcm_pow_t) 1403 1404 if target_obj.enable_bandwidth: 1405 bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)]) 1406 bandwidth_file_name = ".".join([bandwidth_file_name, "csv"]) 1407 t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,)) 1408 threads.append(t) 1409 1410 if target_obj.enable_dpdk_memory: 1411 t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results)) 1412 threads.append(t) 1413 1414 for t in threads: 1415 t.start() 1416 for t in threads: 1417 t.join() 1418 1419 for i in initiators: 1420 if i.mode == "kernel": 1421 i.kernel_init_disconnect() 1422 i.copy_result_files(args.results) 1423 1424 target_obj.restore_governor() 1425 target_obj.restore_tuned() 1426 target_obj.restore_services() 1427 target_obj.restore_sysctl() 1428 for i in initiators: 1429 i.restore_governor() 1430 i.restore_tuned() 1431 i.restore_services() 1432 i.restore_sysctl() 1433 target_obj.parse_results(args.results, args.csv_filename) 1434