1#!/usr/bin/env python3 2 3from json.decoder import JSONDecodeError 4import os 5import re 6import sys 7import argparse 8import json 9import zipfile 10import threading 11import subprocess 12import itertools 13import configparser 14import time 15import uuid 16from collections import OrderedDict 17 18import paramiko 19import pandas as pd 20 21import rpc 22import rpc.client 23from common import * 24 25 26class Server: 27 def __init__(self, name, general_config, server_config): 28 self.name = name 29 self.username = general_config["username"] 30 self.password = general_config["password"] 31 self.transport = general_config["transport"].lower() 32 self.nic_ips = server_config["nic_ips"] 33 self.mode = server_config["mode"] 34 35 self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts" 36 if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]: 37 self.irq_scripts_dir = server_config["irq_scripts_dir"] 38 39 self.local_nic_info = [] 40 self._nics_json_obj = {} 41 self.svc_restore_dict = {} 42 self.sysctl_restore_dict = {} 43 self.tuned_restore_dict = {} 44 self.governor_restore = "" 45 self.tuned_profile = "" 46 47 self.enable_adq = False 48 self.adq_priority = None 49 if "adq_enable" in server_config and server_config["adq_enable"]: 50 self.enable_adq = server_config["adq_enable"] 51 self.adq_priority = 1 52 53 if "tuned_profile" in server_config: 54 self.tuned_profile = server_config["tuned_profile"] 55 56 if not re.match("^[A-Za-z0-9]*$", name): 57 self.log_print("Please use a name which contains only letters or numbers") 58 sys.exit(1) 59 60 def log_print(self, msg): 61 print("[%s] %s" % (self.name, msg), flush=True) 62 63 def get_uncommented_lines(self, lines): 64 return [line for line in lines if line and not line.startswith('#')] 65 66 def get_nic_name_by_ip(self, ip): 67 if not self._nics_json_obj: 68 nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"]) 69 self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj))) 70 for nic in self._nics_json_obj: 71 for addr in nic["addr_info"]: 72 if ip in addr["local"]: 73 return nic["ifname"] 74 75 def set_local_nic_info_helper(self): 76 pass 77 78 def set_local_nic_info(self, pci_info): 79 def extract_network_elements(json_obj): 80 nic_list = [] 81 if isinstance(json_obj, list): 82 for x in json_obj: 83 nic_list.extend(extract_network_elements(x)) 84 elif isinstance(json_obj, dict): 85 if "children" in json_obj: 86 nic_list.extend(extract_network_elements(json_obj["children"])) 87 if "class" in json_obj.keys() and "network" in json_obj["class"]: 88 nic_list.append(json_obj) 89 return nic_list 90 91 self.local_nic_info = extract_network_elements(pci_info) 92 93 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 94 return "" 95 96 def configure_system(self): 97 self.configure_services() 98 self.configure_sysctl() 99 self.configure_tuned() 100 self.configure_cpu_governor() 101 self.configure_irq_affinity() 102 103 def configure_adq(self): 104 if self.mode == "kernel": 105 self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 106 return 107 self.adq_load_modules() 108 self.adq_configure_nic() 109 110 def adq_load_modules(self): 111 self.log_print("Modprobing ADQ-related Linux modules...") 112 adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"] 113 for module in adq_module_deps: 114 try: 115 self.exec_cmd(["sudo", "modprobe", module]) 116 self.log_print("%s loaded!" % module) 117 except CalledProcessError as e: 118 self.log_print("ERROR: failed to load module %s" % module) 119 self.log_print("%s resulted in error: %s" % (e.cmd, e.output)) 120 121 def adq_configure_tc(self): 122 self.log_print("Configuring ADQ Traffic classess and filters...") 123 124 if self.mode == "kernel": 125 self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 126 return 127 128 num_queues_tc0 = 2 # 2 is minimum number of queues for TC0 129 num_queues_tc1 = self.num_cores 130 port_param = "dst_port" if isinstance(self, Target) else "src_port" 131 port = "4420" 132 xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs") 133 134 for nic_ip in self.nic_ips: 135 nic_name = self.get_nic_name_by_ip(nic_ip) 136 tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, 137 "root", "mqprio", "num_tc", "2", "map", "0", "1", 138 "queues", "%s@0" % num_queues_tc0, 139 "%s@%s" % (num_queues_tc1, num_queues_tc0), 140 "hw", "1", "mode", "channel"] 141 self.log_print(" ".join(tc_qdisc_map_cmd)) 142 self.exec_cmd(tc_qdisc_map_cmd) 143 144 time.sleep(5) 145 tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"] 146 self.log_print(" ".join(tc_qdisc_ingress_cmd)) 147 self.exec_cmd(tc_qdisc_ingress_cmd) 148 149 tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name, 150 "protocol", "ip", "ingress", "prio", "1", "flower", 151 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port, 152 "skip_sw", "hw_tc", "1"] 153 self.log_print(" ".join(tc_filter_cmd)) 154 self.exec_cmd(tc_filter_cmd) 155 156 # show tc configuration 157 self.log_print("Show tc configuration for %s NIC..." % nic_name) 158 tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name]) 159 tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"]) 160 self.log_print("%s" % tc_disk_out) 161 self.log_print("%s" % tc_filter_out) 162 163 # Ethtool coalese settings must be applied after configuring traffic classes 164 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"]) 165 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"]) 166 167 self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name) 168 xps_cmd = ["sudo", xps_script_path, nic_name] 169 self.log_print(xps_cmd) 170 self.exec_cmd(xps_cmd) 171 172 def adq_configure_nic(self): 173 self.log_print("Configuring NIC port settings for ADQ testing...") 174 175 # Reload the driver first, to make sure any previous settings are re-set. 176 try: 177 self.exec_cmd(["sudo", "rmmod", "ice"]) 178 self.exec_cmd(["sudo", "modprobe", "ice"]) 179 except CalledProcessError as e: 180 self.log_print("ERROR: failed to reload ice module!") 181 self.log_print("%s resulted in error: %s" % (e.cmd, e.output)) 182 183 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 184 for nic in nic_names: 185 self.log_print(nic) 186 try: 187 self.exec_cmd(["sudo", "ethtool", "-K", nic, 188 "hw-tc-offload", "on"]) # Enable hardware TC offload 189 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 190 "channel-inline-flow-director", "on"]) # Enable Intel Flow Director 191 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"]) # Disable LLDP 192 # As temporary workaround for ADQ, channel packet inspection optimization is turned on during connection establishment. 193 # Then turned off before fio ramp_up expires in ethtool_after_fio_ramp(). 194 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 195 "channel-pkt-inspect-optimize", "on"]) 196 except CalledProcessError as e: 197 self.log_print("ERROR: failed to configure NIC port using ethtool!") 198 self.log_print("%s resulted in error: %s" % (e.cmd, e.output)) 199 self.log_print("Please update your NIC driver and firmware versions and try again.") 200 self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic])) 201 self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic])) 202 203 def configure_services(self): 204 self.log_print("Configuring active services...") 205 svc_config = configparser.ConfigParser(strict=False) 206 207 # Below list is valid only for RHEL / Fedora systems and might not 208 # contain valid names for other distributions. 209 svc_target_state = { 210 "firewalld": "inactive", 211 "irqbalance": "inactive", 212 "lldpad.service": "inactive", 213 "lldpad.socket": "inactive" 214 } 215 216 for service in svc_target_state: 217 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 218 out = "\n".join(["[%s]" % service, out]) 219 svc_config.read_string(out) 220 221 if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]: 222 continue 223 224 service_state = svc_config[service]["ActiveState"] 225 self.log_print("Current state of %s service is %s" % (service, service_state)) 226 self.svc_restore_dict.update({service: service_state}) 227 if service_state != "inactive": 228 self.log_print("Disabling %s. It will be restored after the test has finished." % service) 229 self.exec_cmd(["sudo", "systemctl", "stop", service]) 230 231 def configure_sysctl(self): 232 self.log_print("Tuning sysctl settings...") 233 234 busy_read = 0 235 if self.enable_adq and self.mode == "spdk": 236 busy_read = 1 237 238 sysctl_opts = { 239 "net.core.busy_poll": 0, 240 "net.core.busy_read": busy_read, 241 "net.core.somaxconn": 4096, 242 "net.core.netdev_max_backlog": 8192, 243 "net.ipv4.tcp_max_syn_backlog": 16384, 244 "net.core.rmem_max": 268435456, 245 "net.core.wmem_max": 268435456, 246 "net.ipv4.tcp_mem": "268435456 268435456 268435456", 247 "net.ipv4.tcp_rmem": "8192 1048576 33554432", 248 "net.ipv4.tcp_wmem": "8192 1048576 33554432", 249 "net.ipv4.route.flush": 1, 250 "vm.overcommit_memory": 1, 251 } 252 253 for opt, value in sysctl_opts.items(): 254 self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()}) 255 self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 256 257 def configure_tuned(self): 258 if not self.tuned_profile: 259 self.log_print("WARNING: Tuned profile not set in configration file. Skipping configuration.") 260 return 261 262 self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile) 263 service = "tuned" 264 tuned_config = configparser.ConfigParser(strict=False) 265 266 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 267 out = "\n".join(["[%s]" % service, out]) 268 tuned_config.read_string(out) 269 tuned_state = tuned_config[service]["ActiveState"] 270 self.svc_restore_dict.update({service: tuned_state}) 271 272 if tuned_state != "inactive": 273 profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip() 274 profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip() 275 276 self.tuned_restore_dict = { 277 "profile": profile, 278 "mode": profile_mode 279 } 280 281 self.exec_cmd(["sudo", "systemctl", "start", service]) 282 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile]) 283 self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"])) 284 285 def configure_cpu_governor(self): 286 self.log_print("Setting CPU governor to performance...") 287 288 # This assumes that there is the same CPU scaling governor on each CPU 289 self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip() 290 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"]) 291 292 def configure_irq_affinity(self): 293 self.log_print("Setting NIC irq affinity for NICs...") 294 295 irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh") 296 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 297 for nic in nic_names: 298 irq_cmd = ["sudo", irq_script_path, nic] 299 self.log_print(irq_cmd) 300 self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir) 301 302 def restore_services(self): 303 self.log_print("Restoring services...") 304 for service, state in self.svc_restore_dict.items(): 305 cmd = "stop" if state == "inactive" else "start" 306 self.exec_cmd(["sudo", "systemctl", cmd, service]) 307 308 def restore_sysctl(self): 309 self.log_print("Restoring sysctl settings...") 310 for opt, value in self.sysctl_restore_dict.items(): 311 self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 312 313 def restore_tuned(self): 314 self.log_print("Restoring tuned-adm settings...") 315 316 if not self.tuned_restore_dict: 317 return 318 319 if self.tuned_restore_dict["mode"] == "auto": 320 self.exec_cmd(["sudo", "tuned-adm", "auto_profile"]) 321 self.log_print("Reverted tuned-adm to auto_profile.") 322 else: 323 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]]) 324 self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"]) 325 326 def restore_governor(self): 327 self.log_print("Restoring CPU governor setting...") 328 if self.governor_restore: 329 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore]) 330 self.log_print("Reverted CPU governor to %s." % self.governor_restore) 331 332 333class Target(Server): 334 def __init__(self, name, general_config, target_config): 335 super(Target, self).__init__(name, general_config, target_config) 336 337 # Defaults 338 self.enable_sar = False 339 self.sar_delay = 0 340 self.sar_interval = 0 341 self.sar_count = 0 342 self.enable_pcm = False 343 self.pcm_dir = "" 344 self.pcm_delay = 0 345 self.pcm_interval = 0 346 self.pcm_count = 0 347 self.enable_bandwidth = 0 348 self.bandwidth_count = 0 349 self.enable_dpdk_memory = False 350 self.dpdk_wait_time = 0 351 self.enable_zcopy = False 352 self.scheduler_name = "static" 353 self.null_block = 0 354 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 355 self.subsystem_info_list = [] 356 357 if "null_block_devices" in target_config: 358 self.null_block = target_config["null_block_devices"] 359 if "sar_settings" in target_config: 360 self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"] 361 if "pcm_settings" in target_config: 362 self.enable_pcm = True 363 self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"] 364 if "enable_bandwidth" in target_config: 365 self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"] 366 if "enable_dpdk_memory" in target_config: 367 self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"] 368 if "scheduler_settings" in target_config: 369 self.scheduler_name = target_config["scheduler_settings"] 370 if "zcopy_settings" in target_config: 371 self.enable_zcopy = target_config["zcopy_settings"] 372 if "results_dir" in target_config: 373 self.results_dir = target_config["results_dir"] 374 375 self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) 376 self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../")) 377 self.set_local_nic_info(self.set_local_nic_info_helper()) 378 379 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 380 self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip") 381 382 self.configure_system() 383 if self.enable_adq: 384 self.configure_adq() 385 self.sys_config() 386 387 def set_local_nic_info_helper(self): 388 return json.loads(self.exec_cmd(["lshw", "-json"])) 389 390 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 391 stderr_opt = None 392 if stderr_redirect: 393 stderr_opt = subprocess.STDOUT 394 if change_dir: 395 old_cwd = os.getcwd() 396 os.chdir(change_dir) 397 self.log_print("Changing directory to %s" % change_dir) 398 399 out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8") 400 401 if change_dir: 402 os.chdir(old_cwd) 403 self.log_print("Changing directory to %s" % old_cwd) 404 return out 405 406 def zip_spdk_sources(self, spdk_dir, dest_file): 407 self.log_print("Zipping SPDK source directory") 408 fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) 409 for root, directories, files in os.walk(spdk_dir, followlinks=True): 410 for file in files: 411 fh.write(os.path.relpath(os.path.join(root, file))) 412 fh.close() 413 self.log_print("Done zipping") 414 415 def read_json_stats(self, file): 416 with open(file, "r") as json_data: 417 data = json.load(json_data) 418 job_pos = 0 # job_post = 0 because using aggregated results 419 420 # Check if latency is in nano or microseconds to choose correct dict key 421 def get_lat_unit(key_prefix, dict_section): 422 # key prefix - lat, clat or slat. 423 # dict section - portion of json containing latency bucket in question 424 # Return dict key to access the bucket and unit as string 425 for k, _ in dict_section.items(): 426 if k.startswith(key_prefix): 427 return k, k.split("_")[1] 428 429 def get_clat_percentiles(clat_dict_leaf): 430 if "percentile" in clat_dict_leaf: 431 p99_lat = float(clat_dict_leaf["percentile"]["99.000000"]) 432 p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"]) 433 p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"]) 434 p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"]) 435 436 return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat] 437 else: 438 # Latest fio versions do not provide "percentile" results if no 439 # measurements were done, so just return zeroes 440 return [0, 0, 0, 0] 441 442 read_iops = float(data["jobs"][job_pos]["read"]["iops"]) 443 read_bw = float(data["jobs"][job_pos]["read"]["bw"]) 444 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"]) 445 read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"]) 446 read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"]) 447 read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"]) 448 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"]) 449 read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles( 450 data["jobs"][job_pos]["read"][clat_key]) 451 452 if "ns" in lat_unit: 453 read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]] 454 if "ns" in clat_unit: 455 read_p99_lat = read_p99_lat / 1000 456 read_p99_9_lat = read_p99_9_lat / 1000 457 read_p99_99_lat = read_p99_99_lat / 1000 458 read_p99_999_lat = read_p99_999_lat / 1000 459 460 write_iops = float(data["jobs"][job_pos]["write"]["iops"]) 461 write_bw = float(data["jobs"][job_pos]["write"]["bw"]) 462 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"]) 463 write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"]) 464 write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"]) 465 write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"]) 466 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"]) 467 write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles( 468 data["jobs"][job_pos]["write"][clat_key]) 469 470 if "ns" in lat_unit: 471 write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]] 472 if "ns" in clat_unit: 473 write_p99_lat = write_p99_lat / 1000 474 write_p99_9_lat = write_p99_9_lat / 1000 475 write_p99_99_lat = write_p99_99_lat / 1000 476 write_p99_999_lat = write_p99_999_lat / 1000 477 478 return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat, 479 read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat, 480 write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat, 481 write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat] 482 483 def parse_results(self, results_dir, csv_file): 484 files = os.listdir(results_dir) 485 fio_files = filter(lambda x: ".fio" in x, files) 486 json_files = [x for x in files if ".json" in x] 487 488 headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us", 489 "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us", 490 "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us", 491 "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"] 492 493 aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us", 494 "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"] 495 496 header_line = ",".join(["Name", *headers]) 497 aggr_header_line = ",".join(["Name", *aggr_headers]) 498 499 # Create empty results file 500 with open(os.path.join(results_dir, csv_file), "w") as fh: 501 fh.write(aggr_header_line + "\n") 502 rows = set() 503 504 for fio_config in fio_files: 505 self.log_print("Getting FIO stats for %s" % fio_config) 506 job_name, _ = os.path.splitext(fio_config) 507 508 # Look in the filename for rwmixread value. Function arguments do 509 # not have that information. 510 # TODO: Improve this function by directly using workload params instead 511 # of regexing through filenames. 512 if "read" in job_name: 513 rw_mixread = 1 514 elif "write" in job_name: 515 rw_mixread = 0 516 else: 517 rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100 518 519 # If "_CPU" exists in name - ignore it 520 # Initiators for the same job could have diffrent num_cores parameter 521 job_name = re.sub(r"_\d+CPU", "", job_name) 522 job_result_files = [x for x in json_files if job_name in x] 523 self.log_print("Matching result files for current fio config:") 524 for j in job_result_files: 525 self.log_print("\t %s" % j) 526 527 # There may have been more than 1 initiator used in test, need to check that 528 # Result files are created so that string after last "_" separator is server name 529 inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files]) 530 inits_avg_results = [] 531 for i in inits_names: 532 self.log_print("\tGetting stats for initiator %s" % i) 533 # There may have been more than 1 test run for this job, calculate average results for initiator 534 i_results = [x for x in job_result_files if i in x] 535 i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv")) 536 537 separate_stats = [] 538 for r in i_results: 539 try: 540 stats = self.read_json_stats(os.path.join(results_dir, r)) 541 separate_stats.append(stats) 542 self.log_print(stats) 543 except JSONDecodeError as e: 544 self.log_print("ERROR: Failed to parse %s results! Results might be incomplete!") 545 546 init_results = [sum(x) for x in zip(*separate_stats)] 547 init_results = [x / len(separate_stats) for x in init_results] 548 inits_avg_results.append(init_results) 549 550 self.log_print("\tAverage results for initiator %s" % i) 551 self.log_print(init_results) 552 with open(os.path.join(results_dir, i_results_filename), "w") as fh: 553 fh.write(header_line + "\n") 554 fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n") 555 556 # Sum results of all initiators running this FIO job. 557 # Latency results are an average of latencies from accros all initiators. 558 inits_avg_results = [sum(x) for x in zip(*inits_avg_results)] 559 inits_avg_results = OrderedDict(zip(headers, inits_avg_results)) 560 for key in inits_avg_results: 561 if "lat" in key: 562 inits_avg_results[key] /= len(inits_names) 563 564 # Aggregate separate read/write values into common labels 565 # Take rw_mixread into consideration for mixed read/write workloads. 566 aggregate_results = OrderedDict() 567 for h in aggr_headers: 568 read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key] 569 if "lat" in h: 570 _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat 571 else: 572 _ = read_stat + write_stat 573 aggregate_results[h] = "{0:.3f}".format(_) 574 575 rows.add(",".join([job_name, *aggregate_results.values()])) 576 577 # Save results to file 578 for row in rows: 579 with open(os.path.join(results_dir, csv_file), "a") as fh: 580 fh.write(row + "\n") 581 self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file)) 582 583 def measure_sar(self, results_dir, sar_file_name): 584 self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay) 585 cpu_number = os.cpu_count() 586 sar_idle_sum = 0 587 time.sleep(self.sar_delay) 588 out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count]) 589 with open(os.path.join(results_dir, sar_file_name), "w") as fh: 590 for line in out.split("\n"): 591 if "Average" in line: 592 if "CPU" in line: 593 self.log_print("Summary CPU utilization from SAR:") 594 self.log_print(line) 595 elif "all" in line: 596 self.log_print(line) 597 else: 598 sar_idle_sum += float(line.split()[7]) 599 fh.write(out) 600 sar_cpu_usage = cpu_number * 100 - sar_idle_sum 601 with open(os.path.join(results_dir, sar_file_name), "a") as f: 602 f.write("Total CPU used: " + str(sar_cpu_usage)) 603 604 def ethtool_after_fio_ramp(self, fio_ramp_time): 605 time.sleep(fio_ramp_time//2) 606 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 607 for nic in nic_names: 608 self.log_print(nic) 609 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 610 "channel-pkt-inspect-optimize", "off"]) # Disable channel packet inspection optimization 611 612 def measure_pcm_memory(self, results_dir, pcm_file_name): 613 time.sleep(self.pcm_delay) 614 cmd = ["%s/pcm-memory.x" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)] 615 pcm_memory = subprocess.Popen(cmd) 616 time.sleep(self.pcm_count) 617 pcm_memory.terminate() 618 619 def measure_pcm(self, results_dir, pcm_file_name): 620 time.sleep(self.pcm_delay) 621 cmd = ["%s/pcm.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, "-csv=%s/%s" % (results_dir, pcm_file_name)] 622 subprocess.run(cmd) 623 df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1]) 624 df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x)) 625 skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})] 626 skt_pcm_file_name = "_".join(["skt", pcm_file_name]) 627 skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False) 628 629 def measure_pcm_power(self, results_dir, pcm_power_file_name): 630 time.sleep(self.pcm_delay) 631 out = self.exec_cmd(["%s/pcm-power.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count]) 632 with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh: 633 fh.write(out) 634 635 def measure_network_bandwidth(self, results_dir, bandwidth_file_name): 636 self.log_print("INFO: starting network bandwidth measure") 637 self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name), 638 "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)]) 639 640 def measure_dpdk_memory(self, results_dir): 641 self.log_print("INFO: waiting to generate DPDK memory usage") 642 time.sleep(self.dpdk_wait_time) 643 self.log_print("INFO: generating DPDK memory usage") 644 rpc.env.env_dpdk_get_mem_stats 645 os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir)) 646 647 def sys_config(self): 648 self.log_print("====Kernel release:====") 649 self.log_print(os.uname().release) 650 self.log_print("====Kernel command line:====") 651 with open('/proc/cmdline') as f: 652 cmdline = f.readlines() 653 self.log_print('\n'.join(self.get_uncommented_lines(cmdline))) 654 self.log_print("====sysctl conf:====") 655 with open('/etc/sysctl.conf') as f: 656 sysctl = f.readlines() 657 self.log_print('\n'.join(self.get_uncommented_lines(sysctl))) 658 self.log_print("====Cpu power info:====") 659 self.log_print(self.exec_cmd(["cpupower", "frequency-info"])) 660 self.log_print("====zcopy settings:====") 661 self.log_print("zcopy enabled: %s" % (self.enable_zcopy)) 662 self.log_print("====Scheduler settings:====") 663 self.log_print("SPDK scheduler: %s" % (self.scheduler_name)) 664 665 666class Initiator(Server): 667 def __init__(self, name, general_config, initiator_config): 668 super(Initiator, self).__init__(name, general_config, initiator_config) 669 670 # Required fields 671 self.ip = initiator_config["ip"] 672 self.target_nic_ips = initiator_config["target_nic_ips"] 673 674 # Defaults 675 self.cpus_allowed = None 676 self.cpus_allowed_policy = "shared" 677 self.spdk_dir = "/tmp/spdk" 678 self.fio_bin = "/usr/src/fio/fio" 679 self.nvmecli_bin = "nvme" 680 self.cpu_frequency = None 681 self.subsystem_info_list = [] 682 683 if "spdk_dir" in initiator_config: 684 self.spdk_dir = initiator_config["spdk_dir"] 685 if "fio_bin" in initiator_config: 686 self.fio_bin = initiator_config["fio_bin"] 687 if "nvmecli_bin" in initiator_config: 688 self.nvmecli_bin = initiator_config["nvmecli_bin"] 689 if "cpus_allowed" in initiator_config: 690 self.cpus_allowed = initiator_config["cpus_allowed"] 691 if "cpus_allowed_policy" in initiator_config: 692 self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"] 693 if "cpu_frequency" in initiator_config: 694 self.cpu_frequency = initiator_config["cpu_frequency"] 695 if os.getenv('SPDK_WORKSPACE'): 696 self.spdk_dir = os.getenv('SPDK_WORKSPACE') 697 698 self.ssh_connection = paramiko.SSHClient() 699 self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 700 self.ssh_connection.connect(self.ip, username=self.username, password=self.password) 701 self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir]) 702 self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir]) 703 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 704 705 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 706 self.copy_spdk("/tmp/spdk.zip") 707 self.set_local_nic_info(self.set_local_nic_info_helper()) 708 self.set_cpu_frequency() 709 self.configure_system() 710 if self.enable_adq: 711 self.configure_adq() 712 self.sys_config() 713 714 def set_local_nic_info_helper(self): 715 return json.loads(self.exec_cmd(["lshw", "-json"])) 716 717 def __del__(self): 718 self.ssh_connection.close() 719 720 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 721 if change_dir: 722 cmd = ["cd", change_dir, ";", *cmd] 723 724 # In case one of the command elements contains whitespace and is not 725 # already quoted, # (e.g. when calling sysctl) quote it again to prevent expansion 726 # when sending to remote system. 727 for i, c in enumerate(cmd): 728 if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")): 729 cmd[i] = '"%s"' % c 730 cmd = " ".join(cmd) 731 732 # Redirect stderr to stdout thanks using get_pty option if needed 733 _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect) 734 out = stdout.read().decode(encoding="utf-8") 735 736 # Check the return code 737 rc = stdout.channel.recv_exit_status() 738 if rc: 739 raise CalledProcessError(int(rc), cmd, out) 740 741 return out 742 743 def put_file(self, local, remote_dest): 744 ftp = self.ssh_connection.open_sftp() 745 ftp.put(local, remote_dest) 746 ftp.close() 747 748 def get_file(self, remote, local_dest): 749 ftp = self.ssh_connection.open_sftp() 750 ftp.get(remote, local_dest) 751 ftp.close() 752 753 def copy_spdk(self, local_spdk_zip): 754 self.log_print("Copying SPDK sources to initiator %s" % self.name) 755 self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip") 756 self.log_print("Copied sources zip from target") 757 self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir]) 758 self.log_print("Sources unpacked") 759 760 def copy_result_files(self, dest_dir): 761 self.log_print("Copying results") 762 763 if not os.path.exists(dest_dir): 764 os.mkdir(dest_dir) 765 766 # Get list of result files from initiator and copy them back to target 767 file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n") 768 769 for file in file_list: 770 self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file), 771 os.path.join(dest_dir, file)) 772 self.log_print("Done copying results") 773 774 def discover_subsystems(self, address_list, subsys_no): 775 num_nvmes = range(0, subsys_no) 776 nvme_discover_output = "" 777 for ip, subsys_no in itertools.product(address_list, num_nvmes): 778 self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no)) 779 nvme_discover_cmd = ["sudo", 780 "%s" % self.nvmecli_bin, 781 "discover", "-t", "%s" % self.transport, 782 "-s", "%s" % (4420 + subsys_no), 783 "-a", "%s" % ip] 784 785 try: 786 stdout = self.exec_cmd(nvme_discover_cmd) 787 if stdout: 788 nvme_discover_output = nvme_discover_output + stdout 789 except CalledProcessError: 790 # Do nothing. In case of discovering remote subsystems of kernel target 791 # we expect "nvme discover" to fail a bunch of times because we basically 792 # scan ports. 793 pass 794 795 subsystems = re.findall(r'trsvcid:\s(\d+)\s+' # get svcid number 796 r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+' # get NQN id 797 r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', # get IP address 798 nvme_discover_output) # from nvme discovery output 799 subsystems = filter(lambda x: x[-1] in address_list, subsystems) 800 subsystems = list(set(subsystems)) 801 subsystems.sort(key=lambda x: x[1]) 802 self.log_print("Found matching subsystems on target side:") 803 for s in subsystems: 804 self.log_print(s) 805 self.subsystem_info_list = subsystems 806 807 def gen_fio_filename_conf(self, *args, **kwargs): 808 # Logic implemented in SPDKInitiator and KernelInitiator classes 809 pass 810 811 def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10, rate_iops=0): 812 fio_conf_template = """ 813[global] 814ioengine={ioengine} 815{spdk_conf} 816thread=1 817group_reporting=1 818direct=1 819percentile_list=50:90:99:99.5:99.9:99.99:99.999 820 821norandommap=1 822rw={rw} 823rwmixread={rwmixread} 824bs={block_size} 825time_based=1 826ramp_time={ramp_time} 827runtime={run_time} 828rate_iops={rate_iops} 829""" 830 if "spdk" in self.mode: 831 bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list) 832 self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir]) 833 ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir 834 spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir 835 else: 836 ioengine = self.ioengine 837 spdk_conf = "" 838 out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'", 839 "|", "awk", "'{print $1}'"]) 840 subsystems = [x for x in out.split("\n") if "nvme" in x] 841 842 if self.cpus_allowed is not None: 843 self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed) 844 cpus_num = 0 845 cpus = self.cpus_allowed.split(",") 846 for cpu in cpus: 847 if "-" in cpu: 848 a, b = cpu.split("-") 849 a = int(a) 850 b = int(b) 851 cpus_num += len(range(a, b)) 852 else: 853 cpus_num += 1 854 self.num_cores = cpus_num 855 threads = range(0, self.num_cores) 856 elif hasattr(self, 'num_cores'): 857 self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores) 858 threads = range(0, int(self.num_cores)) 859 else: 860 self.num_cores = len(subsystems) 861 threads = range(0, len(subsystems)) 862 863 if "spdk" in self.mode: 864 filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs) 865 else: 866 filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs) 867 868 fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf, 869 rw=rw, rwmixread=rwmixread, block_size=block_size, 870 ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops) 871 872 # TODO: hipri disabled for now, as it causes fio errors: 873 # io_u error on file /dev/nvme2n1: Operation not supported 874 # See comment in KernelInitiator class, kernel_init_connect() function 875 if hasattr(self, "ioengine") and "io_uring" in self.ioengine: 876 fio_config = fio_config + """ 877fixedbufs=1 878registerfiles=1 879#hipri=1 880""" 881 if num_jobs: 882 fio_config = fio_config + "numjobs=%s \n" % num_jobs 883 if self.cpus_allowed is not None: 884 fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed 885 fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy 886 fio_config = fio_config + filename_section 887 888 fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread) 889 if hasattr(self, "num_cores"): 890 fio_config_filename += "_%sCPU" % self.num_cores 891 fio_config_filename += ".fio" 892 893 self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir]) 894 self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)]) 895 self.log_print("Created FIO Config:") 896 self.log_print(fio_config) 897 898 return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename) 899 900 def set_cpu_frequency(self): 901 if self.cpu_frequency is not None: 902 try: 903 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True) 904 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True) 905 self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"])) 906 except Exception: 907 self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!") 908 sys.exit() 909 else: 910 self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.") 911 912 def run_fio(self, fio_config_file, run_num=None): 913 job_name, _ = os.path.splitext(fio_config_file) 914 self.log_print("Starting FIO run for job: %s" % job_name) 915 self.log_print("Using FIO: %s" % self.fio_bin) 916 917 if run_num: 918 for i in range(1, run_num + 1): 919 output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json" 920 try: 921 output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json", 922 "--output=%s" % output_filename, "--eta=never"], True) 923 self.log_print(output) 924 except subprocess.CalledProcessError as e: 925 self.log_print("ERROR: Fio process failed!") 926 self.log_print(e.stdout) 927 else: 928 output_filename = job_name + "_" + self.name + ".json" 929 output = self.exec_cmd(["sudo", self.fio_bin, 930 fio_config_file, "--output-format=json", 931 "--output" % output_filename], True) 932 self.log_print(output) 933 self.log_print("FIO run finished. Results in: %s" % output_filename) 934 935 def sys_config(self): 936 self.log_print("====Kernel release:====") 937 self.log_print(self.exec_cmd(["uname", "-r"])) 938 self.log_print("====Kernel command line:====") 939 cmdline = self.exec_cmd(["cat", "/proc/cmdline"]) 940 self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines()))) 941 self.log_print("====sysctl conf:====") 942 sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"]) 943 self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines()))) 944 self.log_print("====Cpu power info:====") 945 self.log_print(self.exec_cmd(["cpupower", "frequency-info"])) 946 947 948class KernelTarget(Target): 949 def __init__(self, name, general_config, target_config): 950 super(KernelTarget, self).__init__(name, general_config, target_config) 951 # Defaults 952 self.nvmet_bin = "nvmetcli" 953 954 if "nvmet_bin" in target_config: 955 self.nvmet_bin = target_config["nvmet_bin"] 956 957 def __del__(self): 958 nvmet_command(self.nvmet_bin, "clear") 959 960 def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list): 961 962 nvmet_cfg = { 963 "ports": [], 964 "hosts": [], 965 "subsystems": [], 966 } 967 968 # Split disks between NIC IP's 969 disks_per_ip = int(len(nvme_list) / len(address_list)) 970 disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))] 971 972 subsys_no = 1 973 port_no = 0 974 for ip, chunk in zip(address_list, disk_chunks): 975 for disk in chunk: 976 nqn = "nqn.2018-09.io.spdk:cnode%s" % subsys_no 977 nvmet_cfg["subsystems"].append({ 978 "allowed_hosts": [], 979 "attr": { 980 "allow_any_host": "1", 981 "serial": "SPDK00%s" % subsys_no, 982 "version": "1.3" 983 }, 984 "namespaces": [ 985 { 986 "device": { 987 "path": disk, 988 "uuid": "%s" % uuid.uuid4() 989 }, 990 "enable": 1, 991 "nsid": subsys_no 992 } 993 ], 994 "nqn": nqn 995 }) 996 997 nvmet_cfg["ports"].append({ 998 "addr": { 999 "adrfam": "ipv4", 1000 "traddr": ip, 1001 "trsvcid": "%s" % (4420 + port_no), 1002 "trtype": "%s" % self.transport 1003 }, 1004 "portid": subsys_no, 1005 "referrals": [], 1006 "subsystems": [nqn] 1007 }) 1008 subsys_no += 1 1009 port_no += 1 1010 self.subsystem_info_list.append([port_no, nqn, ip]) 1011 1012 with open("kernel.conf", "w") as fh: 1013 fh.write(json.dumps(nvmet_cfg, indent=2)) 1014 pass 1015 1016 def tgt_start(self): 1017 self.log_print("Configuring kernel NVMeOF Target") 1018 1019 if self.null_block: 1020 print("Configuring with null block device.") 1021 null_blk_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)] 1022 self.kernel_tgt_gen_subsystem_conf(null_blk_list, self.nic_ips) 1023 self.subsys_no = len(null_blk_list) 1024 else: 1025 print("Configuring with NVMe drives.") 1026 nvme_list = get_nvme_devices() 1027 self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips) 1028 self.subsys_no = len(nvme_list) 1029 1030 nvmet_command(self.nvmet_bin, "clear") 1031 nvmet_command(self.nvmet_bin, "restore kernel.conf") 1032 1033 if self.enable_adq: 1034 self.adq_configure_tc() 1035 1036 self.log_print("Done configuring kernel NVMeOF Target") 1037 1038 1039class SPDKTarget(Target): 1040 def __init__(self, name, general_config, target_config): 1041 super(SPDKTarget, self).__init__(name, general_config, target_config) 1042 1043 # Required fields 1044 self.core_mask = target_config["core_mask"] 1045 self.num_cores = self.get_num_cores(self.core_mask) 1046 1047 # Defaults 1048 self.dif_insert_strip = False 1049 self.null_block_dif_type = 0 1050 self.num_shared_buffers = 4096 1051 self.bpf_proc = None 1052 self.bpf_scripts = [] 1053 1054 if "num_shared_buffers" in target_config: 1055 self.num_shared_buffers = target_config["num_shared_buffers"] 1056 if "null_block_dif_type" in target_config: 1057 self.null_block_dif_type = target_config["null_block_dif_type"] 1058 if "dif_insert_strip" in target_config: 1059 self.dif_insert_strip = target_config["dif_insert_strip"] 1060 if "bpf_scripts" in target_config: 1061 self.bpf_scripts = target_config["bpf_scripts"] 1062 1063 def get_num_cores(self, core_mask): 1064 if "0x" in core_mask: 1065 return bin(int(core_mask, 16)).count("1") 1066 else: 1067 num_cores = 0 1068 core_mask = core_mask.replace("[", "") 1069 core_mask = core_mask.replace("]", "") 1070 for i in core_mask.split(","): 1071 if "-" in i: 1072 x, y = i.split("-") 1073 num_cores += len(range(int(x), int(y))) + 1 1074 else: 1075 num_cores += 1 1076 return num_cores 1077 1078 def spdk_tgt_configure(self): 1079 self.log_print("Configuring SPDK NVMeOF target via RPC") 1080 1081 if self.enable_adq: 1082 self.adq_configure_tc() 1083 1084 # Create RDMA transport layer 1085 rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, 1086 num_shared_buffers=self.num_shared_buffers, 1087 dif_insert_or_strip=self.dif_insert_strip, 1088 sock_priority=self.adq_priority) 1089 self.log_print("SPDK NVMeOF transport layer:") 1090 rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client)) 1091 1092 if self.null_block: 1093 self.spdk_tgt_add_nullblock(self.null_block) 1094 self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block) 1095 else: 1096 self.spdk_tgt_add_nvme_conf() 1097 self.spdk_tgt_add_subsystem_conf(self.nic_ips) 1098 1099 self.log_print("Done configuring SPDK NVMeOF Target") 1100 1101 def spdk_tgt_add_nullblock(self, null_block_count): 1102 md_size = 0 1103 block_size = 4096 1104 if self.null_block_dif_type != 0: 1105 md_size = 128 1106 1107 self.log_print("Adding null block bdevices to config via RPC") 1108 for i in range(null_block_count): 1109 self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type) 1110 rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i), 1111 dif_type=self.null_block_dif_type, md_size=md_size) 1112 self.log_print("SPDK Bdevs configuration:") 1113 rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1114 1115 def spdk_tgt_add_nvme_conf(self, req_num_disks=None): 1116 self.log_print("Adding NVMe bdevs to config via RPC") 1117 1118 bdfs = get_nvme_devices_bdf() 1119 bdfs = [b.replace(":", ".") for b in bdfs] 1120 1121 if req_num_disks: 1122 if req_num_disks > len(bdfs): 1123 self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs)) 1124 sys.exit(1) 1125 else: 1126 bdfs = bdfs[0:req_num_disks] 1127 1128 for i, bdf in enumerate(bdfs): 1129 rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 1130 1131 self.log_print("SPDK Bdevs configuration:") 1132 rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1133 1134 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 1135 self.log_print("Adding subsystems to config") 1136 port = "4420" 1137 if not req_num_disks: 1138 req_num_disks = get_nvme_devices_count() 1139 1140 # Distribute bdevs between provided NICs 1141 num_disks = range(0, req_num_disks) 1142 if len(num_disks) == 1: 1143 disks_per_ip = 1 1144 else: 1145 disks_per_ip = int(len(num_disks) / len(ips)) 1146 disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))] 1147 1148 # Create subsystems, add bdevs to namespaces, add listeners 1149 for ip, chunk in zip(ips, disk_chunks): 1150 for c in chunk: 1151 nqn = "nqn.2018-09.io.spdk:cnode%s" % c 1152 serial = "SPDK00%s" % c 1153 bdev_name = "Nvme%sn1" % c 1154 rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial, 1155 allow_any_host=True, max_namespaces=8) 1156 rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name) 1157 1158 rpc.nvmf.nvmf_subsystem_add_listener(self.client, 1159 nqn=nqn, 1160 trtype=self.transport, 1161 traddr=ip, 1162 trsvcid=port, 1163 adrfam="ipv4") 1164 1165 self.subsystem_info_list.append([port, nqn, ip]) 1166 self.log_print("SPDK NVMeOF subsystem configuration:") 1167 rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client)) 1168 1169 def bpf_start(self): 1170 self.log_print("Starting BPF Trace scripts: %s" % self.bpf_scripts) 1171 bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh") 1172 bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts] 1173 results_path = os.path.join(self.results_dir, "bpf_traces.txt") 1174 1175 with open(self.pid, "r") as fh: 1176 nvmf_pid = str(fh.readline()) 1177 1178 cmd = [bpf_script, nvmf_pid, *bpf_traces] 1179 self.log_print(cmd) 1180 self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path}) 1181 1182 def tgt_start(self): 1183 if self.null_block: 1184 self.subsys_no = 1 1185 else: 1186 self.subsys_no = get_nvme_devices_count() 1187 self.log_print("Starting SPDK NVMeOF Target process") 1188 nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt") 1189 proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask]) 1190 self.pid = os.path.join(self.spdk_dir, "nvmf.pid") 1191 1192 with open(self.pid, "w") as fh: 1193 fh.write(str(proc.pid)) 1194 self.nvmf_proc = proc 1195 self.log_print("SPDK NVMeOF Target PID=%s" % self.pid) 1196 self.log_print("Waiting for spdk to initilize...") 1197 while True: 1198 if os.path.exists("/var/tmp/spdk.sock"): 1199 break 1200 time.sleep(1) 1201 self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock") 1202 1203 if self.enable_zcopy: 1204 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", 1205 enable_zerocopy_send_server=True) 1206 self.log_print("Target socket options:") 1207 rpc.client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix")) 1208 1209 if self.enable_adq: 1210 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1) 1211 rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None, 1212 nvme_adminq_poll_period_us=100000, retry_count=4) 1213 rpc.nvmf.nvmf_set_config(self.client, acceptor_poll_rate=10000) 1214 1215 rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name) 1216 1217 rpc.framework_start_init(self.client) 1218 1219 if self.bpf_scripts: 1220 self.bpf_start() 1221 1222 self.spdk_tgt_configure() 1223 1224 def __del__(self): 1225 if self.bpf_proc: 1226 self.log_print("Stopping BPF Trace script") 1227 self.bpf_proc.terminate() 1228 self.bpf_proc.wait() 1229 1230 if hasattr(self, "nvmf_proc"): 1231 try: 1232 self.nvmf_proc.terminate() 1233 self.nvmf_proc.wait() 1234 except Exception as e: 1235 self.log_print(e) 1236 self.nvmf_proc.kill() 1237 self.nvmf_proc.communicate() 1238 1239 1240class KernelInitiator(Initiator): 1241 def __init__(self, name, general_config, initiator_config): 1242 super(KernelInitiator, self).__init__(name, general_config, initiator_config) 1243 1244 # Defaults 1245 self.extra_params = "" 1246 self.ioengine = "libaio" 1247 1248 if "extra_params" in initiator_config: 1249 self.extra_params = initiator_config["extra_params"] 1250 1251 if "kernel_engine" in initiator_config: 1252 self.ioengine = initiator_config["kernel_engine"] 1253 if "io_uring" in self.ioengine: 1254 self.extra_params = "--nr-poll-queues=8" 1255 1256 def __del__(self): 1257 self.ssh_connection.close() 1258 1259 def get_connected_nvme_list(self): 1260 json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"])) 1261 nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"] 1262 if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]] 1263 return nvme_list 1264 1265 def kernel_init_connect(self): 1266 self.log_print("Below connection attempts may result in error messages, this is expected!") 1267 for subsystem in self.subsystem_info_list: 1268 self.log_print("Trying to connect %s %s %s" % subsystem) 1269 self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport, 1270 "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params]) 1271 time.sleep(2) 1272 1273 if "io_uring" in self.ioengine: 1274 self.log_print("Setting block layer settings for io_uring.") 1275 1276 # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because 1277 # apparently it's not possible for connected subsystems. 1278 # Results in "error: Invalid argument" 1279 block_sysfs_settings = { 1280 "iostats": "0", 1281 "rq_affinity": "0", 1282 "nomerges": "2" 1283 } 1284 1285 for disk in self.get_connected_nvme_list(): 1286 sysfs = os.path.join("/sys/block", disk, "queue") 1287 for k, v in block_sysfs_settings.items(): 1288 sysfs_opt_path = os.path.join(sysfs, k) 1289 try: 1290 self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True) 1291 except subprocess.CalledProcessError as e: 1292 self.log_print("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v)) 1293 finally: 1294 _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)]) 1295 self.log_print("%s=%s" % (sysfs_opt_path, _)) 1296 1297 def kernel_init_disconnect(self): 1298 for subsystem in self.subsystem_info_list: 1299 self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]]) 1300 time.sleep(1) 1301 1302 def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1): 1303 nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()] 1304 1305 filename_section = "" 1306 nvme_per_split = int(len(nvme_list) / len(threads)) 1307 remainder = len(nvme_list) % len(threads) 1308 iterator = iter(nvme_list) 1309 result = [] 1310 for i in range(len(threads)): 1311 result.append([]) 1312 for _ in range(nvme_per_split): 1313 result[i].append(next(iterator)) 1314 if remainder: 1315 result[i].append(next(iterator)) 1316 remainder -= 1 1317 for i, r in enumerate(result): 1318 header = "[filename%s]" % i 1319 disks = "\n".join(["filename=%s" % x for x in r]) 1320 job_section_qd = round((io_depth * len(r)) / num_jobs) 1321 if job_section_qd == 0: 1322 job_section_qd = 1 1323 iodepth = "iodepth=%s" % job_section_qd 1324 filename_section = "\n".join([filename_section, header, disks, iodepth]) 1325 1326 return filename_section 1327 1328 1329class SPDKInitiator(Initiator): 1330 def __init__(self, name, general_config, initiator_config): 1331 super(SPDKInitiator, self).__init__(name, general_config, initiator_config) 1332 1333 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 1334 self.install_spdk() 1335 1336 # Required fields 1337 self.num_cores = initiator_config["num_cores"] 1338 1339 def install_spdk(self): 1340 self.log_print("Using fio binary %s" % self.fio_bin) 1341 self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"]) 1342 self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"]) 1343 self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)]) 1344 self.exec_cmd(["make", "-C", self.spdk_dir, "clean"]) 1345 self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"]) 1346 1347 self.log_print("SPDK built") 1348 self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir]) 1349 1350 def gen_spdk_bdev_conf(self, remote_subsystem_list): 1351 bdev_cfg_section = { 1352 "subsystems": [ 1353 { 1354 "subsystem": "bdev", 1355 "config": [] 1356 } 1357 ] 1358 } 1359 1360 for i, subsys in enumerate(remote_subsystem_list): 1361 sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys) 1362 nvme_ctrl = { 1363 "method": "bdev_nvme_attach_controller", 1364 "params": { 1365 "name": "Nvme{}".format(i), 1366 "trtype": self.transport, 1367 "traddr": sub_addr, 1368 "trsvcid": sub_port, 1369 "subnqn": sub_nqn, 1370 "adrfam": "IPv4" 1371 } 1372 } 1373 1374 if self.enable_adq: 1375 nvme_ctrl["params"].update({"priority": "1"}) 1376 1377 bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl) 1378 1379 return json.dumps(bdev_cfg_section, indent=2) 1380 1381 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1): 1382 filename_section = "" 1383 if len(threads) >= len(subsystems): 1384 threads = range(0, len(subsystems)) 1385 filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))] 1386 nvme_per_split = int(len(subsystems) / len(threads)) 1387 remainder = len(subsystems) % len(threads) 1388 iterator = iter(filenames) 1389 result = [] 1390 for i in range(len(threads)): 1391 result.append([]) 1392 for _ in range(nvme_per_split): 1393 result[i].append(next(iterator)) 1394 if remainder: 1395 result[i].append(next(iterator)) 1396 remainder -= 1 1397 for i, r in enumerate(result): 1398 header = "[filename%s]" % i 1399 disks = "\n".join(["filename=%s" % x for x in r]) 1400 job_section_qd = round((io_depth * len(r)) / num_jobs) 1401 if job_section_qd == 0: 1402 job_section_qd = 1 1403 iodepth = "iodepth=%s" % job_section_qd 1404 filename_section = "\n".join([filename_section, header, disks, iodepth]) 1405 1406 return filename_section 1407 1408 1409if __name__ == "__main__": 1410 script_full_dir = os.path.dirname(os.path.realpath(__file__)) 1411 default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json")) 1412 1413 parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) 1414 parser.add_argument('-c', '--config', type=str, default=default_config_file_path, 1415 help='Configuration file.') 1416 parser.add_argument('-r', '--results', type=str, default='/tmp/results', 1417 help='Results directory.') 1418 parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv', 1419 help='CSV results filename.') 1420 1421 args = parser.parse_args() 1422 1423 print("Using config file: %s" % args.config) 1424 with open(args.config, "r") as config: 1425 data = json.load(config) 1426 1427 initiators = [] 1428 fio_cases = [] 1429 1430 general_config = data["general"] 1431 target_config = data["target"] 1432 initiator_configs = [data[x] for x in data.keys() if "initiator" in x] 1433 1434 for k, v in data.items(): 1435 if "target" in k: 1436 v.update({"results_dir": args.results}) 1437 if data[k]["mode"] == "spdk": 1438 target_obj = SPDKTarget(k, data["general"], v) 1439 elif data[k]["mode"] == "kernel": 1440 target_obj = KernelTarget(k, data["general"], v) 1441 pass 1442 elif "initiator" in k: 1443 if data[k]["mode"] == "spdk": 1444 init_obj = SPDKInitiator(k, data["general"], v) 1445 elif data[k]["mode"] == "kernel": 1446 init_obj = KernelInitiator(k, data["general"], v) 1447 initiators.append(init_obj) 1448 elif "fio" in k: 1449 fio_workloads = itertools.product(data[k]["bs"], 1450 data[k]["qd"], 1451 data[k]["rw"]) 1452 1453 fio_run_time = data[k]["run_time"] 1454 fio_ramp_time = data[k]["ramp_time"] 1455 fio_rw_mix_read = data[k]["rwmixread"] 1456 fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None 1457 fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None 1458 1459 fio_rate_iops = 0 1460 if "rate_iops" in data[k]: 1461 fio_rate_iops = data[k]["rate_iops"] 1462 else: 1463 continue 1464 1465 try: 1466 os.mkdir(args.results) 1467 except FileExistsError: 1468 pass 1469 1470 target_obj.tgt_start() 1471 1472 for i in initiators: 1473 i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no) 1474 if i.enable_adq: 1475 i.adq_configure_tc() 1476 1477 # Poor mans threading 1478 # Run FIO tests 1479 for block_size, io_depth, rw in fio_workloads: 1480 threads = [] 1481 configs = [] 1482 for i in initiators: 1483 if i.mode == "kernel": 1484 i.kernel_init_connect() 1485 1486 cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no, 1487 fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops) 1488 configs.append(cfg) 1489 1490 for i, cfg in zip(initiators, configs): 1491 t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num)) 1492 threads.append(t) 1493 if target_obj.enable_sar: 1494 sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"]) 1495 sar_file_name = ".".join([sar_file_name, "txt"]) 1496 t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_name)) 1497 threads.append(t) 1498 1499 if target_obj.enable_pcm: 1500 pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]] 1501 1502 pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],)) 1503 pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],)) 1504 pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],)) 1505 1506 threads.append(pcm_cpu_t) 1507 threads.append(pcm_mem_t) 1508 threads.append(pcm_pow_t) 1509 1510 if target_obj.enable_bandwidth: 1511 bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)]) 1512 bandwidth_file_name = ".".join([bandwidth_file_name, "csv"]) 1513 t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,)) 1514 threads.append(t) 1515 1516 if target_obj.enable_dpdk_memory: 1517 t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results)) 1518 threads.append(t) 1519 1520 if target_obj.enable_adq: 1521 ethtool_thread = threading.Thread(target=target_obj.ethtool_after_fio_ramp, args=(fio_ramp_time)) 1522 threads.append(ethtool_thread) 1523 1524 for t in threads: 1525 t.start() 1526 for t in threads: 1527 t.join() 1528 1529 for i in initiators: 1530 if i.mode == "kernel": 1531 i.kernel_init_disconnect() 1532 i.copy_result_files(args.results) 1533 1534 target_obj.restore_governor() 1535 target_obj.restore_tuned() 1536 target_obj.restore_services() 1537 target_obj.restore_sysctl() 1538 for i in initiators: 1539 i.restore_governor() 1540 i.restore_tuned() 1541 i.restore_services() 1542 i.restore_sysctl() 1543 target_obj.parse_results(args.results, args.csv_filename) 1544