1#!/usr/bin/env python3 2 3from json.decoder import JSONDecodeError 4import os 5import re 6import sys 7import argparse 8import json 9import zipfile 10import threading 11import subprocess 12import itertools 13import configparser 14import time 15import uuid 16from collections import OrderedDict 17 18import paramiko 19import pandas as pd 20from common import * 21 22sys.path.append(os.path.dirname(__file__) + '/../../../python') 23 24import spdk.rpc as rpc # noqa 25import spdk.rpc.client as rpc_client # noqa 26 27 28class Server: 29 def __init__(self, name, general_config, server_config): 30 self.name = name 31 self.username = general_config["username"] 32 self.password = general_config["password"] 33 self.transport = general_config["transport"].lower() 34 self.nic_ips = server_config["nic_ips"] 35 self.mode = server_config["mode"] 36 37 self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts" 38 if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]: 39 self.irq_scripts_dir = server_config["irq_scripts_dir"] 40 41 self.local_nic_info = [] 42 self._nics_json_obj = {} 43 self.svc_restore_dict = {} 44 self.sysctl_restore_dict = {} 45 self.tuned_restore_dict = {} 46 self.governor_restore = "" 47 self.tuned_profile = "" 48 49 self.enable_adq = False 50 self.adq_priority = None 51 if "adq_enable" in server_config and server_config["adq_enable"]: 52 self.enable_adq = server_config["adq_enable"] 53 self.adq_priority = 1 54 55 if "tuned_profile" in server_config: 56 self.tuned_profile = server_config["tuned_profile"] 57 58 if not re.match("^[A-Za-z0-9]*$", name): 59 self.log_print("Please use a name which contains only letters or numbers") 60 sys.exit(1) 61 62 def log_print(self, msg): 63 print("[%s] %s" % (self.name, msg), flush=True) 64 65 @staticmethod 66 def get_uncommented_lines(lines): 67 return [line for line in lines if line and not line.startswith('#')] 68 69 def get_nic_name_by_ip(self, ip): 70 if not self._nics_json_obj: 71 nics_json_obj = 
self.exec_cmd(["ip", "-j", "address", "show"]) 72 self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj))) 73 for nic in self._nics_json_obj: 74 for addr in nic["addr_info"]: 75 if ip in addr["local"]: 76 return nic["ifname"] 77 78 def set_local_nic_info_helper(self): 79 pass 80 81 def set_local_nic_info(self, pci_info): 82 def extract_network_elements(json_obj): 83 nic_list = [] 84 if isinstance(json_obj, list): 85 for x in json_obj: 86 nic_list.extend(extract_network_elements(x)) 87 elif isinstance(json_obj, dict): 88 if "children" in json_obj: 89 nic_list.extend(extract_network_elements(json_obj["children"])) 90 if "class" in json_obj.keys() and "network" in json_obj["class"]: 91 nic_list.append(json_obj) 92 return nic_list 93 94 self.local_nic_info = extract_network_elements(pci_info) 95 96 # pylint: disable=R0201 97 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 98 return "" 99 100 def configure_system(self): 101 self.configure_services() 102 self.configure_sysctl() 103 self.configure_tuned() 104 self.configure_cpu_governor() 105 self.configure_irq_affinity() 106 107 def configure_adq(self): 108 if self.mode == "kernel": 109 self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 110 return 111 self.adq_load_modules() 112 self.adq_configure_nic() 113 114 def adq_load_modules(self): 115 self.log_print("Modprobing ADQ-related Linux modules...") 116 adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"] 117 for module in adq_module_deps: 118 try: 119 self.exec_cmd(["sudo", "modprobe", module]) 120 self.log_print("%s loaded!" 
% module) 121 except CalledProcessError as e: 122 self.log_print("ERROR: failed to load module %s" % module) 123 self.log_print("%s resulted in error: %s" % (e.cmd, e.output)) 124 125 def adq_configure_tc(self): 126 self.log_print("Configuring ADQ Traffic classes and filters...") 127 128 if self.mode == "kernel": 129 self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 130 return 131 132 num_queues_tc0 = 2 # 2 is minimum number of queues for TC0 133 num_queues_tc1 = self.num_cores 134 port_param = "dst_port" if isinstance(self, Target) else "src_port" 135 port = "4420" 136 xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs") 137 138 for nic_ip in self.nic_ips: 139 nic_name = self.get_nic_name_by_ip(nic_ip) 140 tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, 141 "root", "mqprio", "num_tc", "2", "map", "0", "1", 142 "queues", "%s@0" % num_queues_tc0, 143 "%s@%s" % (num_queues_tc1, num_queues_tc0), 144 "hw", "1", "mode", "channel"] 145 self.log_print(" ".join(tc_qdisc_map_cmd)) 146 self.exec_cmd(tc_qdisc_map_cmd) 147 148 time.sleep(5) 149 tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"] 150 self.log_print(" ".join(tc_qdisc_ingress_cmd)) 151 self.exec_cmd(tc_qdisc_ingress_cmd) 152 153 tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name, 154 "protocol", "ip", "ingress", "prio", "1", "flower", 155 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port, 156 "skip_sw", "hw_tc", "1"] 157 self.log_print(" ".join(tc_filter_cmd)) 158 self.exec_cmd(tc_filter_cmd) 159 160 # show tc configuration 161 self.log_print("Show tc configuration for %s NIC..." 
% nic_name) 162 tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name]) 163 tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"]) 164 self.log_print("%s" % tc_disk_out) 165 self.log_print("%s" % tc_filter_out) 166 167 # Ethtool coalesce settings must be applied after configuring traffic classes 168 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"]) 169 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"]) 170 171 self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name) 172 xps_cmd = ["sudo", xps_script_path, nic_name] 173 self.log_print(xps_cmd) 174 self.exec_cmd(xps_cmd) 175 176 def adq_configure_nic(self): 177 self.log_print("Configuring NIC port settings for ADQ testing...") 178 179 # Reload the driver first, to make sure any previous settings are re-set. 180 try: 181 self.exec_cmd(["sudo", "rmmod", "ice"]) 182 self.exec_cmd(["sudo", "modprobe", "ice"]) 183 except CalledProcessError as e: 184 self.log_print("ERROR: failed to reload ice module!") 185 self.log_print("%s resulted in error: %s" % (e.cmd, e.output)) 186 187 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 188 for nic in nic_names: 189 self.log_print(nic) 190 try: 191 self.exec_cmd(["sudo", "ethtool", "-K", nic, 192 "hw-tc-offload", "on"]) # Enable hardware TC offload 193 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 194 "channel-inline-flow-director", "on"]) # Enable Intel Flow Director 195 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"]) # Disable LLDP 196 # As temporary workaround for ADQ, channel packet inspection optimization is turned on during connection establishment. 197 # Then turned off before fio ramp_up expires in ethtool_after_fio_ramp(). 
198 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 199 "channel-pkt-inspect-optimize", "on"]) 200 except CalledProcessError as e: 201 self.log_print("ERROR: failed to configure NIC port using ethtool!") 202 self.log_print("%s resulted in error: %s" % (e.cmd, e.output)) 203 self.log_print("Please update your NIC driver and firmware versions and try again.") 204 self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic])) 205 self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic])) 206 207 def configure_services(self): 208 self.log_print("Configuring active services...") 209 svc_config = configparser.ConfigParser(strict=False) 210 211 # Below list is valid only for RHEL / Fedora systems and might not 212 # contain valid names for other distributions. 213 svc_target_state = { 214 "firewalld": "inactive", 215 "irqbalance": "inactive", 216 "lldpad.service": "inactive", 217 "lldpad.socket": "inactive" 218 } 219 220 for service in svc_target_state: 221 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 222 out = "\n".join(["[%s]" % service, out]) 223 svc_config.read_string(out) 224 225 if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]: 226 continue 227 228 service_state = svc_config[service]["ActiveState"] 229 self.log_print("Current state of %s service is %s" % (service, service_state)) 230 self.svc_restore_dict.update({service: service_state}) 231 if service_state != "inactive": 232 self.log_print("Disabling %s. It will be restored after the test has finished." 
% service) 233 self.exec_cmd(["sudo", "systemctl", "stop", service]) 234 235 def configure_sysctl(self): 236 self.log_print("Tuning sysctl settings...") 237 238 busy_read = 0 239 if self.enable_adq and self.mode == "spdk": 240 busy_read = 1 241 242 sysctl_opts = { 243 "net.core.busy_poll": 0, 244 "net.core.busy_read": busy_read, 245 "net.core.somaxconn": 4096, 246 "net.core.netdev_max_backlog": 8192, 247 "net.ipv4.tcp_max_syn_backlog": 16384, 248 "net.core.rmem_max": 268435456, 249 "net.core.wmem_max": 268435456, 250 "net.ipv4.tcp_mem": "268435456 268435456 268435456", 251 "net.ipv4.tcp_rmem": "8192 1048576 33554432", 252 "net.ipv4.tcp_wmem": "8192 1048576 33554432", 253 "net.ipv4.route.flush": 1, 254 "vm.overcommit_memory": 1, 255 } 256 257 for opt, value in sysctl_opts.items(): 258 self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()}) 259 self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 260 261 def configure_tuned(self): 262 if not self.tuned_profile: 263 self.log_print("WARNING: Tuned profile not set in configuration file. Skipping configuration.") 264 return 265 266 self.log_print("Configuring tuned-adm profile to %s." 
% self.tuned_profile) 267 service = "tuned" 268 tuned_config = configparser.ConfigParser(strict=False) 269 270 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 271 out = "\n".join(["[%s]" % service, out]) 272 tuned_config.read_string(out) 273 tuned_state = tuned_config[service]["ActiveState"] 274 self.svc_restore_dict.update({service: tuned_state}) 275 276 if tuned_state != "inactive": 277 profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip() 278 profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip() 279 280 self.tuned_restore_dict = { 281 "profile": profile, 282 "mode": profile_mode 283 } 284 285 self.exec_cmd(["sudo", "systemctl", "start", service]) 286 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile]) 287 self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"])) 288 289 def configure_cpu_governor(self): 290 self.log_print("Setting CPU governor to performance...") 291 292 # This assumes that there is the same CPU scaling governor on each CPU 293 self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip() 294 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"]) 295 296 def configure_irq_affinity(self): 297 self.log_print("Setting NIC irq affinity for NICs...") 298 299 irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh") 300 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 301 for nic in nic_names: 302 irq_cmd = ["sudo", irq_script_path, nic] 303 self.log_print(irq_cmd) 304 self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir) 305 306 def restore_services(self): 307 self.log_print("Restoring services...") 308 for service, state in self.svc_restore_dict.items(): 309 cmd = "stop" if state == "inactive" else "start" 310 self.exec_cmd(["sudo", "systemctl", cmd, service]) 311 312 def restore_sysctl(self): 313 self.log_print("Restoring sysctl 
settings...") 314 for opt, value in self.sysctl_restore_dict.items(): 315 self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 316 317 def restore_tuned(self): 318 self.log_print("Restoring tuned-adm settings...") 319 320 if not self.tuned_restore_dict: 321 return 322 323 if self.tuned_restore_dict["mode"] == "auto": 324 self.exec_cmd(["sudo", "tuned-adm", "auto_profile"]) 325 self.log_print("Reverted tuned-adm to auto_profile.") 326 else: 327 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]]) 328 self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"]) 329 330 def restore_governor(self): 331 self.log_print("Restoring CPU governor setting...") 332 if self.governor_restore: 333 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore]) 334 self.log_print("Reverted CPU governor to %s." % self.governor_restore) 335 336 337class Target(Server): 338 def __init__(self, name, general_config, target_config): 339 super().__init__(name, general_config, target_config) 340 341 # Defaults 342 self.enable_sar = False 343 self.sar_delay = 0 344 self.sar_interval = 0 345 self.sar_count = 0 346 self.enable_pcm = False 347 self.pcm_dir = "" 348 self.pcm_delay = 0 349 self.pcm_interval = 0 350 self.pcm_count = 0 351 self.enable_bandwidth = 0 352 self.bandwidth_count = 0 353 self.enable_dpdk_memory = False 354 self.dpdk_wait_time = 0 355 self.enable_zcopy = False 356 self.scheduler_name = "static" 357 self.null_block = 0 358 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 359 self.subsystem_info_list = [] 360 361 if "null_block_devices" in target_config: 362 self.null_block = target_config["null_block_devices"] 363 if "sar_settings" in target_config: 364 self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"] 365 if "pcm_settings" in target_config: 366 self.enable_pcm = True 367 
self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"] 368 if "enable_bandwidth" in target_config: 369 self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"] 370 if "enable_dpdk_memory" in target_config: 371 self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"] 372 if "scheduler_settings" in target_config: 373 self.scheduler_name = target_config["scheduler_settings"] 374 if "zcopy_settings" in target_config: 375 self.enable_zcopy = target_config["zcopy_settings"] 376 if "results_dir" in target_config: 377 self.results_dir = target_config["results_dir"] 378 379 self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) 380 self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../")) 381 self.set_local_nic_info(self.set_local_nic_info_helper()) 382 383 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 384 self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip") 385 386 self.configure_system() 387 if self.enable_adq: 388 self.configure_adq() 389 self.sys_config() 390 391 def set_local_nic_info_helper(self): 392 return json.loads(self.exec_cmd(["lshw", "-json"])) 393 394 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 395 stderr_opt = None 396 if stderr_redirect: 397 stderr_opt = subprocess.STDOUT 398 if change_dir: 399 old_cwd = os.getcwd() 400 os.chdir(change_dir) 401 self.log_print("Changing directory to %s" % change_dir) 402 403 out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8") 404 405 if change_dir: 406 os.chdir(old_cwd) 407 self.log_print("Changing directory to %s" % old_cwd) 408 return out 409 410 def zip_spdk_sources(self, spdk_dir, dest_file): 411 self.log_print("Zipping SPDK source directory") 412 fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) 413 for root, _directories, files in os.walk(spdk_dir, followlinks=True): 414 for file in files: 415 
fh.write(os.path.relpath(os.path.join(root, file))) 416 fh.close() 417 self.log_print("Done zipping") 418 419 @staticmethod 420 def read_json_stats(file): 421 with open(file, "r") as json_data: 422 data = json.load(json_data) 423 job_pos = 0 # job_post = 0 because using aggregated results 424 425 # Check if latency is in nano or microseconds to choose correct dict key 426 def get_lat_unit(key_prefix, dict_section): 427 # key prefix - lat, clat or slat. 428 # dict section - portion of json containing latency bucket in question 429 # Return dict key to access the bucket and unit as string 430 for k, _ in dict_section.items(): 431 if k.startswith(key_prefix): 432 return k, k.split("_")[1] 433 434 def get_clat_percentiles(clat_dict_leaf): 435 if "percentile" in clat_dict_leaf: 436 p99_lat = float(clat_dict_leaf["percentile"]["99.000000"]) 437 p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"]) 438 p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"]) 439 p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"]) 440 441 return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat] 442 else: 443 # Latest fio versions do not provide "percentile" results if no 444 # measurements were done, so just return zeroes 445 return [0, 0, 0, 0] 446 447 read_iops = float(data["jobs"][job_pos]["read"]["iops"]) 448 read_bw = float(data["jobs"][job_pos]["read"]["bw"]) 449 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"]) 450 read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"]) 451 read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"]) 452 read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"]) 453 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"]) 454 read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles( 455 data["jobs"][job_pos]["read"][clat_key]) 456 457 if "ns" in lat_unit: 458 read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, 
read_min_lat, read_max_lat]] 459 if "ns" in clat_unit: 460 read_p99_lat = read_p99_lat / 1000 461 read_p99_9_lat = read_p99_9_lat / 1000 462 read_p99_99_lat = read_p99_99_lat / 1000 463 read_p99_999_lat = read_p99_999_lat / 1000 464 465 write_iops = float(data["jobs"][job_pos]["write"]["iops"]) 466 write_bw = float(data["jobs"][job_pos]["write"]["bw"]) 467 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"]) 468 write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"]) 469 write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"]) 470 write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"]) 471 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"]) 472 write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles( 473 data["jobs"][job_pos]["write"][clat_key]) 474 475 if "ns" in lat_unit: 476 write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]] 477 if "ns" in clat_unit: 478 write_p99_lat = write_p99_lat / 1000 479 write_p99_9_lat = write_p99_9_lat / 1000 480 write_p99_99_lat = write_p99_99_lat / 1000 481 write_p99_999_lat = write_p99_999_lat / 1000 482 483 return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat, 484 read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat, 485 write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat, 486 write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat] 487 488 def parse_results(self, results_dir, csv_file): 489 files = os.listdir(results_dir) 490 fio_files = filter(lambda x: ".fio" in x, files) 491 json_files = [x for x in files if ".json" in x] 492 493 headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us", 494 "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us", 495 "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us", 496 
"write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"] 497 498 aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us", 499 "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"] 500 501 header_line = ",".join(["Name", *headers]) 502 aggr_header_line = ",".join(["Name", *aggr_headers]) 503 504 # Create empty results file 505 with open(os.path.join(results_dir, csv_file), "w") as fh: 506 fh.write(aggr_header_line + "\n") 507 rows = set() 508 509 for fio_config in fio_files: 510 self.log_print("Getting FIO stats for %s" % fio_config) 511 job_name, _ = os.path.splitext(fio_config) 512 513 # Look in the filename for rwmixread value. Function arguments do 514 # not have that information. 515 # TODO: Improve this function by directly using workload params instead 516 # of regexing through filenames. 517 if "read" in job_name: 518 rw_mixread = 1 519 elif "write" in job_name: 520 rw_mixread = 0 521 else: 522 rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100 523 524 # If "_CPU" exists in name - ignore it 525 # Initiators for the same job could have different num_cores parameter 526 job_name = re.sub(r"_\d+CPU", "", job_name) 527 job_result_files = [x for x in json_files if x.startswith(job_name)] 528 self.log_print("Matching result files for current fio config:") 529 for j in job_result_files: 530 self.log_print("\t %s" % j) 531 532 # There may have been more than 1 initiator used in test, need to check that 533 # Result files are created so that string after last "_" separator is server name 534 inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files]) 535 inits_avg_results = [] 536 for i in inits_names: 537 self.log_print("\tGetting stats for initiator %s" % i) 538 # There may have been more than 1 test run for this job, calculate average results for initiator 539 i_results = [x for x in job_result_files if i in x] 540 i_results_filename = re.sub(r"run_\d+_", "", 
i_results[0].replace("json", "csv")) 541 542 separate_stats = [] 543 for r in i_results: 544 try: 545 stats = self.read_json_stats(os.path.join(results_dir, r)) 546 separate_stats.append(stats) 547 self.log_print(stats) 548 except JSONDecodeError: 549 self.log_print("ERROR: Failed to parse %s results! Results might be incomplete!" % r) 550 551 init_results = [sum(x) for x in zip(*separate_stats)] 552 init_results = [x / len(separate_stats) for x in init_results] 553 inits_avg_results.append(init_results) 554 555 self.log_print("\tAverage results for initiator %s" % i) 556 self.log_print(init_results) 557 with open(os.path.join(results_dir, i_results_filename), "w") as fh: 558 fh.write(header_line + "\n") 559 fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n") 560 561 # Sum results of all initiators running this FIO job. 562 # Latency results are an average of latencies from accros all initiators. 563 inits_avg_results = [sum(x) for x in zip(*inits_avg_results)] 564 inits_avg_results = OrderedDict(zip(headers, inits_avg_results)) 565 for key in inits_avg_results: 566 if "lat" in key: 567 inits_avg_results[key] /= len(inits_names) 568 569 # Aggregate separate read/write values into common labels 570 # Take rw_mixread into consideration for mixed read/write workloads. 
571 aggregate_results = OrderedDict() 572 for h in aggr_headers: 573 read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key] 574 if "lat" in h: 575 _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat 576 else: 577 _ = read_stat + write_stat 578 aggregate_results[h] = "{0:.3f}".format(_) 579 580 rows.add(",".join([job_name, *aggregate_results.values()])) 581 582 # Save results to file 583 for row in rows: 584 with open(os.path.join(results_dir, csv_file), "a") as fh: 585 fh.write(row + "\n") 586 self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file)) 587 588 def measure_sar(self, results_dir, sar_file_name): 589 self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay) 590 cpu_number = os.cpu_count() 591 sar_idle_sum = 0 592 time.sleep(self.sar_delay) 593 out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count]) 594 with open(os.path.join(results_dir, sar_file_name), "w") as fh: 595 for line in out.split("\n"): 596 if "Average" in line: 597 if "CPU" in line: 598 self.log_print("Summary CPU utilization from SAR:") 599 self.log_print(line) 600 elif "all" in line: 601 self.log_print(line) 602 else: 603 sar_idle_sum += float(line.split()[7]) 604 fh.write(out) 605 sar_cpu_usage = cpu_number * 100 - sar_idle_sum 606 with open(os.path.join(results_dir, sar_file_name), "a") as f: 607 f.write("Total CPU used: " + str(sar_cpu_usage)) 608 609 def ethtool_after_fio_ramp(self, fio_ramp_time): 610 time.sleep(fio_ramp_time//2) 611 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 612 for nic in nic_names: 613 self.log_print(nic) 614 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 615 "channel-pkt-inspect-optimize", "off"]) # Disable channel packet inspection optimization 616 617 def measure_pcm_memory(self, results_dir, pcm_file_name): 618 time.sleep(self.pcm_delay) 619 cmd = ["%s/pcm-memory.x" % self.pcm_dir, "%s" 
% self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)] 620 pcm_memory = subprocess.Popen(cmd) 621 time.sleep(self.pcm_count) 622 pcm_memory.terminate() 623 624 def measure_pcm(self, results_dir, pcm_file_name): 625 time.sleep(self.pcm_delay) 626 cmd = ["%s/pcm.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, "-csv=%s/%s" % (results_dir, pcm_file_name)] 627 subprocess.run(cmd) 628 df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1]) 629 df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x)) 630 skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})] 631 skt_pcm_file_name = "_".join(["skt", pcm_file_name]) 632 skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False) 633 634 def measure_pcm_power(self, results_dir, pcm_power_file_name): 635 time.sleep(self.pcm_delay) 636 out = self.exec_cmd(["%s/pcm-power.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count]) 637 with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh: 638 fh.write(out) 639 640 def measure_network_bandwidth(self, results_dir, bandwidth_file_name): 641 self.log_print("INFO: starting network bandwidth measure") 642 self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name), 643 "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)]) 644 645 def measure_dpdk_memory(self, results_dir): 646 self.log_print("INFO: waiting to generate DPDK memory usage") 647 time.sleep(self.dpdk_wait_time) 648 self.log_print("INFO: generating DPDK memory usage") 649 rpc.env.env_dpdk_get_mem_stats 650 os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir)) 651 652 def sys_config(self): 653 self.log_print("====Kernel release:====") 654 self.log_print(os.uname().release) 655 self.log_print("====Kernel command line:====") 656 with open('/proc/cmdline') as f: 657 cmdline = f.readlines() 658 
self.log_print('\n'.join(self.get_uncommented_lines(cmdline))) 659 self.log_print("====sysctl conf:====") 660 with open('/etc/sysctl.conf') as f: 661 sysctl = f.readlines() 662 self.log_print('\n'.join(self.get_uncommented_lines(sysctl))) 663 self.log_print("====Cpu power info:====") 664 self.log_print(self.exec_cmd(["cpupower", "frequency-info"])) 665 self.log_print("====zcopy settings:====") 666 self.log_print("zcopy enabled: %s" % (self.enable_zcopy)) 667 self.log_print("====Scheduler settings:====") 668 self.log_print("SPDK scheduler: %s" % (self.scheduler_name)) 669 670 671class Initiator(Server): 672 def __init__(self, name, general_config, initiator_config): 673 super().__init__(name, general_config, initiator_config) 674 675 # Required fields 676 self.ip = initiator_config["ip"] 677 self.target_nic_ips = initiator_config["target_nic_ips"] 678 679 # Defaults 680 self.cpus_allowed = None 681 self.cpus_allowed_policy = "shared" 682 self.spdk_dir = "/tmp/spdk" 683 self.fio_bin = "/usr/src/fio/fio" 684 self.nvmecli_bin = "nvme" 685 self.cpu_frequency = None 686 self.subsystem_info_list = [] 687 688 if "spdk_dir" in initiator_config: 689 self.spdk_dir = initiator_config["spdk_dir"] 690 if "fio_bin" in initiator_config: 691 self.fio_bin = initiator_config["fio_bin"] 692 if "nvmecli_bin" in initiator_config: 693 self.nvmecli_bin = initiator_config["nvmecli_bin"] 694 if "cpus_allowed" in initiator_config: 695 self.cpus_allowed = initiator_config["cpus_allowed"] 696 if "cpus_allowed_policy" in initiator_config: 697 self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"] 698 if "cpu_frequency" in initiator_config: 699 self.cpu_frequency = initiator_config["cpu_frequency"] 700 if os.getenv('SPDK_WORKSPACE'): 701 self.spdk_dir = os.getenv('SPDK_WORKSPACE') 702 703 self.ssh_connection = paramiko.SSHClient() 704 self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 705 self.ssh_connection.connect(self.ip, username=self.username, 
password=self.password) 706 self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir]) 707 self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir]) 708 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 709 710 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 711 self.copy_spdk("/tmp/spdk.zip") 712 self.set_local_nic_info(self.set_local_nic_info_helper()) 713 self.set_cpu_frequency() 714 self.configure_system() 715 if self.enable_adq: 716 self.configure_adq() 717 self.sys_config() 718 719 def set_local_nic_info_helper(self): 720 return json.loads(self.exec_cmd(["lshw", "-json"])) 721 722 def stop(self): 723 self.ssh_connection.close() 724 725 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 726 if change_dir: 727 cmd = ["cd", change_dir, ";", *cmd] 728 729 # In case one of the command elements contains whitespace and is not 730 # already quoted, # (e.g. when calling sysctl) quote it again to prevent expansion 731 # when sending to remote system. 
732 for i, c in enumerate(cmd): 733 if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")): 734 cmd[i] = '"%s"' % c 735 cmd = " ".join(cmd) 736 737 # Redirect stderr to stdout thanks using get_pty option if needed 738 _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect) 739 out = stdout.read().decode(encoding="utf-8") 740 741 # Check the return code 742 rc = stdout.channel.recv_exit_status() 743 if rc: 744 raise CalledProcessError(int(rc), cmd, out) 745 746 return out 747 748 def put_file(self, local, remote_dest): 749 ftp = self.ssh_connection.open_sftp() 750 ftp.put(local, remote_dest) 751 ftp.close() 752 753 def get_file(self, remote, local_dest): 754 ftp = self.ssh_connection.open_sftp() 755 ftp.get(remote, local_dest) 756 ftp.close() 757 758 def copy_spdk(self, local_spdk_zip): 759 self.log_print("Copying SPDK sources to initiator %s" % self.name) 760 self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip") 761 self.log_print("Copied sources zip from target") 762 self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir]) 763 self.log_print("Sources unpacked") 764 765 def copy_result_files(self, dest_dir): 766 self.log_print("Copying results") 767 768 if not os.path.exists(dest_dir): 769 os.mkdir(dest_dir) 770 771 # Get list of result files from initiator and copy them back to target 772 file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n") 773 774 for file in file_list: 775 self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file), 776 os.path.join(dest_dir, file)) 777 self.log_print("Done copying results") 778 779 def discover_subsystems(self, address_list, subsys_no): 780 num_nvmes = range(0, subsys_no) 781 nvme_discover_output = "" 782 for ip, subsys_no in itertools.product(address_list, num_nvmes): 783 self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no)) 784 nvme_discover_cmd = ["sudo", 785 "%s" % self.nvmecli_bin, 786 "discover", "-t", "%s" % 
self.transport,
                                 "-s", "%s" % (4420 + subsys_no),
                                 "-a", "%s" % ip]

            try:
                stdout = self.exec_cmd(nvme_discover_cmd)
                if stdout:
                    nvme_discover_output = nvme_discover_output + stdout
            except CalledProcessError:
                # Do nothing. In case of discovering remote subsystems of kernel target
                # we expect "nvme discover" to fail a bunch of times because we basically
                # scan ports.
                pass

        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                                nvme_discover_output)  # from nvme discovery output
        # Keep only subsystems reachable on the requested addresses, de-duplicate
        # (ports were scanned repeatedly) and sort by NQN for stable ordering.
        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
        subsystems = list(set(subsystems))
        subsystems.sort(key=lambda x: x[1])
        self.log_print("Found matching subsystems on target side:")
        for s in subsystems:
            self.log_print(s)
        self.subsystem_info_list = subsystems

    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10, rate_iops=0):
        """Generate a fio job file for this workload on the initiator and
        return its remote path. The [global] section comes from the template
        below; per-thread [filename...] sections are produced by the
        subclass-specific gen_fio_filename_conf().
        """
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""
        if "spdk" in self.mode:
            # SPDK mode: drive the bdev fio plugin with a generated JSON config.
            bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
            self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
            spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
        else:
            # Kernel mode: use the configured kernel ioengine and enumerate
            # connected /dev/nvme* devices exposed by SPDK or Linux targets.
            ioengine = self.ioengine
            spdk_conf = ""
            out = self.exec_cmd(["sudo", "nvme", "list", "|",
"grep", "-E", "'SPDK|Linux'", 844 "|", "awk", "'{print $1}'"]) 845 subsystems = [x for x in out.split("\n") if "nvme" in x] 846 847 if self.cpus_allowed is not None: 848 self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed) 849 cpus_num = 0 850 cpus = self.cpus_allowed.split(",") 851 for cpu in cpus: 852 if "-" in cpu: 853 a, b = cpu.split("-") 854 a = int(a) 855 b = int(b) 856 cpus_num += len(range(a, b)) 857 else: 858 cpus_num += 1 859 self.num_cores = cpus_num 860 threads = range(0, self.num_cores) 861 elif hasattr(self, 'num_cores'): 862 self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores) 863 threads = range(0, int(self.num_cores)) 864 else: 865 self.num_cores = len(subsystems) 866 threads = range(0, len(subsystems)) 867 868 if "spdk" in self.mode: 869 filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs) 870 else: 871 filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs) 872 873 fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf, 874 rw=rw, rwmixread=rwmixread, block_size=block_size, 875 ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops) 876 877 # TODO: hipri disabled for now, as it causes fio errors: 878 # io_u error on file /dev/nvme2n1: Operation not supported 879 # See comment in KernelInitiator class, kernel_init_connect() function 880 if hasattr(self, "ioengine") and "io_uring" in self.ioengine: 881 fio_config = fio_config + """ 882fixedbufs=1 883registerfiles=1 884#hipri=1 885""" 886 if num_jobs: 887 fio_config = fio_config + "numjobs=%s \n" % num_jobs 888 if self.cpus_allowed is not None: 889 fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed 890 fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy 891 fio_config = fio_config + filename_section 892 893 fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread) 894 if 
hasattr(self, "num_cores"): 895 fio_config_filename += "_%sCPU" % self.num_cores 896 fio_config_filename += ".fio" 897 898 self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir]) 899 self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)]) 900 self.log_print("Created FIO Config:") 901 self.log_print(fio_config) 902 903 return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename) 904 905 def set_cpu_frequency(self): 906 if self.cpu_frequency is not None: 907 try: 908 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True) 909 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True) 910 self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"])) 911 except Exception: 912 self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!") 913 sys.exit() 914 else: 915 self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.") 916 917 def run_fio(self, fio_config_file, run_num=None): 918 job_name, _ = os.path.splitext(fio_config_file) 919 self.log_print("Starting FIO run for job: %s" % job_name) 920 self.log_print("Using FIO: %s" % self.fio_bin) 921 922 if run_num: 923 for i in range(1, run_num + 1): 924 output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json" 925 try: 926 output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json", 927 "--output=%s" % output_filename, "--eta=never"], True) 928 self.log_print(output) 929 except subprocess.CalledProcessError as e: 930 self.log_print("ERROR: Fio process failed!") 931 self.log_print(e.stdout) 932 else: 933 output_filename = job_name + "_" + self.name + ".json" 934 output = self.exec_cmd(["sudo", self.fio_bin, 935 fio_config_file, "--output-format=json", 936 "--output" % output_filename], True) 937 self.log_print(output) 938 self.log_print("FIO run finished. 
Results in: %s" % output_filename)

    def sys_config(self):
        """Log the remote host's kernel, cmdline, sysctl and CPU-power setup
        so each run's environment is recorded alongside the results.
        """
        self.log_print("====Kernel release:====")
        self.log_print(self.exec_cmd(["uname", "-r"]))
        self.log_print("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log_print("====sysctl conf:====")
        sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"])
        self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log_print("====Cpu power info:====")
        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))


