#!/usr/bin/env python3

from json.decoder import JSONDecodeError
import os
import re
import sys
import argparse
import json
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid
from collections import OrderedDict

import paramiko
import pandas as pd

import rpc
import rpc.client
from common import *


class Server:
    """Common base for hosts taking part in an NVMe-oF performance test.

    Holds the credentials and transport settings shared by target and
    initiator machines, and implements the pre-test system tuning steps
    (services, sysctl, tuned, CPU governor, IRQ affinity, optional ADQ)
    together with the matching restore_* steps run after the test.

    NOTE(review): ``CalledProcessError`` and ``check_output`` are used
    unqualified below — presumably re-exported by ``from common import *``;
    confirm, otherwise they need explicit ``subprocess`` imports.
    """

    def __init__(self, name, general_config, server_config):
        # Credentials and transport come from the "general" config section;
        # per-host settings come from this server's own section.
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]  # "kernel" or "spdk"

        # Default location of the Mellanox IRQ-affinity helper scripts;
        # may be overridden per host via "irq_scripts_dir".
        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        self.local_nic_info = []
        self._nics_json_obj = {}  # cache for parsed "ip -j address show"
        # Snapshots of pre-test state, consumed by the restore_* methods.
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        self.enable_adq = False
        self.adq_priority = None
        if "adq_enable" in server_config and server_config["adq_enable"]:
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1

        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]

        # NOTE(review): the "*" quantifier also accepts an empty name.
        if not re.match("^[A-Za-z0-9]*$", name):
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        """Print a message prefixed with this host's name, unbuffered."""
        print("[%s] %s" % (self.name, msg), flush=True)

    def get_uncommented_lines(self, lines):
        """Return only lines that are non-empty and not '#' comments."""
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        """Map an IP address to the name of the interface owning it.

        Lazily caches the parsed output of "ip -j address show"; returns
        None when no interface matches.
        """
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            # Keep only interfaces that actually have addresses assigned.
            self._nics_json_obj = list(filter(lambda x: x["addr_info"],
                                              json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                # NOTE(review): substring containment, not equality —
                # "10.0.0.1" also matches "10.0.0.11". Confirm intended.
                if ip in addr["local"]:
                    return nic["ifname"]

    def set_local_nic_info_helper(self):
        # Overridden by subclasses to fetch hardware info (lshw) locally
        # or over SSH.
        pass

    def set_local_nic_info(self, pci_info):
        """Extract network-class devices from an lshw JSON tree into
        self.local_nic_info."""
        def extract_network_elements(json_obj):
            # Recursively walk the lshw tree collecting "network" nodes.
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        # Overridden by subclasses (local subprocess vs. SSH execution).
        return ""

    def configure_system(self):
        """Apply all pre-test tuning steps in order."""
        self.configure_services()
        self.configure_sysctl()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity()

    def configure_adq(self):
        """Set up ADQ (module load + NIC settings); SPDK mode only."""
        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        """Modprobe the kernel modules ADQ traffic-class setup depends on."""
        self.log_print("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log_print("%s loaded!" % module)
            except CalledProcessError as e:
                self.log_print("ERROR: failed to load module %s" % module)
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        """Create mqprio traffic classes and flower filters for ADQ.

        NOTE(review): relies on self.num_cores and self.spdk_dir, which are
        not set by Server.__init__ — presumably provided by subclasses;
        confirm all callers.
        """
        self.log_print("Configuring ADQ Traffic classes and filters...")

        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        # Targets filter on destination port, initiators on source port.
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        port = "4420"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log_print(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log_print(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                             "protocol", "ip", "ingress", "prio", "1", "flower",
                             "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                             "skip_sw", "hw_tc", "1"]
            self.log_print(" ".join(tc_filter_cmd))
            self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log_print("Show tc configuration for %s NIC..." % nic_name)
            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log_print("%s" % tc_disk_out)
            self.log_print("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log_print(xps_cmd)
            self.exec_cmd(xps_cmd)

    def adq_configure_nic(self):
        """Reload the ice driver and enable the NIC features ADQ needs."""
        self.log_print("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        try:
            self.exec_cmd(["sudo", "rmmod", "ice"])
            self.exec_cmd(["sudo", "modprobe", "ice"])
        except CalledProcessError as e:
            self.log_print("ERROR: failed to reload ice module!")
            self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log_print(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                # As temporary workaround for ADQ, channel packet inspection optimization is turned on during connection establishment.
                # Then turned off before fio ramp_up expires in ethtool_after_fio_ramp().
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-pkt-inspect-optimize", "on"])
            except CalledProcessError as e:
                self.log_print("ERROR: failed to configure NIC port using ethtool!")
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
                self.log_print("Please update your NIC driver and firmware versions and try again.")
            self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        """Stop services that interfere with benchmarking, recording their
        prior state in svc_restore_dict for later restoration."""
        self.log_print("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            # Parse "systemctl show" key=value output as an INI section.
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            # Skip units that do not exist on this system.
            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log_print("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log_print("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        """Apply network/VM sysctl tuning, saving prior values for restore."""
        self.log_print("Tuning sysctl settings...")

        # busy_read polling is only wanted for SPDK-mode hosts with ADQ.
        busy_read = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1

        sysctl_opts = {
            "net.core.busy_poll": 0,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
        }

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        """Activate the configured tuned-adm profile, recording the previous
        profile/mode (and tuned service state) for restoration."""
        if not self.tuned_profile:
            self.log_print("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        """Switch CPU frequency governor to "performance", saving the old one."""
        self.log_print("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def configure_irq_affinity(self):
        """Run set_irq_affinity.sh for each test NIC."""
        self.log_print("Setting NIC irq affinity for NICs...")

        irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh")
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            irq_cmd = ["sudo", irq_script_path, nic]
            self.log_print(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        """Return systemd services to their recorded pre-test states."""
        self.log_print("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        """Write back the sysctl values recorded before the test."""
        self.log_print("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        """Revert tuned-adm to the recorded pre-test profile (or auto mode)."""
        self.log_print("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log_print("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        """Revert the CPU governor to the recorded pre-test setting."""
        self.log_print("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log_print("Reverted CPU governor to %s." % self.governor_restore)


class Target(Server):
    """The local NVMe-oF target host; also collects system metrics and
    aggregates/parses fio results produced by the initiators."""

    def __init__(self, name, general_config, target_config):
        super(Target, self).__init__(name, general_config, target_config)

        # Defaults
        self.enable_sar = False
        self.sar_delay = 0
        self.sar_interval = 0
        self.sar_count = 0
        self.enable_pcm = False
        self.pcm_dir = ""
        self.pcm_delay = 0
        self.pcm_interval = 0
        self.pcm_count = 0
        self.enable_bandwidth = 0
        self.bandwidth_count = 0
        self.enable_dpdk_memory = False
        self.dpdk_wait_time = 0
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []

        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "sar_settings" in target_config:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"]
        if "pcm_settings" in target_config:
            self.enable_pcm = True
            self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"]
        if "enable_bandwidth" in target_config:
            self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"]
        if "enable_dpdk_memory" in target_config:
            self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"]
        if "scheduler_settings" in target_config:
            self.scheduler_name = target_config["scheduler_settings"]
        if "zcopy_settings" in target_config:
            self.enable_zcopy = target_config["zcopy_settings"]
        if "results_dir" in target_config:
            self.results_dir = target_config["results_dir"]

        # The SPDK tree is assumed to be three levels above this script.
        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        # Package the local SPDK tree so initiators can fetch it.
        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        """Return local hardware info (lshw output) as parsed JSON."""
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Run a command locally, optionally from another working directory.

        Returns stdout decoded as UTF-8; a non-zero exit raises
        CalledProcessError (via check_output).
        """
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log_print("Changing directory to %s" % change_dir)

        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")

        if change_dir:
            os.chdir(old_cwd)
            self.log_print("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        """Zip the whole SPDK source tree (following symlinks) into dest_file."""
        self.log_print("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log_print("Done zipping")

    def read_json_stats(self, file):
        """Parse a fio JSON results file into a flat list of statistics.

        Returns 18 values: [read_iops, read_bw, read_avg/min/max_lat,
        read p99/p99.9/p99.99/p99.999 lat, then the same eight values for
        write]. Latencies are normalized to microseconds.
        """
        with open(file, "r") as json_data:
            data = json.load(json_data)
            job_pos = 0  # job_post = 0 because using aggregated results

            # Check if latency is in nano or microseconds to choose correct dict key
            def get_lat_unit(key_prefix, dict_section):
                # key prefix - lat, clat or slat.
                # dict section - portion of json containing latency bucket in question
                # Return dict key to access the bucket and unit as string
                for k, _ in dict_section.items():
                    if k.startswith(key_prefix):
                        return k, k.split("_")[1]

            def get_clat_percentiles(clat_dict_leaf):
                if "percentile" in clat_dict_leaf:
                    p99_lat = float(clat_dict_leaf["percentile"]["99.000000"])
                    p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"])
                    p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"])
                    p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"])

                    return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat]
                else:
                    # Latest fio versions do not provide "percentile" results if no
                    # measurements were done, so just return zeroes
                    return [0, 0, 0, 0]

            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
            read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["read"][clat_key])

            # Convert nanoseconds to microseconds where needed.
            if "ns" in lat_unit:
                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
            if "ns" in clat_unit:
                read_p99_lat = read_p99_lat / 1000
                read_p99_9_lat = read_p99_9_lat / 1000
                read_p99_99_lat = read_p99_99_lat / 1000
                read_p99_999_lat = read_p99_999_lat / 1000

            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
            write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["write"][clat_key])

            if "ns" in lat_unit:
                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
            if "ns" in clat_unit:
                write_p99_lat = write_p99_lat / 1000
                write_p99_9_lat = write_p99_9_lat / 1000
                write_p99_99_lat = write_p99_99_lat / 1000
                write_p99_999_lat = write_p99_999_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]

    def parse_results(self, results_dir, csv_file):
        """Aggregate per-initiator fio JSON results into CSV files.

        Writes one per-initiator CSV per job plus a combined csv_file with
        read/write stats merged per aggr_headers (weighted by rwmixread for
        latencies).
        """
        files = os.listdir(results_dir)
        fio_files = filter(lambda x: ".fio" in x, files)
        json_files = [x for x in files if ".json" in x]

        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]

        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]

        header_line = ",".join(["Name", *headers])
        aggr_header_line = ",".join(["Name", *aggr_headers])

        # Create empty results file
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            fh.write(aggr_header_line + "\n")
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # Look in the filename for rwmixread value. Function arguments do
            # not have that information.
            # TODO: Improve this function by directly using workload params instead
            # of regexing through filenames.
            if "read" in job_name:
                rw_mixread = 1
            elif "write" in job_name:
                rw_mixread = 0
            else:
                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100

            # If "_CPU" exists in name - ignore it
            # Initiators for the same job could have different num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if x.startswith(job_name)]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # There may have been more than 1 initiator used in test, need to check that
            # Result files are created so that string after last "_" separator is server name
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than 1 test run for this job, calculate average results for initiator
                i_results = [x for x in job_result_files if i in x]
                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))

                separate_stats = []
                for r in i_results:
                    try:
                        stats = self.read_json_stats(os.path.join(results_dir, r))
                        separate_stats.append(stats)
                        self.log_print(stats)
                    except JSONDecodeError as e:
                        # FIXME(review): the "%s" placeholder is never filled —
                        # the "% r" argument is missing from this log_print call,
                        # and "e" is unused.
                        self.log_print("ERROR: Failed to parse %s results! Results might be incomplete!")

                # Element-wise average over all runs of this initiator.
                init_results = [sum(x) for x in zip(*separate_stats)]
                init_results = [x / len(separate_stats) for x in init_results]
                inits_avg_results.append(init_results)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(init_results)
                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
                    fh.write(header_line + "\n")
                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")

            # Sum results of all initiators running this FIO job.
            # Latency results are an average of latencies from across all initiators.
            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
            for key in inits_avg_results:
                if "lat" in key:
                    inits_avg_results[key] /= len(inits_names)

            # Aggregate separate read/write values into common labels
            # Take rw_mixread into consideration for mixed read/write workloads.
            aggregate_results = OrderedDict()
            for h in aggr_headers:
                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
                if "lat" in h:
                    _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
                else:
                    _ = read_stat + write_stat
                aggregate_results[h] = "{0:.3f}".format(_)

            rows.add(",".join([job_name, *aggregate_results.values()]))

        # Save results to file
        for row in rows:
            with open(os.path.join(results_dir, csv_file), "a") as fh:
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))

    def measure_sar(self, results_dir, sar_file_name):
        """Collect CPU utilization with sar and save it to sar_file_name.

        Appends a "Total CPU used" summary computed from the per-CPU
        Average %idle column.
        """
        self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay)
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        time.sleep(self.sar_delay)
        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count])
        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log_print("Summary CPU utilization from SAR:")
                        self.log_print(line)
                    elif "all" in line:
                        self.log_print(line)
                    else:
                        # Column 8 of a per-CPU "Average" row is %idle.
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum
        with open(os.path.join(results_dir, sar_file_name), "a") as f:
            f.write("Total CPU used: " + str(sar_cpu_usage))

    def ethtool_after_fio_ramp(self, fio_ramp_time):
        """Disable channel-pkt-inspect-optimize halfway through fio ramp time
        (ADQ workaround; see adq_configure_nic)."""
        time.sleep(fio_ramp_time//2)
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log_print(nic)
            self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                           "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        """Run pcm-memory in the background, then terminate it.

        NOTE(review): unlike measure_pcm (which passes -i=pcm_count),
        pcm_count is used here as a run time in seconds — confirm intended.
        """
        time.sleep(self.pcm_delay)
        cmd = ["%s/pcm-memory.x" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)]
        pcm_memory = subprocess.Popen(cmd)
        time.sleep(self.pcm_count)
        pcm_memory.terminate()

    def measure_pcm(self, results_dir, pcm_file_name):
        """Run pcm for pcm_count samples and extract the UPI socket columns
        into a separate "skt_"-prefixed CSV."""
        time.sleep(self.pcm_delay)
        cmd = ["%s/pcm.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        # Drop the auto-generated "Unnamed: ..." labels pandas creates for
        # blank cells in the two-row CSV header.
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_pcm_power(self, results_dir, pcm_power_file_name):
        """Run pcm-power and save its output to pcm_power_file_name."""
        time.sleep(self.pcm_delay)
        out = self.exec_cmd(["%s/pcm-power.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count])
        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
            fh.write(out)

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name):
        """Record network bandwidth with bwm-ng into a CSV file."""
        self.log_print("INFO: starting network bandwidth measure")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)])

    def measure_dpdk_memory(self, results_dir):
        """Trigger an SPDK DPDK memory dump and move it into results_dir."""
        self.log_print("INFO: waiting to generate DPDK memory usage")
        time.sleep(self.dpdk_wait_time)
        self.log_print("INFO: generating DPDK memory usage")
        # FIXME(review): bare attribute access — the RPC is never actually
        # invoked (no call parentheses, no client argument). Confirm.
        rpc.env.env_dpdk_get_mem_stats
        os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir))

    def sys_config(self):
        """Log kernel, sysctl, cpupower and SPDK-related configuration."""
        self.log_print("====Kernel release:====")
        self.log_print(os.uname().release)
        self.log_print("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log_print('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log_print("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log_print('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log_print("====Cpu power info:====")
        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log_print("====zcopy settings:====")
        self.log_print("zcopy enabled: %s" % (self.enable_zcopy))
        self.log_print("====Scheduler settings:====")
        self.log_print("SPDK scheduler: %s" % (self.scheduler_name))


class Initiator(Server):
    """A remote initiator host driven over SSH; runs fio against the target."""

    def __init__(self, name, general_config, initiator_config):
        super(Initiator, self).__init__(name, general_config, initiator_config)

        # Required fields
        self.ip = initiator_config["ip"]
        self.target_nic_ips = initiator_config["target_nic_ips"]

        # Defaults
        self.cpus_allowed = None
        self.cpus_allowed_policy = "shared"
        self.spdk_dir = "/tmp/spdk"
        self.fio_bin = "/usr/src/fio/fio"
        self.nvmecli_bin = "nvme"
        self.cpu_frequency = None
        self.subsystem_info_list = []

        if "spdk_dir" in initiator_config:
            self.spdk_dir = initiator_config["spdk_dir"]
        if "fio_bin" in initiator_config:
            self.fio_bin = initiator_config["fio_bin"]
        if "nvmecli_bin" in initiator_config:
            self.nvmecli_bin = initiator_config["nvmecli_bin"]
        if "cpus_allowed" in initiator_config:
            self.cpus_allowed = initiator_config["cpus_allowed"]
        if "cpus_allowed_policy" in initiator_config:
            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
        if "cpu_frequency" in initiator_config:
            self.cpu_frequency = initiator_config["cpu_frequency"]
        # Environment variable overrides the configured spdk_dir.
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        # Open the SSH session used by exec_cmd/put_file/get_file.
        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.copy_spdk("/tmp/spdk.zip")
        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        """Return remote hardware info (lshw over SSH) as parsed JSON."""
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        """Close the SSH connection to this initiator."""
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Run a command on the initiator over SSH.

        Returns stdout decoded as UTF-8; raises CalledProcessError when the
        remote exit status is non-zero.
        """
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it again to prevent
        # expansion when sending to the remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout by requesting a pty when asked to.
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        """Upload a local file to the initiator via SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        """Download a file from the initiator via SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        """Upload the SPDK sources zip and unpack it into spdk_dir."""
        self.log_print("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log_print("Sources unpacked")

    def copy_result_files(self, dest_dir):
        """Copy fio result files from the initiator's nvmf_perf directory
        into the local dest_dir (created if missing)."""
        self.log_print("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log_print("Done copying results")

    def discover_subsystems(self, address_list, subsys_no):
        """Probe ports 4420..4420+subsys_no-1 on each address with
        "nvme discover" and record the matching subsystems.

        NOTE(review): the loop variable below shadows the subsys_no
        parameter.
        """
        num_nvmes = range(0, subsys_no)
        nvme_discover_output = ""
        for ip, subsys_no in itertools.product(address_list, num_nvmes):
            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no))
            nvme_discover_cmd = ["sudo",
                                 "%s" % self.nvmecli_bin,
                                 "discover", "-t", "%s" %
self.transport, 782 "-s", "%s" % (4420 + subsys_no), 783 "-a", "%s" % ip] 784 785 try: 786 stdout = self.exec_cmd(nvme_discover_cmd) 787 if stdout: 788 nvme_discover_output = nvme_discover_output + stdout 789 except CalledProcessError: 790 # Do nothing. In case of discovering remote subsystems of kernel target 791 # we expect "nvme discover" to fail a bunch of times because we basically 792 # scan ports. 793 pass 794 795 subsystems = re.findall(r'trsvcid:\s(\d+)\s+' # get svcid number 796 r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+' # get NQN id 797 r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', # get IP address 798 nvme_discover_output) # from nvme discovery output 799 subsystems = filter(lambda x: x[-1] in address_list, subsystems) 800 subsystems = list(set(subsystems)) 801 subsystems.sort(key=lambda x: x[1]) 802 self.log_print("Found matching subsystems on target side:") 803 for s in subsystems: 804 self.log_print(s) 805 self.subsystem_info_list = subsystems 806 807 def gen_fio_filename_conf(self, *args, **kwargs): 808 # Logic implemented in SPDKInitiator and KernelInitiator classes 809 pass 810 811 def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10, rate_iops=0): 812 fio_conf_template = """ 813[global] 814ioengine={ioengine} 815{spdk_conf} 816thread=1 817group_reporting=1 818direct=1 819percentile_list=50:90:99:99.5:99.9:99.99:99.999 820 821norandommap=1 822rw={rw} 823rwmixread={rwmixread} 824bs={block_size} 825time_based=1 826ramp_time={ramp_time} 827runtime={run_time} 828rate_iops={rate_iops} 829""" 830 if "spdk" in self.mode: 831 bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list) 832 self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir]) 833 ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir 834 spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir 835 else: 836 ioengine = self.ioengine 837 spdk_conf = "" 838 out = self.exec_cmd(["sudo", "nvme", "list", "|", 
"grep", "-E", "'SPDK|Linux'", 839 "|", "awk", "'{print $1}'"]) 840 subsystems = [x for x in out.split("\n") if "nvme" in x] 841 842 if self.cpus_allowed is not None: 843 self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed) 844 cpus_num = 0 845 cpus = self.cpus_allowed.split(",") 846 for cpu in cpus: 847 if "-" in cpu: 848 a, b = cpu.split("-") 849 a = int(a) 850 b = int(b) 851 cpus_num += len(range(a, b)) 852 else: 853 cpus_num += 1 854 self.num_cores = cpus_num 855 threads = range(0, self.num_cores) 856 elif hasattr(self, 'num_cores'): 857 self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores) 858 threads = range(0, int(self.num_cores)) 859 else: 860 self.num_cores = len(subsystems) 861 threads = range(0, len(subsystems)) 862 863 if "spdk" in self.mode: 864 filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs) 865 else: 866 filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs) 867 868 fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf, 869 rw=rw, rwmixread=rwmixread, block_size=block_size, 870 ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops) 871 872 # TODO: hipri disabled for now, as it causes fio errors: 873 # io_u error on file /dev/nvme2n1: Operation not supported 874 # See comment in KernelInitiator class, kernel_init_connect() function 875 if hasattr(self, "ioengine") and "io_uring" in self.ioengine: 876 fio_config = fio_config + """ 877fixedbufs=1 878registerfiles=1 879#hipri=1 880""" 881 if num_jobs: 882 fio_config = fio_config + "numjobs=%s \n" % num_jobs 883 if self.cpus_allowed is not None: 884 fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed 885 fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy 886 fio_config = fio_config + filename_section 887 888 fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread) 889 if 
hasattr(self, "num_cores"): 890 fio_config_filename += "_%sCPU" % self.num_cores 891 fio_config_filename += ".fio" 892 893 self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir]) 894 self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)]) 895 self.log_print("Created FIO Config:") 896 self.log_print(fio_config) 897 898 return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename) 899 900 def set_cpu_frequency(self): 901 if self.cpu_frequency is not None: 902 try: 903 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True) 904 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True) 905 self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"])) 906 except Exception: 907 self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!") 908 sys.exit() 909 else: 910 self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.") 911 912 def run_fio(self, fio_config_file, run_num=None): 913 job_name, _ = os.path.splitext(fio_config_file) 914 self.log_print("Starting FIO run for job: %s" % job_name) 915 self.log_print("Using FIO: %s" % self.fio_bin) 916 917 if run_num: 918 for i in range(1, run_num + 1): 919 output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json" 920 try: 921 output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json", 922 "--output=%s" % output_filename, "--eta=never"], True) 923 self.log_print(output) 924 except subprocess.CalledProcessError as e: 925 self.log_print("ERROR: Fio process failed!") 926 self.log_print(e.stdout) 927 else: 928 output_filename = job_name + "_" + self.name + ".json" 929 output = self.exec_cmd(["sudo", self.fio_bin, 930 fio_config_file, "--output-format=json", 931 "--output" % output_filename], True) 932 self.log_print(output) 933 self.log_print("FIO run finished. 
Results in: %s" % output_filename) 934 935 def sys_config(self): 936 self.log_print("====Kernel release:====") 937 self.log_print(self.exec_cmd(["uname", "-r"])) 938 self.log_print("====Kernel command line:====") 939 cmdline = self.exec_cmd(["cat", "/proc/cmdline"]) 940 self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines()))) 941 self.log_print("====sysctl conf:====") 942 sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"]) 943 self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines()))) 944 self.log_print("====Cpu power info:====") 945 self.log_print(self.exec_cmd(["cpupower", "frequency-info"])) 946 947 948class KernelTarget(Target): 949 def __init__(self, name, general_config, target_config): 950 super(KernelTarget, self).__init__(name, general_config, target_config) 951 # Defaults 952 self.nvmet_bin = "nvmetcli" 953 954 if "nvmet_bin" in target_config: 955 self.nvmet_bin = target_config["nvmet_bin"] 956 957 def stop(self): 958 nvmet_command(self.nvmet_bin, "clear") 959 960 def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list): 961 962 nvmet_cfg = { 963 "ports": [], 964 "hosts": [], 965 "subsystems": [], 966 } 967 968 # Split disks between NIC IP's 969 disks_per_ip = int(len(nvme_list) / len(address_list)) 970 disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))] 971 972 # Add remaining drives 973 for i, disk in enumerate(nvme_list[disks_per_ip * len(address_list):]): 974 disks_chunks[i].append(disk) 975 976 subsys_no = 1 977 port_no = 0 978 for ip, chunk in zip(address_list, disk_chunks): 979 for disk in chunk: 980 nqn = "nqn.2018-09.io.spdk:cnode%s" % subsys_no 981 nvmet_cfg["subsystems"].append({ 982 "allowed_hosts": [], 983 "attr": { 984 "allow_any_host": "1", 985 "serial": "SPDK00%s" % subsys_no, 986 "version": "1.3" 987 }, 988 "namespaces": [ 989 { 990 "device": { 991 "path": disk, 992 "uuid": "%s" % uuid.uuid4() 993 }, 994 "enable": 1, 995 "nsid": 
subsys_no 996 } 997 ], 998 "nqn": nqn 999 }) 1000 1001 nvmet_cfg["ports"].append({ 1002 "addr": { 1003 "adrfam": "ipv4", 1004 "traddr": ip, 1005 "trsvcid": "%s" % (4420 + port_no), 1006 "trtype": "%s" % self.transport 1007 }, 1008 "portid": subsys_no, 1009 "referrals": [], 1010 "subsystems": [nqn] 1011 }) 1012 subsys_no += 1 1013 port_no += 1 1014 self.subsystem_info_list.append([port_no, nqn, ip]) 1015 1016 with open("kernel.conf", "w") as fh: 1017 fh.write(json.dumps(nvmet_cfg, indent=2)) 1018 pass 1019 1020 def tgt_start(self): 1021 self.log_print("Configuring kernel NVMeOF Target") 1022 1023 if self.null_block: 1024 print("Configuring with null block device.") 1025 null_blk_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)] 1026 self.kernel_tgt_gen_subsystem_conf(null_blk_list, self.nic_ips) 1027 self.subsys_no = len(null_blk_list) 1028 else: 1029 print("Configuring with NVMe drives.") 1030 nvme_list = get_nvme_devices() 1031 self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips) 1032 self.subsys_no = len(nvme_list) 1033 1034 nvmet_command(self.nvmet_bin, "clear") 1035 nvmet_command(self.nvmet_bin, "restore kernel.conf") 1036 1037 if self.enable_adq: 1038 self.adq_configure_tc() 1039 1040 self.log_print("Done configuring kernel NVMeOF Target") 1041 1042 1043class SPDKTarget(Target): 1044 def __init__(self, name, general_config, target_config): 1045 super(SPDKTarget, self).__init__(name, general_config, target_config) 1046 1047 # Required fields 1048 self.core_mask = target_config["core_mask"] 1049 self.num_cores = self.get_num_cores(self.core_mask) 1050 1051 # Defaults 1052 self.dif_insert_strip = False 1053 self.null_block_dif_type = 0 1054 self.num_shared_buffers = 4096 1055 self.max_queue_depth = 128 1056 self.bpf_proc = None 1057 self.bpf_scripts = [] 1058 self.enable_idxd = False 1059 1060 if "num_shared_buffers" in target_config: 1061 self.num_shared_buffers = target_config["num_shared_buffers"] 1062 if "max_queue_depth" in 
target_config: 1063 self.max_queue_depth = target_config["max_queue_depth"] 1064 if "null_block_dif_type" in target_config: 1065 self.null_block_dif_type = target_config["null_block_dif_type"] 1066 if "dif_insert_strip" in target_config: 1067 self.dif_insert_strip = target_config["dif_insert_strip"] 1068 if "bpf_scripts" in target_config: 1069 self.bpf_scripts = target_config["bpf_scripts"] 1070 if "idxd_settings" in target_config: 1071 self.enable_idxd = target_config["idxd_settings"] 1072 1073 self.log_print("====IDXD settings:====") 1074 self.log_print("IDXD enabled: %s" % (self.enable_idxd)) 1075 1076 def get_num_cores(self, core_mask): 1077 if "0x" in core_mask: 1078 return bin(int(core_mask, 16)).count("1") 1079 else: 1080 num_cores = 0 1081 core_mask = core_mask.replace("[", "") 1082 core_mask = core_mask.replace("]", "") 1083 for i in core_mask.split(","): 1084 if "-" in i: 1085 x, y = i.split("-") 1086 num_cores += len(range(int(x), int(y))) + 1 1087 else: 1088 num_cores += 1 1089 return num_cores 1090 1091 def spdk_tgt_configure(self): 1092 self.log_print("Configuring SPDK NVMeOF target via RPC") 1093 1094 if self.enable_adq: 1095 self.adq_configure_tc() 1096 1097 # Create transport layer 1098 rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, 1099 num_shared_buffers=self.num_shared_buffers, 1100 max_queue_depth=self.max_queue_depth, 1101 dif_insert_or_strip=self.dif_insert_strip, 1102 sock_priority=self.adq_priority) 1103 self.log_print("SPDK NVMeOF transport layer:") 1104 rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client)) 1105 1106 if self.null_block: 1107 self.spdk_tgt_add_nullblock(self.null_block) 1108 self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block) 1109 else: 1110 self.spdk_tgt_add_nvme_conf() 1111 self.spdk_tgt_add_subsystem_conf(self.nic_ips) 1112 1113 self.log_print("Done configuring SPDK NVMeOF Target") 1114 1115 def spdk_tgt_add_nullblock(self, null_block_count): 1116 md_size = 0 1117 block_size = 
4096 1118 if self.null_block_dif_type != 0: 1119 md_size = 128 1120 1121 self.log_print("Adding null block bdevices to config via RPC") 1122 for i in range(null_block_count): 1123 self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type) 1124 rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i), 1125 dif_type=self.null_block_dif_type, md_size=md_size) 1126 self.log_print("SPDK Bdevs configuration:") 1127 rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1128 1129 def spdk_tgt_add_nvme_conf(self, req_num_disks=None): 1130 self.log_print("Adding NVMe bdevs to config via RPC") 1131 1132 bdfs = get_nvme_devices_bdf() 1133 bdfs = [b.replace(":", ".") for b in bdfs] 1134 1135 if req_num_disks: 1136 if req_num_disks > len(bdfs): 1137 self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs)) 1138 sys.exit(1) 1139 else: 1140 bdfs = bdfs[0:req_num_disks] 1141 1142 for i, bdf in enumerate(bdfs): 1143 rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 1144 1145 self.log_print("SPDK Bdevs configuration:") 1146 rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1147 1148 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 1149 self.log_print("Adding subsystems to config") 1150 port = "4420" 1151 if not req_num_disks: 1152 req_num_disks = get_nvme_devices_count() 1153 1154 # Distribute bdevs between provided NICs 1155 num_disks = range(0, req_num_disks) 1156 if len(num_disks) == 1: 1157 disks_per_ip = 1 1158 else: 1159 disks_per_ip = int(len(num_disks) / len(ips)) 1160 disk_chunks = [[*num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i]] for i in range(0, len(ips))] 1161 1162 # Add remaining drives 1163 for i, disk in enumerate(num_disks[disks_per_ip * len(ips):]): 1164 disk_chunks[i].append(disk) 1165 1166 # Create subsystems, add bdevs to namespaces, add listeners 1167 for ip, chunk in zip(ips, 
disk_chunks): 1168 for c in chunk: 1169 nqn = "nqn.2018-09.io.spdk:cnode%s" % c 1170 serial = "SPDK00%s" % c 1171 bdev_name = "Nvme%sn1" % c 1172 rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial, 1173 allow_any_host=True, max_namespaces=8) 1174 rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name) 1175 1176 rpc.nvmf.nvmf_subsystem_add_listener(self.client, 1177 nqn=nqn, 1178 trtype=self.transport, 1179 traddr=ip, 1180 trsvcid=port, 1181 adrfam="ipv4") 1182 1183 self.subsystem_info_list.append([port, nqn, ip]) 1184 self.log_print("SPDK NVMeOF subsystem configuration:") 1185 rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client)) 1186 1187 def bpf_start(self): 1188 self.log_print("Starting BPF Trace scripts: %s" % self.bpf_scripts) 1189 bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh") 1190 bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts] 1191 results_path = os.path.join(self.results_dir, "bpf_traces.txt") 1192 1193 with open(self.pid, "r") as fh: 1194 nvmf_pid = str(fh.readline()) 1195 1196 cmd = [bpf_script, nvmf_pid, *bpf_traces] 1197 self.log_print(cmd) 1198 self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path}) 1199 1200 def tgt_start(self): 1201 if self.null_block: 1202 self.subsys_no = 1 1203 else: 1204 self.subsys_no = get_nvme_devices_count() 1205 self.log_print("Starting SPDK NVMeOF Target process") 1206 nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt") 1207 proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask]) 1208 self.pid = os.path.join(self.spdk_dir, "nvmf.pid") 1209 1210 with open(self.pid, "w") as fh: 1211 fh.write(str(proc.pid)) 1212 self.nvmf_proc = proc 1213 self.log_print("SPDK NVMeOF Target PID=%s" % self.pid) 1214 self.log_print("Waiting for spdk to initialize...") 1215 while True: 1216 if os.path.exists("/var/tmp/spdk.sock"): 1217 break 1218 time.sleep(1) 1219 self.client = 
rpc.client.JSONRPCClient("/var/tmp/spdk.sock") 1220 1221 if self.enable_zcopy: 1222 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", 1223 enable_zerocopy_send_server=True) 1224 self.log_print("Target socket options:") 1225 rpc.client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix")) 1226 1227 if self.enable_adq: 1228 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1) 1229 rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None, 1230 nvme_adminq_poll_period_us=100000, retry_count=4) 1231 1232 if self.enable_idxd: 1233 rpc.idxd.idxd_scan_accel_engine(self.client, config_number=0, config_kernel_mode=None) 1234 self.log_print("Target IDXD accel engine enabled") 1235 1236 rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name) 1237 rpc.framework_start_init(self.client) 1238 1239 if self.bpf_scripts: 1240 self.bpf_start() 1241 1242 self.spdk_tgt_configure() 1243 1244 def stop(self): 1245 if self.bpf_proc: 1246 self.log_print("Stopping BPF Trace script") 1247 self.bpf_proc.terminate() 1248 self.bpf_proc.wait() 1249 1250 if hasattr(self, "nvmf_proc"): 1251 try: 1252 self.nvmf_proc.terminate() 1253 self.nvmf_proc.wait() 1254 except Exception as e: 1255 self.log_print(e) 1256 self.nvmf_proc.kill() 1257 self.nvmf_proc.communicate() 1258 1259 1260class KernelInitiator(Initiator): 1261 def __init__(self, name, general_config, initiator_config): 1262 super(KernelInitiator, self).__init__(name, general_config, initiator_config) 1263 1264 # Defaults 1265 self.extra_params = "" 1266 self.ioengine = "libaio" 1267 1268 if "extra_params" in initiator_config: 1269 self.extra_params = initiator_config["extra_params"] 1270 1271 if "kernel_engine" in initiator_config: 1272 self.ioengine = initiator_config["kernel_engine"] 1273 if "io_uring" in self.ioengine: 1274 self.extra_params = "--nr-poll-queues=8" 1275 1276 def get_connected_nvme_list(self): 1277 json_obj = 
json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"])) 1278 nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"] 1279 if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]] 1280 return nvme_list 1281 1282 def kernel_init_connect(self): 1283 self.log_print("Below connection attempts may result in error messages, this is expected!") 1284 for subsystem in self.subsystem_info_list: 1285 self.log_print("Trying to connect %s %s %s" % subsystem) 1286 self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport, 1287 "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params]) 1288 time.sleep(2) 1289 1290 if "io_uring" in self.ioengine: 1291 self.log_print("Setting block layer settings for io_uring.") 1292 1293 # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because 1294 # apparently it's not possible for connected subsystems. 1295 # Results in "error: Invalid argument" 1296 block_sysfs_settings = { 1297 "iostats": "0", 1298 "rq_affinity": "0", 1299 "nomerges": "2" 1300 } 1301 1302 for disk in self.get_connected_nvme_list(): 1303 sysfs = os.path.join("/sys/block", disk, "queue") 1304 for k, v in block_sysfs_settings.items(): 1305 sysfs_opt_path = os.path.join(sysfs, k) 1306 try: 1307 self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True) 1308 except subprocess.CalledProcessError as e: 1309 self.log_print("Warning: command %s failed due to error %s. %s was not set!" 
% (e.cmd, e.output, v)) 1310 finally: 1311 _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)]) 1312 self.log_print("%s=%s" % (sysfs_opt_path, _)) 1313 1314 def kernel_init_disconnect(self): 1315 for subsystem in self.subsystem_info_list: 1316 self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]]) 1317 time.sleep(1) 1318 1319 def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1): 1320 nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()] 1321 1322 filename_section = "" 1323 nvme_per_split = int(len(nvme_list) / len(threads)) 1324 remainder = len(nvme_list) % len(threads) 1325 iterator = iter(nvme_list) 1326 result = [] 1327 for i in range(len(threads)): 1328 result.append([]) 1329 for _ in range(nvme_per_split): 1330 result[i].append(next(iterator)) 1331 if remainder: 1332 result[i].append(next(iterator)) 1333 remainder -= 1 1334 for i, r in enumerate(result): 1335 header = "[filename%s]" % i 1336 disks = "\n".join(["filename=%s" % x for x in r]) 1337 job_section_qd = round((io_depth * len(r)) / num_jobs) 1338 if job_section_qd == 0: 1339 job_section_qd = 1 1340 iodepth = "iodepth=%s" % job_section_qd 1341 filename_section = "\n".join([filename_section, header, disks, iodepth]) 1342 1343 return filename_section 1344 1345 1346class SPDKInitiator(Initiator): 1347 def __init__(self, name, general_config, initiator_config): 1348 super(SPDKInitiator, self).__init__(name, general_config, initiator_config) 1349 1350 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 1351 self.install_spdk() 1352 1353 # Required fields 1354 self.num_cores = initiator_config["num_cores"] 1355 1356 # Optional fields 1357 self.enable_data_digest = False 1358 if "enable_data_digest" in initiator_config: 1359 self.enable_data_digest = initiator_config["enable_data_digest"] 1360 1361 def install_spdk(self): 1362 self.log_print("Using fio binary %s" % self.fio_bin) 1363 
self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"]) 1364 self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"]) 1365 self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)]) 1366 self.exec_cmd(["make", "-C", self.spdk_dir, "clean"]) 1367 self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"]) 1368 1369 self.log_print("SPDK built") 1370 self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir]) 1371 1372 def gen_spdk_bdev_conf(self, remote_subsystem_list): 1373 bdev_cfg_section = { 1374 "subsystems": [ 1375 { 1376 "subsystem": "bdev", 1377 "config": [] 1378 } 1379 ] 1380 } 1381 1382 for i, subsys in enumerate(remote_subsystem_list): 1383 sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys) 1384 nvme_ctrl = { 1385 "method": "bdev_nvme_attach_controller", 1386 "params": { 1387 "name": "Nvme{}".format(i), 1388 "trtype": self.transport, 1389 "traddr": sub_addr, 1390 "trsvcid": sub_port, 1391 "subnqn": sub_nqn, 1392 "adrfam": "IPv4" 1393 } 1394 } 1395 1396 if self.enable_adq: 1397 nvme_ctrl["params"].update({"priority": "1"}) 1398 1399 if self.enable_data_digest: 1400 nvme_ctrl["params"].update({"ddgst": self.enable_data_digest}) 1401 1402 bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl) 1403 1404 return json.dumps(bdev_cfg_section, indent=2) 1405 1406 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1): 1407 filename_section = "" 1408 if len(threads) >= len(subsystems): 1409 threads = range(0, len(subsystems)) 1410 filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))] 1411 nvme_per_split = int(len(subsystems) / len(threads)) 1412 remainder = len(subsystems) % len(threads) 1413 iterator = iter(filenames) 1414 result = [] 1415 for i in range(len(threads)): 1416 result.append([]) 1417 for _ in range(nvme_per_split): 1418 result[i].append(next(iterator)) 1419 if remainder: 1420 result[i].append(next(iterator)) 
1421 remainder -= 1 1422 for i, r in enumerate(result): 1423 header = "[filename%s]" % i 1424 disks = "\n".join(["filename=%s" % x for x in r]) 1425 job_section_qd = round((io_depth * len(r)) / num_jobs) 1426 if job_section_qd == 0: 1427 job_section_qd = 1 1428 iodepth = "iodepth=%s" % job_section_qd 1429 filename_section = "\n".join([filename_section, header, disks, iodepth]) 1430 1431 return filename_section 1432 1433 1434if __name__ == "__main__": 1435 script_full_dir = os.path.dirname(os.path.realpath(__file__)) 1436 default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json")) 1437 1438 parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) 1439 parser.add_argument('-c', '--config', type=str, default=default_config_file_path, 1440 help='Configuration file.') 1441 parser.add_argument('-r', '--results', type=str, default='/tmp/results', 1442 help='Results directory.') 1443 parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv', 1444 help='CSV results filename.') 1445 1446 args = parser.parse_args() 1447 1448 print("Using config file: %s" % args.config) 1449 with open(args.config, "r") as config: 1450 data = json.load(config) 1451 1452 initiators = [] 1453 fio_cases = [] 1454 1455 general_config = data["general"] 1456 target_config = data["target"] 1457 initiator_configs = [data[x] for x in data.keys() if "initiator" in x] 1458 1459 for k, v in data.items(): 1460 if "target" in k: 1461 v.update({"results_dir": args.results}) 1462 if data[k]["mode"] == "spdk": 1463 target_obj = SPDKTarget(k, data["general"], v) 1464 elif data[k]["mode"] == "kernel": 1465 target_obj = KernelTarget(k, data["general"], v) 1466 pass 1467 elif "initiator" in k: 1468 if data[k]["mode"] == "spdk": 1469 init_obj = SPDKInitiator(k, data["general"], v) 1470 elif data[k]["mode"] == "kernel": 1471 init_obj = KernelInitiator(k, data["general"], v) 1472 initiators.append(init_obj) 1473 elif "fio" in k: 1474 
fio_workloads = itertools.product(data[k]["bs"], 1475 data[k]["qd"], 1476 data[k]["rw"]) 1477 1478 fio_run_time = data[k]["run_time"] 1479 fio_ramp_time = data[k]["ramp_time"] 1480 fio_rw_mix_read = data[k]["rwmixread"] 1481 fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None 1482 fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None 1483 1484 fio_rate_iops = 0 1485 if "rate_iops" in data[k]: 1486 fio_rate_iops = data[k]["rate_iops"] 1487 else: 1488 continue 1489 1490 try: 1491 os.mkdir(args.results) 1492 except FileExistsError: 1493 pass 1494 1495 # TODO: This try block is definietly too large. Need to break this up into separate 1496 # logical blocks to reduce size. 1497 try: 1498 target_obj.tgt_start() 1499 1500 for i in initiators: 1501 i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no) 1502 if i.enable_adq: 1503 i.adq_configure_tc() 1504 1505 # Poor mans threading 1506 # Run FIO tests 1507 for block_size, io_depth, rw in fio_workloads: 1508 threads = [] 1509 configs = [] 1510 for i in initiators: 1511 if i.mode == "kernel": 1512 i.kernel_init_connect() 1513 1514 cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no, 1515 fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops) 1516 configs.append(cfg) 1517 1518 for i, cfg in zip(initiators, configs): 1519 t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num)) 1520 threads.append(t) 1521 if target_obj.enable_sar: 1522 sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"]) 1523 sar_file_name = ".".join([sar_file_name, "txt"]) 1524 t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_name)) 1525 threads.append(t) 1526 1527 if target_obj.enable_pcm: 1528 pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]] 1529 1530 pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],)) 
1531 pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],)) 1532 pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],)) 1533 1534 threads.append(pcm_cpu_t) 1535 threads.append(pcm_mem_t) 1536 threads.append(pcm_pow_t) 1537 1538 if target_obj.enable_bandwidth: 1539 bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)]) 1540 bandwidth_file_name = ".".join([bandwidth_file_name, "csv"]) 1541 t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,)) 1542 threads.append(t) 1543 1544 if target_obj.enable_dpdk_memory: 1545 t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results)) 1546 threads.append(t) 1547 1548 if target_obj.enable_adq: 1549 ethtool_thread = threading.Thread(target=target_obj.ethtool_after_fio_ramp, args=(fio_ramp_time,)) 1550 threads.append(ethtool_thread) 1551 1552 for t in threads: 1553 t.start() 1554 for t in threads: 1555 t.join() 1556 1557 for i in initiators: 1558 if i.mode == "kernel": 1559 i.kernel_init_disconnect() 1560 i.copy_result_files(args.results) 1561 1562 target_obj.restore_governor() 1563 target_obj.restore_tuned() 1564 target_obj.restore_services() 1565 target_obj.restore_sysctl() 1566 for i in initiators: 1567 i.restore_governor() 1568 i.restore_tuned() 1569 i.restore_services() 1570 i.restore_sysctl() 1571 target_obj.parse_results(args.results, args.csv_filename) 1572 finally: 1573 for i in initiators: 1574 try: 1575 i.stop() 1576 except Exception as err: 1577 pass 1578 target_obj.stop() 1579