1#!/usr/bin/env python3 2 3import os 4import re 5import sys 6import json 7import zipfile 8import threading 9import subprocess 10import itertools 11import configparser 12import time 13import uuid 14from collections import OrderedDict 15 16import paramiko 17import pandas as pd 18 19import rpc 20import rpc.client 21from common import * 22 23 24class Server: 25 def __init__(self, name, general_config, server_config): 26 self.name = name 27 self.username = general_config["username"] 28 self.password = general_config["password"] 29 self.transport = general_config["transport"].lower() 30 self.nic_ips = server_config["nic_ips"] 31 self.mode = server_config["mode"] 32 33 self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts" 34 if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]: 35 self.irq_scripts_dir = server_config["irq_scripts_dir"] 36 37 self.local_nic_info = [] 38 self._nics_json_obj = {} 39 self.svc_restore_dict = {} 40 self.sysctl_restore_dict = {} 41 self.tuned_restore_dict = {} 42 self.governor_restore = "" 43 self.tuned_profile = "" 44 45 self.enable_adq = False 46 self.adq_priority = None 47 if "adq_enable" in server_config and server_config["adq_enable"]: 48 self.enable_adq = server_config["adq_enable"] 49 self.adq_priority = 1 50 51 if "tuned_profile" in server_config: 52 self.tuned_profile = server_config["tuned_profile"] 53 54 if not re.match("^[A-Za-z0-9]*$", name): 55 self.log_print("Please use a name which contains only letters or numbers") 56 sys.exit(1) 57 58 def log_print(self, msg): 59 print("[%s] %s" % (self.name, msg), flush=True) 60 61 def get_uncommented_lines(self, lines): 62 return [line for line in lines if line and not line.startswith('#')] 63 64 def get_nic_name_by_ip(self, ip): 65 if not self._nics_json_obj: 66 nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"]) 67 self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj))) 68 for nic in self._nics_json_obj: 69 for addr in nic["addr_info"]: 70 if ip in addr["local"]: 71 return nic["ifname"] 72 73 def set_local_nic_info_helper(self): 74 pass 75 76 def set_local_nic_info(self, pci_info): 77 def extract_network_elements(json_obj): 78 nic_list = [] 79 if isinstance(json_obj, list): 80 for x in json_obj: 81 nic_list.extend(extract_network_elements(x)) 82 elif isinstance(json_obj, dict): 83 if "children" in json_obj: 84 nic_list.extend(extract_network_elements(json_obj["children"])) 85 if "class" in json_obj.keys() and "network" in json_obj["class"]: 86 nic_list.append(json_obj) 87 return nic_list 88 89 self.local_nic_info = extract_network_elements(pci_info) 90 91 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 92 return "" 93 94 def configure_system(self): 95 self.configure_services() 96 self.configure_sysctl() 97 self.configure_tuned() 98 self.configure_cpu_governor() 99 self.configure_irq_affinity() 100 101 def configure_adq(self): 102 self.adq_load_modules() 103 self.adq_configure_nic() 104 105 def adq_load_modules(self): 106 self.log_print("Modprobing ADQ-related Linux modules...") 107 adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"] 108 for module in adq_module_deps: 109 try: 110 self.exec_cmd(["sudo", "modprobe", module]) 111 self.log_print("%s loaded!" % module) 112 except CalledProcessError as e: 113 self.log_print("ERROR: failed to load module %s" % module) 114 self.log_print("%s resulted in error: %s" % (e.cmd, e.output)) 115 116 def adq_configure_nic(self): 117 self.log_print("Configuring NIC port settings for ADQ testing...") 118 119 # Reload the driver first, to make sure any previous settings are re-set. 120 try: 121 self.exec_cmd(["sudo", "rmmod", "ice"]) 122 self.exec_cmd(["sudo", "modprobe", "ice"]) 123 except CalledProcessError as e: 124 self.log_print("ERROR: failed to reload ice module!") 125 self.log_print("%s resulted in error: %s" % (e.cmd, e.output)) 126 127 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 128 for nic in nic_names: 129 self.log_print(nic) 130 try: 131 self.exec_cmd(["sudo", "ethtool", "-K", nic, 132 "hw-tc-offload", "on"]) # Enable hardware TC offload 133 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 134 "channel-inline-flow-director", "on"]) # Enable Intel Flow Director 135 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"]) # Disable LLDP 136 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 137 "channel-pkt-inspect-optimize", "off"]) # Disable channel packet inspection optimization 138 except CalledProcessError as e: 139 self.log_print("ERROR: failed to configure NIC port using ethtool!") 140 self.log_print("%s resulted in error: %s" % (e.cmd, e.output)) 141 self.log_print("Please update your NIC driver and firmware versions and try again.") 142 self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic])) 143 self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic])) 144 145 def configure_services(self): 146 self.log_print("Configuring active services...") 147 svc_config = configparser.ConfigParser(strict=False) 148 149 # Below list is valid only for RHEL / Fedora systems and might not 150 # contain valid names for other distributions. 151 svc_target_state = { 152 "firewalld": "inactive", 153 "irqbalance": "inactive", 154 "lldpad.service": "inactive", 155 "lldpad.socket": "inactive" 156 } 157 158 for service in svc_target_state: 159 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 160 out = "\n".join(["[%s]" % service, out]) 161 svc_config.read_string(out) 162 163 if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]: 164 continue 165 166 service_state = svc_config[service]["ActiveState"] 167 self.log_print("Current state of %s service is %s" % (service, service_state)) 168 self.svc_restore_dict.update({service: service_state}) 169 if service_state != "inactive": 170 self.log_print("Disabling %s. It will be restored after the test has finished." % service) 171 self.exec_cmd(["sudo", "systemctl", "stop", service]) 172 173 def configure_sysctl(self): 174 self.log_print("Tuning sysctl settings...") 175 176 busy_read = 50000 177 if self.enable_adq and self.mode == "spdk": 178 busy_read = 1 179 180 sysctl_opts = { 181 "net.core.busy_poll": 0, 182 "net.core.busy_read": busy_read, 183 "net.core.somaxconn": 4096, 184 "net.core.netdev_max_backlog": 8192, 185 "net.ipv4.tcp_max_syn_backlog": 16384, 186 "net.core.rmem_max": 268435456, 187 "net.core.wmem_max": 268435456, 188 "net.ipv4.tcp_mem": "268435456 268435456 268435456", 189 "net.ipv4.tcp_rmem": "8192 1048576 33554432", 190 "net.ipv4.tcp_wmem": "8192 1048576 33554432", 191 "net.ipv4.route.flush": 1, 192 "vm.overcommit_memory": 1, 193 } 194 195 for opt, value in sysctl_opts.items(): 196 self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()}) 197 self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 198 199 def configure_tuned(self): 200 if not self.tuned_profile: 201 self.log_print("WARNING: Tuned profile not set in configration file. Skipping configuration.") 202 return 203 204 self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile) 205 service = "tuned" 206 tuned_config = configparser.ConfigParser(strict=False) 207 208 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 209 out = "\n".join(["[%s]" % service, out]) 210 tuned_config.read_string(out) 211 tuned_state = tuned_config[service]["ActiveState"] 212 self.svc_restore_dict.update({service: tuned_state}) 213 214 if tuned_state != "inactive": 215 profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip() 216 profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip() 217 218 self.tuned_restore_dict = { 219 "profile": profile, 220 "mode": profile_mode 221 } 222 223 self.exec_cmd(["sudo", "systemctl", "start", service]) 224 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile]) 225 self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"])) 226 227 def configure_cpu_governor(self): 228 self.log_print("Setting CPU governor to performance...") 229 230 # This assumes that there is the same CPU scaling governor on each CPU 231 self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip() 232 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"]) 233 234 def configure_irq_affinity(self): 235 self.log_print("Setting NIC irq affinity for NICs...") 236 237 irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh") 238 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 239 for nic in nic_names: 240 irq_cmd = ["sudo", irq_script_path, nic] 241 self.log_print(irq_cmd) 242 self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir) 243 244 def restore_services(self): 245 self.log_print("Restoring services...") 246 for service, state in self.svc_restore_dict.items(): 247 cmd = "stop" if state == "inactive" else "start" 248 self.exec_cmd(["sudo", "systemctl", cmd, service]) 249 250 def restore_sysctl(self): 251 self.log_print("Restoring sysctl settings...") 252 for opt, value in self.sysctl_restore_dict.items(): 253 self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 254 255 def restore_tuned(self): 256 self.log_print("Restoring tuned-adm settings...") 257 258 if not self.tuned_restore_dict: 259 return 260 261 if self.tuned_restore_dict["mode"] == "auto": 262 self.exec_cmd(["sudo", "tuned-adm", "auto_profile"]) 263 self.log_print("Reverted tuned-adm to auto_profile.") 264 else: 265 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]]) 266 self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"]) 267 268 def restore_governor(self): 269 self.log_print("Restoring CPU governor setting...") 270 if self.governor_restore: 271 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore]) 272 self.log_print("Reverted CPU governor to %s." % self.governor_restore) 273 274 275class Target(Server): 276 def __init__(self, name, general_config, target_config): 277 super(Target, self).__init__(name, general_config, target_config) 278 279 # Defaults 280 self.enable_sar = False 281 self.sar_delay = 0 282 self.sar_interval = 0 283 self.sar_count = 0 284 self.enable_pcm = False 285 self.pcm_dir = "" 286 self.pcm_delay = 0 287 self.pcm_interval = 0 288 self.pcm_count = 0 289 self.enable_bandwidth = 0 290 self.bandwidth_count = 0 291 self.enable_dpdk_memory = False 292 self.dpdk_wait_time = 0 293 self.enable_zcopy = False 294 self.scheduler_name = "static" 295 self.null_block = 0 296 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 297 self.subsystem_info_list = [] 298 299 if "null_block_devices" in target_config: 300 self.null_block = target_config["null_block_devices"] 301 if "sar_settings" in target_config: 302 self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"] 303 if "pcm_settings" in target_config: 304 self.enable_pcm = True 305 self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"] 306 if "enable_bandwidth" in target_config: 307 self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"] 308 if "enable_dpdk_memory" in target_config: 309 self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"] 310 if "scheduler_settings" in target_config: 311 self.scheduler_name = target_config["scheduler_settings"] 312 if "zcopy_settings" in target_config: 313 self.enable_zcopy = target_config["zcopy_settings"] 314 315 self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) 316 self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../")) 317 self.set_local_nic_info(self.set_local_nic_info_helper()) 318 self.configure_system() 319 if self.enable_adq: 320 self.configure_adq() 321 self.sys_config() 322 323 def set_local_nic_info_helper(self): 324 return json.loads(self.exec_cmd(["lshw", "-json"])) 325 326 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 327 stderr_opt = None 328 if stderr_redirect: 329 stderr_opt = subprocess.STDOUT 330 if change_dir: 331 old_cwd = os.getcwd() 332 os.chdir(change_dir) 333 self.log_print("Changing directory to %s" % change_dir) 334 335 out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8") 336 337 if change_dir: 338 os.chdir(old_cwd) 339 self.log_print("Changing directory to %s" % old_cwd) 340 return out 341 342 def zip_spdk_sources(self, spdk_dir, dest_file): 343 self.log_print("Zipping SPDK source directory") 344 fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) 345 for root, directories, files in os.walk(spdk_dir, followlinks=True): 346 for file in files: 347 fh.write(os.path.relpath(os.path.join(root, file))) 348 fh.close() 349 self.log_print("Done zipping") 350 351 def read_json_stats(self, file): 352 with open(file, "r") as json_data: 353 data = json.load(json_data) 354 job_pos = 0 # job_post = 0 because using aggregated results 355 356 # Check if latency is in nano or microseconds to choose correct dict key 357 def get_lat_unit(key_prefix, dict_section): 358 # key prefix - lat, clat or slat. 359 # dict section - portion of json containing latency bucket in question 360 # Return dict key to access the bucket and unit as string 361 for k, v in dict_section.items(): 362 if k.startswith(key_prefix): 363 return k, k.split("_")[1] 364 365 def get_clat_percentiles(clat_dict_leaf): 366 if "percentile" in clat_dict_leaf: 367 p99_lat = float(clat_dict_leaf["percentile"]["99.000000"]) 368 p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"]) 369 p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"]) 370 p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"]) 371 372 return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat] 373 else: 374 # Latest fio versions do not provide "percentile" results if no 375 # measurements were done, so just return zeroes 376 return [0, 0, 0, 0] 377 378 read_iops = float(data["jobs"][job_pos]["read"]["iops"]) 379 read_bw = float(data["jobs"][job_pos]["read"]["bw"]) 380 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"]) 381 read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"]) 382 read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"]) 383 read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"]) 384 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"]) 385 read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles( 386 data["jobs"][job_pos]["read"][clat_key]) 387 388 if "ns" in lat_unit: 389 read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]] 390 if "ns" in clat_unit: 391 read_p99_lat = read_p99_lat / 1000 392 read_p99_9_lat = read_p99_9_lat / 1000 393 read_p99_99_lat = read_p99_99_lat / 1000 394 read_p99_999_lat = read_p99_999_lat / 1000 395 396 write_iops = float(data["jobs"][job_pos]["write"]["iops"]) 397 write_bw = float(data["jobs"][job_pos]["write"]["bw"]) 398 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"]) 399 write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"]) 400 write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"]) 401 write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"]) 402 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"]) 403 write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles( 404 data["jobs"][job_pos]["write"][clat_key]) 405 406 if "ns" in lat_unit: 407 write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]] 408 if "ns" in clat_unit: 409 write_p99_lat = write_p99_lat / 1000 410 write_p99_9_lat = write_p99_9_lat / 1000 411 write_p99_99_lat = write_p99_99_lat / 1000 412 write_p99_999_lat = write_p99_999_lat / 1000 413 414 return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat, 415 read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat, 416 write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat, 417 write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat] 418 419 def parse_results(self, results_dir, initiator_count=None, run_num=None): 420 files = os.listdir(results_dir) 421 fio_files = filter(lambda x: ".fio" in x, files) 422 json_files = [x for x in files if ".json" in x] 423 424 headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us", 425 "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us", 426 "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us", 427 "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"] 428 429 aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us", 430 "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"] 431 432 header_line = ",".join(["Name", *headers]) 433 aggr_header_line = ",".join(["Name", *aggr_headers]) 434 435 # Create empty results file 436 csv_file = "nvmf_results.csv" 437 with open(os.path.join(results_dir, csv_file), "w") as fh: 438 fh.write(aggr_header_line + "\n") 439 rows = set() 440 441 for fio_config in fio_files: 442 self.log_print("Getting FIO stats for %s" % fio_config) 443 job_name, _ = os.path.splitext(fio_config) 444 445 # Look in the filename for rwmixread value. Function arguments do 446 # not have that information. 447 # TODO: Improve this function by directly using workload params instead 448 # of regexing through filenames. 449 if "read" in job_name: 450 rw_mixread = 1 451 elif "write" in job_name: 452 rw_mixread = 0 453 else: 454 rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100 455 456 # If "_CPU" exists in name - ignore it 457 # Initiators for the same job could have diffrent num_cores parameter 458 job_name = re.sub(r"_\d+CPU", "", job_name) 459 job_result_files = [x for x in json_files if job_name in x] 460 self.log_print("Matching result files for current fio config:") 461 for j in job_result_files: 462 self.log_print("\t %s" % j) 463 464 # There may have been more than 1 initiator used in test, need to check that 465 # Result files are created so that string after last "_" separator is server name 466 inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files]) 467 inits_avg_results = [] 468 for i in inits_names: 469 self.log_print("\tGetting stats for initiator %s" % i) 470 # There may have been more than 1 test run for this job, calculate average results for initiator 471 i_results = [x for x in job_result_files if i in x] 472 i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv")) 473 474 separate_stats = [] 475 for r in i_results: 476 stats = self.read_json_stats(os.path.join(results_dir, r)) 477 separate_stats.append(stats) 478 self.log_print(stats) 479 480 init_results = [sum(x) for x in zip(*separate_stats)] 481 init_results = [x / len(separate_stats) for x in init_results] 482 inits_avg_results.append(init_results) 483 484 self.log_print("\tAverage results for initiator %s" % i) 485 self.log_print(init_results) 486 with open(os.path.join(results_dir, i_results_filename), "w") as fh: 487 fh.write(header_line + "\n") 488 fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n") 489 490 # Sum results of all initiators running this FIO job. 491 # Latency results are an average of latencies from accros all initiators. 492 inits_avg_results = [sum(x) for x in zip(*inits_avg_results)] 493 inits_avg_results = OrderedDict(zip(headers, inits_avg_results)) 494 for key in inits_avg_results: 495 if "lat" in key: 496 inits_avg_results[key] /= len(inits_names) 497 498 # Aggregate separate read/write values into common labels 499 # Take rw_mixread into consideration for mixed read/write workloads. 500 aggregate_results = OrderedDict() 501 for h in aggr_headers: 502 read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key] 503 if "lat" in h: 504 _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat 505 else: 506 _ = read_stat + write_stat 507 aggregate_results[h] = "{0:.3f}".format(_) 508 509 rows.add(",".join([job_name, *aggregate_results.values()])) 510 511 # Save results to file 512 for row in rows: 513 with open(os.path.join(results_dir, csv_file), "a") as fh: 514 fh.write(row + "\n") 515 self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file)) 516 517 def measure_sar(self, results_dir, sar_file_name): 518 self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay) 519 time.sleep(self.sar_delay) 520 out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count]) 521 with open(os.path.join(results_dir, sar_file_name), "w") as fh: 522 for line in out.split("\n"): 523 if "Average" in line and "CPU" in line: 524 self.log_print("Summary CPU utilization from SAR:") 525 self.log_print(line) 526 if "Average" in line and "all" in line: 527 self.log_print(line) 528 fh.write(out) 529 530 def measure_pcm_memory(self, results_dir, pcm_file_name): 531 time.sleep(self.pcm_delay) 532 cmd = ["%s/pcm-memory.x" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)] 533 pcm_memory = subprocess.Popen(cmd) 534 time.sleep(self.pcm_count) 535 pcm_memory.terminate() 536 537 def measure_pcm(self, results_dir, pcm_file_name): 538 time.sleep(self.pcm_delay) 539 cmd = ["%s/pcm.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, "-csv=%s/%s" % (results_dir, pcm_file_name)] 540 subprocess.run(cmd) 541 df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1]) 542 df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x)) 543 skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})] 544 skt_pcm_file_name = "_".join(["skt", pcm_file_name]) 545 skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False) 546 547 def measure_pcm_power(self, results_dir, pcm_power_file_name): 548 time.sleep(self.pcm_delay) 549 out = self.exec_cmd(["%s/pcm-power.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count]) 550 with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh: 551 fh.write(out) 552 553 def measure_bandwidth(self, results_dir, bandwidth_file_name): 554 self.exec_cmd(["bwm-ng", "-o csv", "-F %s/%s" % (results_dir, bandwidth_file_name), 555 "-a 1", "-t 1000", "-c %s" % self.bandwidth_count]) 556 557 def measure_dpdk_memory(self, results_dir): 558 self.log_print("INFO: waiting to generate DPDK memory usage") 559 time.sleep(self.dpdk_wait_time) 560 self.log_print("INFO: generating DPDK memory usage") 561 rpc.env.env_dpdk_get_mem_stats 562 os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir)) 563 564 def sys_config(self): 565 self.log_print("====Kernel release:====") 566 self.log_print(os.uname().release) 567 self.log_print("====Kernel command line:====") 568 with open('/proc/cmdline') as f: 569 cmdline = f.readlines() 570 self.log_print('\n'.join(self.get_uncommented_lines(cmdline))) 571 self.log_print("====sysctl conf:====") 572 with open('/etc/sysctl.conf') as f: 573 sysctl = f.readlines() 574 self.log_print('\n'.join(self.get_uncommented_lines(sysctl))) 575 self.log_print("====Cpu power info:====") 576 self.log_print(self.exec_cmd(["cpupower", "frequency-info"])) 577 self.log_print("====zcopy settings:====") 578 self.log_print("zcopy enabled: %s" % (self.enable_zcopy)) 579 self.log_print("====Scheduler settings:====") 580 self.log_print("SPDK scheduler: %s" % (self.scheduler_name)) 581 582 583class Initiator(Server): 584 def __init__(self, name, general_config, initiator_config): 585 super(Initiator, self).__init__(name, general_config, initiator_config) 586 587 # Required fields 588 self.ip = initiator_config["ip"] 589 self.remote_nic_ips = initiator_config["remote_nic_ips"] 590 591 # Defaults 592 self.cpus_allowed = None 593 self.cpus_allowed_policy = "shared" 594 self.spdk_dir = "/tmp/spdk" 595 self.fio_bin = "/usr/src/fio/fio" 596 self.nvmecli_bin = "nvme" 597 self.cpu_frequency = None 598 self.subsystem_info_list = [] 599 600 if "spdk_dir" in initiator_config: 601 self.spdk_dir = initiator_config["spdk_dir"] 602 if "fio_bin" in initiator_config: 603 self.fio_bin = initiator_config["fio_bin"] 604 if "nvmecli_bin" in initiator_config: 605 self.nvmecli_bin = initiator_config["nvmecli_bin"] 606 if "cpus_allowed" in initiator_config: 607 self.cpus_allowed = initiator_config["cpus_allowed"] 608 if "cpus_allowed_policy" in initiator_config: 609 self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"] 610 if "cpu_frequency" in initiator_config: 611 self.cpu_frequency = initiator_config["cpu_frequency"] 612 if os.getenv('SPDK_WORKSPACE'): 613 self.spdk_dir = os.getenv('SPDK_WORKSPACE') 614 615 self.ssh_connection = paramiko.SSHClient() 616 self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 617 self.ssh_connection.connect(self.ip, username=self.username, password=self.password) 618 self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir]) 619 self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir]) 620 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 621 self.set_local_nic_info(self.set_local_nic_info_helper()) 622 self.set_cpu_frequency() 623 self.configure_system() 624 if self.enable_adq: 625 self.configure_adq() 626 self.sys_config() 627 628 def set_local_nic_info_helper(self): 629 return json.loads(self.exec_cmd(["lshw", "-json"])) 630 631 def __del__(self): 632 self.ssh_connection.close() 633 634 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 635 if change_dir: 636 cmd = ["cd", change_dir, ";", *cmd] 637 638 # In case one of the command elements contains whitespace and is not 639 # already quoted, # (e.g. when calling sysctl) quote it again to prevent expansion 640 # when sending to remote system. 641 for i, c in enumerate(cmd): 642 if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")): 643 cmd[i] = '"%s"' % c 644 cmd = " ".join(cmd) 645 646 # Redirect stderr to stdout thanks using get_pty option if needed 647 _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect) 648 out = stdout.read().decode(encoding="utf-8") 649 650 # Check the return code 651 rc = stdout.channel.recv_exit_status() 652 if rc: 653 raise CalledProcessError(int(rc), cmd, out) 654 655 return out 656 657 def put_file(self, local, remote_dest): 658 ftp = self.ssh_connection.open_sftp() 659 ftp.put(local, remote_dest) 660 ftp.close() 661 662 def get_file(self, remote, local_dest): 663 ftp = self.ssh_connection.open_sftp() 664 ftp.get(remote, local_dest) 665 ftp.close() 666 667 def copy_result_files(self, dest_dir): 668 self.log_print("Copying results") 669 670 if not os.path.exists(dest_dir): 671 os.mkdir(dest_dir) 672 673 # Get list of result files from initiator and copy them back to target 674 file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n") 675 676 for file in file_list: 677 self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file), 678 os.path.join(dest_dir, file)) 679 self.log_print("Done copying results") 680 681 def discover_subsystems(self, address_list, subsys_no): 682 num_nvmes = range(0, subsys_no) 683 nvme_discover_output = "" 684 for ip, subsys_no in itertools.product(address_list, num_nvmes): 685 self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no)) 686 nvme_discover_cmd = ["sudo", 687 "%s" % self.nvmecli_bin, 688 "discover", "-t", "%s" % self.transport, 689 "-s", "%s" % (4420 + subsys_no), 690 "-a", "%s" % ip] 691 692 try: 693 stdout = self.exec_cmd(nvme_discover_cmd) 694 if stdout: 695 nvme_discover_output = nvme_discover_output + stdout 696 except CalledProcessError: 697 # Do nothing. In case of discovering remote subsystems of kernel target 698 # we expect "nvme discover" to fail a bunch of times because we basically 699 # scan ports. 700 pass 701 702 subsystems = re.findall(r'trsvcid:\s(\d+)\s+' # get svcid number 703 r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+' # get NQN id 704 r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', # get IP address 705 nvme_discover_output) # from nvme discovery output 706 subsystems = filter(lambda x: x[-1] in address_list, subsystems) 707 subsystems = list(set(subsystems)) 708 subsystems.sort(key=lambda x: x[1]) 709 self.log_print("Found matching subsystems on target side:") 710 for s in subsystems: 711 self.log_print(s) 712 self.subsystem_info_list = subsystems 713 714 def gen_fio_filename_conf(self, *args, **kwargs): 715 # Logic implemented in SPDKInitiator and KernelInitiator classes 716 pass 717 718 def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10): 719 fio_conf_template = """ 720[global] 721ioengine={ioengine} 722{spdk_conf} 723thread=1 724group_reporting=1 725direct=1 726percentile_list=50:90:99:99.5:99.9:99.99:99.999 727 728norandommap=1 729rw={rw} 730rwmixread={rwmixread} 731bs={block_size} 732time_based=1 733ramp_time={ramp_time} 734runtime={run_time} 735""" 736 if "spdk" in self.mode: 737 bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list) 738 self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir]) 739 ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir 740 spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir 741 else: 742 ioengine = "libaio" 743 spdk_conf = "" 744 out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'", 745 "|", "awk", "'{print $1}'"]) 746 subsystems = [x for x in out.split("\n") if "nvme" in x] 747 748 if self.cpus_allowed is not None: 749 self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed) 750 cpus_num = 0 751 cpus = self.cpus_allowed.split(",") 752 for cpu in cpus: 753 if "-" in cpu: 754 a, b = cpu.split("-") 755 a = int(a) 756 b = int(b) 757 cpus_num += len(range(a, b)) 758 else: 759 cpus_num += 1 760 threads = range(0, cpus_num) 761 elif hasattr(self, 'num_cores'): 762 self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores) 763 threads = range(0, int(self.num_cores)) 764 else: 765 threads = range(0, len(subsystems)) 766 767 if "spdk" in self.mode: 768 filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs) 769 else: 770 filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs) 771 772 fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf, 773 rw=rw, rwmixread=rwmixread, block_size=block_size, 774 ramp_time=ramp_time, run_time=run_time) 775 if num_jobs: 776 fio_config = fio_config + "numjobs=%s \n" % num_jobs 777 if self.cpus_allowed is not None: 778 fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed 779 fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy 780 fio_config = fio_config + filename_section 781 782 fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread) 783 if hasattr(self, "num_cores"): 784 fio_config_filename += "_%sCPU" % self.num_cores 785 fio_config_filename += ".fio" 786 787 self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir]) 788 self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)]) 789 self.log_print("Created FIO Config:") 790 self.log_print(fio_config) 791 792 return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename) 793 794 def set_cpu_frequency(self): 795 if self.cpu_frequency is not None: 796 try: 797 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True) 798 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True) 799 self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"])) 800 except Exception: 801 self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!") 802 sys.exit() 803 else: 804 self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.") 805 806 def run_fio(self, fio_config_file, run_num=None): 807 job_name, _ = os.path.splitext(fio_config_file) 808 self.log_print("Starting FIO run for job: %s" % job_name) 809 self.log_print("Using FIO: %s" % self.fio_bin) 810 811 if run_num: 812 for i in range(1, run_num + 1): 813 output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json" 814 output = self.exec_cmd(["sudo", self.fio_bin, 815 fio_config_file, "--output-format=json", 816 "--output=%s" % output_filename], True) 817 self.log_print(output) 818 else: 819 output_filename = job_name + "_" + self.name + ".json" 820 output = self.exec_cmd(["sudo", self.fio_bin, 821 fio_config_file, "--output-format=json", 822 "--output" % output_filename], True) 823 self.log_print(output) 824 self.log_print("FIO run finished. Results in: %s" % output_filename) 825 826 def sys_config(self): 827 self.log_print("====Kernel release:====") 828 self.log_print(self.exec_cmd(["uname", "-r"])) 829 self.log_print("====Kernel command line:====") 830 cmdline = self.exec_cmd(["cat", "/proc/cmdline"]) 831 self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines()))) 832 self.log_print("====sysctl conf:====") 833 sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"]) 834 self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines()))) 835 self.log_print("====Cpu power info:====") 836 self.log_print(self.exec_cmd(["cpupower", "frequency-info"])) 837 838 839class KernelTarget(Target): 840 def __init__(self, name, general_config, target_config): 841 super(KernelTarget, self).__init__(name, general_config, target_config) 842 # Defaults 843 self.nvmet_bin = "nvmetcli" 844 845 if "nvmet_bin" in target_config: 846 self.nvmet_bin = target_config["nvmet_bin"] 847 848 def __del__(self): 849 nvmet_command(self.nvmet_bin, "clear") 850 851 def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list): 852 853 nvmet_cfg = { 854 "ports": [], 855 "hosts": [], 856 "subsystems": [], 857 } 858 859 # Split disks between NIC IP's 860 disks_per_ip = int(len(nvme_list) / len(address_list)) 861 disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))] 862 863 subsys_no = 1 864 port_no = 0 865 for ip, chunk in zip(address_list, disk_chunks): 866 for disk in chunk: 867 nqn = "nqn.2018-09.io.spdk:cnode%s" % subsys_no 868 nvmet_cfg["subsystems"].append({ 869 "allowed_hosts": [], 870 "attr": { 871 "allow_any_host": "1", 872 "serial": "SPDK00%s" % subsys_no, 873 "version": "1.3" 874 }, 875 "namespaces": [ 876 { 877 "device": { 878 "path": disk, 879 "uuid": "%s" % uuid.uuid4() 880 }, 881 "enable": 1, 882 "nsid": subsys_no 883 } 884 ], 885 "nqn": nqn 886 }) 887 888 nvmet_cfg["ports"].append({ 889 "addr": { 890 "adrfam": "ipv4", 891 "traddr": ip, 892 "trsvcid": "%s" % (4420 + port_no), 893 "trtype": "%s" % self.transport 894 }, 895 "portid": subsys_no, 896 "referrals": [], 897 "subsystems": [nqn] 898 }) 899 subsys_no += 1 900 port_no += 1 901 self.subsystem_info_list.append([port_no, nqn, ip]) 902 903 with open("kernel.conf", "w") as fh: 904 fh.write(json.dumps(nvmet_cfg, indent=2)) 905 pass 906 907 def tgt_start(self): 908 self.log_print("Configuring kernel NVMeOF Target") 909 910 if self.null_block: 911 print("Configuring with null block device.") 912 null_blk_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)] 913 self.kernel_tgt_gen_subsystem_conf(null_blk_list, self.nic_ips) 914 self.subsys_no = len(null_blk_list) 915 else: 916 print("Configuring with NVMe drives.") 917 nvme_list = get_nvme_devices() 918 self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips) 919 self.subsys_no = len(nvme_list) 920 921 nvmet_command(self.nvmet_bin, "clear") 922 nvmet_command(self.nvmet_bin, "restore kernel.conf") 923 self.log_print("Done configuring kernel NVMeOF Target") 924 925 926class SPDKTarget(Target): 927 def __init__(self, name, general_config, target_config): 928 super(SPDKTarget, self).__init__(name, general_config, target_config) 929 930 # Required fields 931 self.core_mask = target_config["core_mask"] 932 self.num_cores = self.get_num_cores(self.core_mask) 933 934 # Defaults 935 self.dif_insert_strip = False 936 self.null_block_dif_type = 0 937 self.num_shared_buffers = 4096 938 939 if "num_shared_buffers" in target_config: 940 self.num_shared_buffers = target_config["num_shared_buffers"] 941 if "null_block_dif_type" in target_config: 942 self.null_block_dif_type = target_config["null_block_dif_type"] 943 if "dif_insert_strip" in target_config: 944 self.dif_insert_strip = target_config["dif_insert_strip"] 945 946 def get_num_cores(self, core_mask): 947 if "0x" in core_mask: 948 return bin(int(core_mask, 16)).count("1") 949 else: 950 num_cores = 0 951 core_mask = core_mask.replace("[", "") 952 core_mask = core_mask.replace("]", "") 953 for i in core_mask.split(","): 954 if "-" in i: 955 x, y = i.split("-") 956 num_cores += len(range(int(x), int(y))) + 1 957 else: 958 num_cores += 1 959 return num_cores 960 961 def spdk_tgt_configure(self): 962 self.log_print("Configuring SPDK NVMeOF target via RPC") 963 numa_list = get_used_numa_nodes() 964 965 # Create RDMA transport layer 966 rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, 967 num_shared_buffers=self.num_shared_buffers, 968 dif_insert_or_strip=self.dif_insert_strip, 969 sock_priority=self.adq_priority) 970 self.log_print("SPDK NVMeOF transport layer:") 971 rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client)) 972 973 if self.null_block: 974 self.spdk_tgt_add_nullblock(self.null_block) 975 self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block) 976 else: 977 self.spdk_tgt_add_nvme_conf() 978 self.spdk_tgt_add_subsystem_conf(self.nic_ips) 979 self.log_print("Done configuring SPDK NVMeOF Target") 980 981 def spdk_tgt_add_nullblock(self, null_block_count): 982 md_size = 0 983 block_size = 4096 984 if self.null_block_dif_type != 0: 985 md_size = 128 986 987 self.log_print("Adding null block bdevices to config via RPC") 988 for i in range(null_block_count): 989 self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type) 990 rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i), 991 dif_type=self.null_block_dif_type, md_size=md_size) 992 self.log_print("SPDK Bdevs configuration:") 993 rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 994 995 def spdk_tgt_add_nvme_conf(self, req_num_disks=None): 996 self.log_print("Adding NVMe bdevs to config via RPC") 997 998 bdfs = get_nvme_devices_bdf() 999 bdfs = [b.replace(":", ".") for b in bdfs] 1000 1001 if req_num_disks: 1002 if req_num_disks > len(bdfs): 1003 self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs)) 1004 sys.exit(1) 1005 else: 1006 bdfs = bdfs[0:req_num_disks] 1007 1008 for i, bdf in enumerate(bdfs): 1009 rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 1010 1011 self.log_print("SPDK Bdevs configuration:") 1012 rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1013 1014 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 1015 self.log_print("Adding subsystems to config") 1016 port = "4420" 1017 if not req_num_disks: 1018 req_num_disks = get_nvme_devices_count() 1019 1020 # Distribute bdevs between provided NICs 1021 num_disks = range(0, req_num_disks) 1022 if len(num_disks) == 1: 1023 disks_per_ip = 1 1024 else: 1025 disks_per_ip = int(len(num_disks) / len(ips)) 1026 disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))] 1027 1028 # Create subsystems, add bdevs to namespaces, add listeners 1029 for ip, chunk in zip(ips, disk_chunks): 1030 for c in chunk: 1031 nqn = "nqn.2018-09.io.spdk:cnode%s" % c 1032 serial = "SPDK00%s" % c 1033 bdev_name = "Nvme%sn1" % c 1034 rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial, 1035 allow_any_host=True, max_namespaces=8) 1036 rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name) 1037 1038 rpc.nvmf.nvmf_subsystem_add_listener(self.client, nqn, 1039 trtype=self.transport, 1040 traddr=ip, 1041 trsvcid=port, 1042 adrfam="ipv4") 1043 1044 self.subsystem_info_list.append([port, nqn, ip]) 1045 self.log_print("SPDK NVMeOF subsystem configuration:") 1046 rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client)) 1047 1048 def tgt_start(self): 1049 if self.null_block: 1050 self.subsys_no = 1 1051 else: 1052 self.subsys_no = get_nvme_devices_count() 1053 self.log_print("Starting SPDK NVMeOF Target process") 1054 nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt") 1055 proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask]) 1056 self.pid = os.path.join(self.spdk_dir, "nvmf.pid") 1057 1058 with open(self.pid, "w") as fh: 1059 fh.write(str(proc.pid)) 1060 self.nvmf_proc = proc 1061 self.log_print("SPDK NVMeOF Target PID=%s" % self.pid) 1062 self.log_print("Waiting for spdk to initilize...") 1063 while True: 1064 if os.path.exists("/var/tmp/spdk.sock"): 1065 break 1066 time.sleep(1) 1067 self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock") 1068 1069 if self.enable_zcopy: 1070 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", 1071 enable_zerocopy_send=True) 1072 self.log_print("Target socket options:") 1073 rpc.client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix")) 1074 1075 if self.enable_adq: 1076 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1) 1077 rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None, 1078 nvme_adminq_poll_period_us=100000, retry_count=4) 1079 rpc.nvmf.nvmf_set_config(self.client, acceptor_poll_rate=10000) 1080 1081 rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name) 1082 1083 rpc.framework_start_init(self.client) 1084 self.spdk_tgt_configure() 1085 1086 def __del__(self): 1087 if hasattr(self, "nvmf_proc"): 1088 try: 1089 self.nvmf_proc.terminate() 1090 self.nvmf_proc.wait() 1091 except Exception as e: 1092 self.log_print(e) 1093 self.nvmf_proc.kill() 1094 self.nvmf_proc.communicate() 1095 1096 1097class KernelInitiator(Initiator): 1098 def __init__(self, name, general_config, initiator_config): 1099 super(KernelInitiator, self).__init__(name, general_config, initiator_config) 1100 1101 # Defaults 1102 self.extra_params = "" 1103 1104 if "extra_params" in initiator_config: 1105 self.extra_params = initiator_config["extra_params"] 1106 1107 def __del__(self): 1108 self.ssh_connection.close() 1109 1110 def kernel_init_connect(self, address_list, subsys_no): 1111 self.log_print("Below connection attempts may result in error messages, this is expected!") 1112 for subsystem in self.subsystem_info_list: 1113 self.log_print("Trying to connect %s %s %s" % subsystem) 1114 self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport, 1115 "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params]) 1116 time.sleep(2) 1117 1118 def kernel_init_disconnect(self, address_list, subsys_no): 1119 for subsystem in self.subsystem_info_list: 1120 self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]]) 1121 time.sleep(1) 1122 1123 def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1): 1124 out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'", 1125 "|", "awk", "'{print $1}'"]) 1126 nvme_list = [x for x in out.split("\n") if "nvme" in x] 1127 1128 filename_section = "" 1129 nvme_per_split = int(len(nvme_list) / len(threads)) 1130 remainder = len(nvme_list) % len(threads) 1131 iterator = iter(nvme_list) 1132 result = [] 1133 for i in range(len(threads)): 1134 result.append([]) 1135 for j in range(nvme_per_split): 1136 result[i].append(next(iterator)) 1137 if remainder: 1138 result[i].append(next(iterator)) 1139 remainder -= 1 1140 for i, r in enumerate(result): 1141 header = "[filename%s]" % i 1142 disks = "\n".join(["filename=%s" % x for x in r]) 1143 job_section_qd = round((io_depth * len(r)) / num_jobs) 1144 if job_section_qd == 0: 1145 job_section_qd = 1 1146 iodepth = "iodepth=%s" % job_section_qd 1147 filename_section = "\n".join([filename_section, header, disks, iodepth]) 1148 1149 return filename_section 1150 1151 1152class SPDKInitiator(Initiator): 1153 def __init__(self, name, general_config, initiator_config): 1154 super(SPDKInitiator, self).__init__(name, general_config, initiator_config) 1155 1156 # Required fields 1157 self.num_cores = initiator_config["num_cores"] 1158 1159 def install_spdk(self, local_spdk_zip): 1160 self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip") 1161 self.log_print("Copied sources zip from target") 1162 self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir]) 1163 1164 self.log_print("Sources unpacked") 1165 self.log_print("Using fio binary %s" % self.fio_bin) 1166 self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"]) 1167 self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"]) 1168 self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)]) 1169 self.exec_cmd(["make", "-C", self.spdk_dir, "clean"]) 1170 self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"]) 1171 1172 self.log_print("SPDK built") 1173 self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir]) 1174 1175 def gen_spdk_bdev_conf(self, remote_subsystem_list): 1176 bdev_cfg_section = { 1177 "subsystems": [ 1178 { 1179 "subsystem": "bdev", 1180 "config": [] 1181 } 1182 ] 1183 } 1184 1185 for i, subsys in enumerate(remote_subsystem_list): 1186 sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys) 1187 nvme_ctrl = { 1188 "method": "bdev_nvme_attach_controller", 1189 "params": { 1190 "name": "Nvme{}".format(i), 1191 "trtype": self.transport, 1192 "traddr": sub_addr, 1193 "trsvcid": sub_port, 1194 "subnqn": sub_nqn, 1195 "adrfam": "IPv4" 1196 } 1197 } 1198 1199 if self.enable_adq: 1200 nvme_ctrl["params"].update({"priority": "1"}) 1201 1202 bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl) 1203 1204 return json.dumps(bdev_cfg_section, indent=2) 1205 1206 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1): 1207 filename_section = "" 1208 if len(threads) >= len(subsystems): 1209 threads = range(0, len(subsystems)) 1210 filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))] 1211 nvme_per_split = int(len(subsystems) / len(threads)) 1212 remainder = len(subsystems) % len(threads) 1213 iterator = iter(filenames) 1214 result = [] 1215 for i in range(len(threads)): 1216 result.append([]) 1217 for j in range(nvme_per_split): 1218 result[i].append(next(iterator)) 1219 if remainder: 1220 result[i].append(next(iterator)) 1221 remainder -= 1 1222 for i, r in enumerate(result): 1223 header = "[filename%s]" % i 1224 disks = "\n".join(["filename=%s" % x for x in r]) 1225 job_section_qd = round((io_depth * len(r)) / num_jobs) 1226 if job_section_qd == 0: 1227 job_section_qd = 1 1228 iodepth = "iodepth=%s" % job_section_qd 1229 filename_section = "\n".join([filename_section, header, disks, iodepth]) 1230 1231 return filename_section 1232 1233 1234if __name__ == "__main__": 1235 spdk_zip_path = "/tmp/spdk.zip" 1236 target_results_dir = "/tmp/results" 1237 1238 if (len(sys.argv) > 1): 1239 config_file_path = sys.argv[1] 1240 else: 1241 script_full_dir = os.path.dirname(os.path.realpath(__file__)) 1242 config_file_path = os.path.join(script_full_dir, "config.json") 1243 1244 print("Using config file: %s" % config_file_path) 1245 with open(config_file_path, "r") as config: 1246 data = json.load(config) 1247 1248 initiators = [] 1249 fio_cases = [] 1250 1251 general_config = data["general"] 1252 target_config = data["target"] 1253 initiator_configs = [data[x] for x in data.keys() if "initiator" in x] 1254 1255 for k, v in data.items(): 1256 if "target" in k: 1257 if data[k]["mode"] == "spdk": 1258 target_obj = SPDKTarget(k, data["general"], v) 1259 elif data[k]["mode"] == "kernel": 1260 target_obj = KernelTarget(k, data["general"], v) 1261 pass 1262 elif "initiator" in k: 1263 if data[k]["mode"] == "spdk": 1264 init_obj = SPDKInitiator(k, data["general"], v) 1265 elif data[k]["mode"] == "kernel": 1266 init_obj = KernelInitiator(k, data["general"], v) 1267 initiators.append(init_obj) 1268 elif "fio" in k: 1269 fio_workloads = itertools.product(data[k]["bs"], 1270 data[k]["qd"], 1271 data[k]["rw"]) 1272 1273 fio_run_time = data[k]["run_time"] 1274 fio_ramp_time = data[k]["ramp_time"] 1275 fio_rw_mix_read = data[k]["rwmixread"] 1276 fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None 1277 fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None 1278 else: 1279 continue 1280 1281 # Copy and install SPDK on remote initiators 1282 if "skip_spdk_install" not in data["general"]: 1283 target_obj.zip_spdk_sources(target_obj.spdk_dir, spdk_zip_path) 1284 threads = [] 1285 for i in initiators: 1286 if i.mode == "spdk": 1287 t = threading.Thread(target=i.install_spdk, args=(spdk_zip_path,)) 1288 threads.append(t) 1289 t.start() 1290 for t in threads: 1291 t.join() 1292 1293 target_obj.tgt_start() 1294 1295 try: 1296 os.mkdir(target_results_dir) 1297 except FileExistsError: 1298 pass 1299 1300 for i in initiators: 1301 i.discover_subsystems(i.remote_nic_ips, target_obj.subsys_no) 1302 1303 # Poor mans threading 1304 # Run FIO tests 1305 for block_size, io_depth, rw in fio_workloads: 1306 threads = [] 1307 configs = [] 1308 for i in initiators: 1309 if i.mode == "kernel": 1310 i.kernel_init_connect(i.remote_nic_ips, target_obj.subsys_no) 1311 1312 cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no, 1313 fio_num_jobs, fio_ramp_time, fio_run_time) 1314 configs.append(cfg) 1315 1316 for i, cfg in zip(initiators, configs): 1317 t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num)) 1318 threads.append(t) 1319 if target_obj.enable_sar: 1320 sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"]) 1321 sar_file_name = ".".join([sar_file_name, "txt"]) 1322 t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name)) 1323 threads.append(t) 1324 1325 if target_obj.enable_pcm: 1326 pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]] 1327 1328 pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(target_results_dir, pcm_fnames[0],)) 1329 pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(target_results_dir, pcm_fnames[1],)) 1330 pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(target_results_dir, pcm_fnames[2],)) 1331 1332 threads.append(pcm_cpu_t) 1333 threads.append(pcm_mem_t) 1334 threads.append(pcm_pow_t) 1335 1336 if target_obj.enable_bandwidth: 1337 bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)]) 1338 bandwidth_file_name = ".".join([bandwidth_file_name, "csv"]) 1339 t = threading.Thread(target=target_obj.measure_bandwidth, args=(target_results_dir, bandwidth_file_name,)) 1340 threads.append(t) 1341 1342 if target_obj.enable_dpdk_memory: 1343 t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(target_results_dir)) 1344 threads.append(t) 1345 1346 for t in threads: 1347 t.start() 1348 for t in threads: 1349 t.join() 1350 1351 for i in initiators: 1352 if i.mode == "kernel": 1353 i.kernel_init_disconnect(i.remote_nic_ips, target_obj.subsys_no) 1354 i.copy_result_files(target_results_dir) 1355 1356 target_obj.restore_governor() 1357 target_obj.restore_tuned() 1358 target_obj.restore_services() 1359 target_obj.restore_sysctl() 1360 for i in initiators: 1361 i.restore_governor() 1362 i.restore_tuned() 1363 i.restore_services() 1364 i.restore_sysctl() 1365 target_obj.parse_results(target_results_dir) 1366