class KernelTarget(Target):
    """NVMe-oF target backed by the Linux kernel nvmet driver (nvmetcli)."""

    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = "nvmetcli"

        if "nvmet_bin" in target_config:
            self.nvmet_bin = target_config["nvmet_bin"]

    def stop(self):
        # Tear down the kernel target configuration.
        nvmet_command(self.nvmet_bin, "clear")

    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):
        """Write kernel.conf: an nvmetcli JSON config exposing each device in
        *nvme_list* as its own subsystem/port, spread across *address_list* IPs.
        """

        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        # Split disks between NIC IP's
        disks_per_ip = int(len(nvme_list) / len(address_list))
        disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))]

        # Add remaining drives
        for i, disk in enumerate(nvme_list[disks_per_ip * len(address_list):]):
            disk_chunks[i].append(disk)

        subsys_no = 1
        port_no = 0
        for ip, chunk in zip(address_list, disk_chunks):
            for disk in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % subsys_no
                nvmet_cfg["subsystems"].append({
                    "allowed_hosts": [],
                    "attr": {
                        "allow_any_host": "1",
                        "serial": "SPDK00%s" % subsys_no,
                        "version": "1.3"
                    },
                    "namespaces": [
                        {
                            "device": {
                                "path": disk,
                                "uuid": "%s" % uuid.uuid4()
                            },
                            "enable": 1,
                            "nsid": subsys_no
                        }
                    ],
                    "nqn": nqn
                })

                # One listener port per subsystem, trsvcid 4420, 4421, ...
                nvmet_cfg["ports"].append({
                    "addr": {
                        "adrfam": "ipv4",
                        "traddr": ip,
                        "trsvcid": "%s" % (4420 + port_no),
                        "trtype": "%s" % self.transport
                    },
                    "portid": subsys_no,
                    "referrals": [],
                    "subsystems": [nqn]
                })
                subsys_no += 1
                port_no += 1
                # NOTE(review): port_no was just incremented, so the value stored
                # here is one greater than the trsvcid offset used above
                # (4420 + old port_no) — confirm whether consumers of
                # subsystem_info_list expect the service id or this counter.
                self.subsystem_info_list.append([port_no, nqn, ip])

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        """Generate the nvmet config (null_block or real NVMe drives) and load
        it into the kernel target via nvmetcli, then apply ADQ TC if enabled.
        """
        self.log_print("Configuring kernel NVMeOF Target")

        if self.null_block:
            print("Configuring with null block device.")
            null_blk_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
            self.kernel_tgt_gen_subsystem_conf(null_blk_list, self.nic_ips)
            self.subsys_no = len(null_blk_list)
        else:
            print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()
            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
            self.subsys_no = len(nvme_list)

        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log_print("Done configuring kernel NVMeOF Target")


class SPDKTarget(Target):
    """NVMe-oF target backed by the SPDK nvmf_tgt application (configured over RPC)."""

    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Required fields
        self.core_mask = target_config["core_mask"]
        self.num_cores = self.get_num_cores(self.core_mask)

        # Defaults
        self.dif_insert_strip = False
        self.null_block_dif_type = 0
        self.num_shared_buffers = 4096
        self.max_queue_depth = 128
        self.bpf_proc = None
        self.bpf_scripts = []
        self.enable_idxd = False

        if "num_shared_buffers" in target_config:
            self.num_shared_buffers = target_config["num_shared_buffers"]
        if "max_queue_depth" in target_config:
            self.max_queue_depth = 
target_config["max_queue_depth"]
        if "null_block_dif_type" in target_config:
            self.null_block_dif_type = target_config["null_block_dif_type"]
        if "dif_insert_strip" in target_config:
            self.dif_insert_strip = target_config["dif_insert_strip"]
        if "bpf_scripts" in target_config:
            self.bpf_scripts = target_config["bpf_scripts"]
        if "idxd_settings" in target_config:
            self.enable_idxd = target_config["idxd_settings"]

        self.log_print("====IDXD settings:====")
        self.log_print("IDXD enabled: %s" % (self.enable_idxd))

    @staticmethod
    def get_num_cores(core_mask):
        """Count CPUs in an SPDK core mask: either a hex bitmask ("0xff") or a
        bracketed list with inclusive ranges ("[0-3,7]").
        """
        if "0x" in core_mask:
            # Hex bitmask: the popcount is the core count.
            return bin(int(core_mask, 16)).count("1")
        else:
            num_cores = 0
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")
            for i in core_mask.split(","):
                if "-" in i:
                    # Ranges are inclusive, hence the +1.
                    x, y = i.split("-")
                    num_cores += len(range(int(x), int(y))) + 1
                else:
                    num_cores += 1
            return num_cores

    def spdk_tgt_configure(self):
        """Configure the running nvmf_tgt over RPC: transport layer, bdevs
        (null or NVMe) and subsystems/listeners.
        """
        self.log_print("Configuring SPDK NVMeOF target via RPC")

        if self.enable_adq:
            self.adq_configure_tc()

        # Create transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
                                       num_shared_buffers=self.num_shared_buffers,
                                       max_queue_depth=self.max_queue_depth,
                                       dif_insert_or_strip=self.dif_insert_strip,
                                       sock_priority=self.adq_priority)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

        self.log_print("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self, null_block_count):
        """Create *null_block_count* null bdevs, with DIF metadata if
        null_block_dif_type is non-zero.
        """
        md_size = 0
        block_size = 4096
        if
self.null_block_dif_type != 0:
            md_size = 128

        self.log_print("Adding null block bdevices to config via RPC")
        for i in range(null_block_count):
            self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type)
            # Name them NvmeXn1 so the initiator-side filename generation
            # (which expects NvmeXn1) works for null bdevs too.
            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
                                      dif_type=self.null_block_dif_type, md_size=md_size)
        self.log_print("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        """Attach local PCIe NVMe drives as bdevs; optionally limit the count
        to *req_num_disks* (exits if more are requested than present).
        """
        self.log_print("Adding NVMe bdevs to config via RPC")

        bdfs = get_nvme_devices_bdf()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create one subsystem + listener per bdev, spreading bdevs across the
        provided NIC *ips*; record (port, nqn, ip) in subsystem_info_list.
        """
        self.log_print("Adding subsystems to config")
        port = "4420"
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        # Distribute bdevs between provided NICs
        num_disks = range(0, req_num_disks)
        if len(num_disks) == 1:
            disks_per_ip = 1
        else:
            disks_per_ip = int(len(num_disks) / len(ips))
        disk_chunks = [[*num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i]] for i in range(0, len(ips))]

        # Add remaining drives
        for i, disk in enumerate(num_disks[disks_per_ip * len(ips):]):
            disk_chunks[i].append(disk)

        # Create subsystems, add bdevs to namespaces, add listeners
        for ip, chunk in zip(ips, disk_chunks):
            for c
in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
                serial = "SPDK00%s" % c
                bdev_name = "Nvme%sn1" % c
                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                               allow_any_host=True, max_namespaces=8)
                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)

                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
                                                     nqn=nqn,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid=port,
                                                     adrfam="ipv4")

                self.subsystem_info_list.append([port, nqn, ip])
        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def bpf_start(self):
        """Launch scripts/bpftrace.sh attached to the nvmf_tgt PID, writing
        trace output to <results_dir>/bpf_traces.txt.
        """
        self.log_print("Starting BPF Trace scripts: %s" % self.bpf_scripts)
        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
        results_path = os.path.join(self.results_dir, "bpf_traces.txt")

        with open(self.pid, "r") as fh:
            nvmf_pid = str(fh.readline())

        cmd = [bpf_script, nvmf_pid, *bpf_traces]
        self.log_print(cmd)
        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})

    def tgt_start(self):
        """Start nvmf_tgt with --wait-for-rpc, wait for its RPC socket, then
        connect an RPC client (configuration continues below).
        """
        if self.null_block:
            self.subsys_no = 1
        else:
            self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        self.log_print("SPDK NVMeOF Target PID=%s" % self.pid)
        self.log_print("Waiting for spdk to initialize...")
        # Poll for the RPC unix socket; it appears once the app is up.
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")
        if self.enable_zcopy:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
                                           enable_zerocopy_send_server=True)
            self.log_print("Target socket options:")
            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))

        if self.enable_adq:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
                                           nvme_adminq_poll_period_us=100000, retry_count=4)

        if self.enable_idxd:
            rpc.idxd.idxd_scan_accel_engine(self.client, config_kernel_mode=None)
            self.log_print("Target IDXD accel engine enabled")

        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name)
        # Finish the --wait-for-rpc startup: let the framework initialize.
        rpc.framework_start_init(self.client)

        if self.bpf_scripts:
            self.bpf_start()

        self.spdk_tgt_configure()

    def stop(self):
        """Stop BPF tracing (if running) and terminate the nvmf_tgt process."""
        if self.bpf_proc:
            self.log_print("Stopping BPF Trace script")
            self.bpf_proc.terminate()
            self.bpf_proc.wait()

        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait()
            except Exception as e:
                # Fall back to SIGKILL if graceful termination fails.
                self.log_print(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()


class KernelInitiator(Initiator):
    """Initiator using the Linux kernel NVMe-oF host stack (nvme-cli connect)."""

    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Defaults
        self.extra_params = ""
        self.ioengine = "libaio"

        if "extra_params" in initiator_config:
            self.extra_params = initiator_config["extra_params"]

        if "kernel_engine" in initiator_config:
            self.ioengine = initiator_config["kernel_engine"]
            if "io_uring" in self.ioengine:
                self.extra_params = "--nr-poll-queues=8"

    def get_connected_nvme_list(self):
        """Return device names (e.g. nvme0n1) of connected SPDK/Linux NVMe-oF namespaces."""
        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
        nvme_list = 
[os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
        return nvme_list

    def kernel_init_connect(self):
        """Connect to every discovered subsystem with nvme-cli and, for
        io_uring, tune the connected devices' block-layer sysfs settings.
        """
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for subsystem in self.subsystem_info_list:
            self.log_print("Trying to connect %s %s %s" % subsystem)
            # subsystem is (trsvcid, nqn, traddr) as collected by discover_subsystems().
            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
            time.sleep(2)

        if "io_uring" in self.ioengine:
            self.log_print("Setting block layer settings for io_uring.")

            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
            # apparently it's not possible for connected subsystems.
            # Results in "error: Invalid argument"
            block_sysfs_settings = {
                "iostats": "0",
                "rq_affinity": "0",
                "nomerges": "2"
            }

            for disk in self.get_connected_nvme_list():
                sysfs = os.path.join("/sys/block", disk, "queue")
                for k, v in block_sysfs_settings.items():
                    sysfs_opt_path = os.path.join(sysfs, k)
                    try:
                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
                    except subprocess.CalledProcessError as e:
                        self.log_print("Warning: command %s failed due to error %s. %s was not set!"
% (e.cmd, e.output, v)) 1315 finally: 1316 _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)]) 1317 self.log_print("%s=%s" % (sysfs_opt_path, _)) 1318 1319 def kernel_init_disconnect(self): 1320 for subsystem in self.subsystem_info_list: 1321 self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]]) 1322 time.sleep(1) 1323 1324 def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1): 1325 nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()] 1326 1327 filename_section = "" 1328 nvme_per_split = int(len(nvme_list) / len(threads)) 1329 remainder = len(nvme_list) % len(threads) 1330 iterator = iter(nvme_list) 1331 result = [] 1332 for i in range(len(threads)): 1333 result.append([]) 1334 for _ in range(nvme_per_split): 1335 result[i].append(next(iterator)) 1336 if remainder: 1337 result[i].append(next(iterator)) 1338 remainder -= 1 1339 for i, r in enumerate(result): 1340 header = "[filename%s]" % i 1341 disks = "\n".join(["filename=%s" % x for x in r]) 1342 job_section_qd = round((io_depth * len(r)) / num_jobs) 1343 if job_section_qd == 0: 1344 job_section_qd = 1 1345 iodepth = "iodepth=%s" % job_section_qd 1346 filename_section = "\n".join([filename_section, header, disks, iodepth]) 1347 1348 return filename_section 1349 1350 1351class SPDKInitiator(Initiator): 1352 def __init__(self, name, general_config, initiator_config): 1353 super().__init__(name, general_config, initiator_config) 1354 1355 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 1356 self.install_spdk() 1357 1358 # Required fields 1359 self.num_cores = initiator_config["num_cores"] 1360 1361 # Optional fields 1362 self.enable_data_digest = False 1363 if "enable_data_digest" in initiator_config: 1364 self.enable_data_digest = initiator_config["enable_data_digest"] 1365 1366 def install_spdk(self): 1367 self.log_print("Using fio binary %s" % self.fio_bin) 1368 self.exec_cmd(["git", "-C", 
self.spdk_dir, "submodule", "update", "--init"])
        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)])
        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])

        self.log_print("SPDK built")
        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        """Return a JSON bdev-subsystem config (string) attaching one
        bdev_nvme controller per remote subsystem, for the fio bdev plugin.
        """
        bdev_cfg_section = {
            "subsystems": [
                {
                    "subsystem": "bdev",
                    "config": []
                }
            ]
        }

        for i, subsys in enumerate(remote_subsystem_list):
            # Each entry is (trsvcid, nqn, traddr); stringify defensively.
            sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys)
            nvme_ctrl = {
                "method": "bdev_nvme_attach_controller",
                "params": {
                    "name": "Nvme{}".format(i),
                    "trtype": self.transport,
                    "traddr": sub_addr,
                    "trsvcid": sub_port,
                    "subnqn": sub_nqn,
                    "adrfam": "IPv4"
                }
            }

            if self.enable_adq:
                nvme_ctrl["params"].update({"priority": "1"})

            if self.enable_data_digest:
                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})

            bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)

        return json.dumps(bdev_cfg_section, indent=2)

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1):
        """Build fio [filenameX] sections mapping NvmeXn1 bdevs round-robin
        onto at most len(subsystems) threads.
        """
        filename_section = ""
        # Never use more threads than there are subsystems to drive.
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        nvme_per_split = int(len(subsystems) / len(threads))
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for
i, r in enumerate(result): 1428 header = "[filename%s]" % i 1429 disks = "\n".join(["filename=%s" % x for x in r]) 1430 job_section_qd = round((io_depth * len(r)) / num_jobs) 1431 if job_section_qd == 0: 1432 job_section_qd = 1 1433 iodepth = "iodepth=%s" % job_section_qd 1434 filename_section = "\n".join([filename_section, header, disks, iodepth]) 1435 1436 return filename_section 1437 1438 1439if __name__ == "__main__": 1440 script_full_dir = os.path.dirname(os.path.realpath(__file__)) 1441 default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json")) 1442 1443 parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) 1444 parser.add_argument('-c', '--config', type=str, default=default_config_file_path, 1445 help='Configuration file.') 1446 parser.add_argument('-r', '--results', type=str, default='/tmp/results', 1447 help='Results directory.') 1448 parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv', 1449 help='CSV results filename.') 1450 1451 args = parser.parse_args() 1452 1453 print("Using config file: %s" % args.config) 1454 with open(args.config, "r") as config: 1455 data = json.load(config) 1456 1457 initiators = [] 1458 fio_cases = [] 1459 1460 general_config = data["general"] 1461 target_config = data["target"] 1462 initiator_configs = [data[x] for x in data.keys() if "initiator" in x] 1463 1464 for k, v in data.items(): 1465 if "target" in k: 1466 v.update({"results_dir": args.results}) 1467 if data[k]["mode"] == "spdk": 1468 target_obj = SPDKTarget(k, data["general"], v) 1469 elif data[k]["mode"] == "kernel": 1470 target_obj = KernelTarget(k, data["general"], v) 1471 elif "initiator" in k: 1472 if data[k]["mode"] == "spdk": 1473 init_obj = SPDKInitiator(k, data["general"], v) 1474 elif data[k]["mode"] == "kernel": 1475 init_obj = KernelInitiator(k, data["general"], v) 1476 initiators.append(init_obj) 1477 elif "fio" in k: 1478 fio_workloads = 
itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None
            fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None

            fio_rate_iops = 0
            if "rate_iops" in data[k]:
                fio_rate_iops = data[k]["rate_iops"]
        else:
            # Ignore config sections that are not target/initiator/fio.
            continue

    try:
        os.mkdir(args.results)
    except FileExistsError:
        pass

    # TODO: This try block is definitely too large. Need to break this up into separate
    # logical blocks to reduce size.
    try:
        target_obj.tgt_start()

        for i in initiators:
            i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no)
            if i.enable_adq:
                i.adq_configure_tc()

        # Poor man's threading
        # Run FIO tests
        for block_size, io_depth, rw in fio_workloads:
            threads = []
            configs = []
            for i in initiators:
                if i.mode == "kernel":
                    i.kernel_init_connect()

                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops)
                configs.append(cfg)

            # One fio thread per initiator, plus optional measurement threads.
            for i, cfg in zip(initiators, configs):
                t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
                threads.append(t)
            if target_obj.enable_sar:
                sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"])
                sar_file_name = ".".join([sar_file_name, "txt"])
                t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_name))
                threads.append(t)

            if target_obj.enable_pcm:
                pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]

                pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],))
                pcm_mem_t 
= threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],)) 1536 pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],)) 1537 1538 threads.append(pcm_cpu_t) 1539 threads.append(pcm_mem_t) 1540 threads.append(pcm_pow_t) 1541 1542 if target_obj.enable_bandwidth: 1543 bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)]) 1544 bandwidth_file_name = ".".join([bandwidth_file_name, "csv"]) 1545 t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,)) 1546 threads.append(t) 1547 1548 if target_obj.enable_dpdk_memory: 1549 t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results)) 1550 threads.append(t) 1551 1552 if target_obj.enable_adq: 1553 ethtool_thread = threading.Thread(target=target_obj.ethtool_after_fio_ramp, args=(fio_ramp_time,)) 1554 threads.append(ethtool_thread) 1555 1556 for t in threads: 1557 t.start() 1558 for t in threads: 1559 t.join() 1560 1561 for i in initiators: 1562 if i.mode == "kernel": 1563 i.kernel_init_disconnect() 1564 i.copy_result_files(args.results) 1565 1566 target_obj.restore_governor() 1567 target_obj.restore_tuned() 1568 target_obj.restore_services() 1569 target_obj.restore_sysctl() 1570 for i in initiators: 1571 i.restore_governor() 1572 i.restore_tuned() 1573 i.restore_services() 1574 i.restore_sysctl() 1575 target_obj.parse_results(args.results, args.csv_filename) 1576 finally: 1577 for i in initiators: 1578 try: 1579 i.stop() 1580 except Exception as err: 1581 pass 1582 target_obj.stop() 1583