1#!/usr/bin/env python3 2 3from json.decoder import JSONDecodeError 4import os 5import re 6import sys 7import argparse 8import json 9import zipfile 10import threading 11import subprocess 12import itertools 13import configparser 14import time 15import uuid 16from collections import OrderedDict 17 18import paramiko 19import pandas as pd 20from common import * 21 22sys.path.append(os.path.dirname(__file__) + '/../../../python') 23 24import spdk.rpc as rpc # noqa 25import spdk.rpc.client as rpc_client # noqa 26 27 28class Server: 29 def __init__(self, name, general_config, server_config): 30 self.name = name 31 self.username = general_config["username"] 32 self.password = general_config["password"] 33 self.transport = general_config["transport"].lower() 34 self.nic_ips = server_config["nic_ips"] 35 self.mode = server_config["mode"] 36 37 self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts" 38 if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]: 39 self.irq_scripts_dir = server_config["irq_scripts_dir"] 40 41 self.local_nic_info = [] 42 self._nics_json_obj = {} 43 self.svc_restore_dict = {} 44 self.sysctl_restore_dict = {} 45 self.tuned_restore_dict = {} 46 self.governor_restore = "" 47 self.tuned_profile = "" 48 49 self.enable_adq = False 50 self.adq_priority = None 51 if "adq_enable" in server_config and server_config["adq_enable"]: 52 self.enable_adq = server_config["adq_enable"] 53 self.adq_priority = 1 54 55 if "tuned_profile" in server_config: 56 self.tuned_profile = server_config["tuned_profile"] 57 58 if not re.match("^[A-Za-z0-9]*$", name): 59 self.log_print("Please use a name which contains only letters or numbers") 60 sys.exit(1) 61 62 def log_print(self, msg): 63 print("[%s] %s" % (self.name, msg), flush=True) 64 65 @staticmethod 66 def get_uncommented_lines(lines): 67 return [line for line in lines if line and not line.startswith('#')] 68 69 def get_nic_name_by_ip(self, ip): 70 if not self._nics_json_obj: 71 nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"]) 72 self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj))) 73 for nic in self._nics_json_obj: 74 for addr in nic["addr_info"]: 75 if ip in addr["local"]: 76 return nic["ifname"] 77 78 def set_local_nic_info_helper(self): 79 pass 80 81 def set_local_nic_info(self, pci_info): 82 def extract_network_elements(json_obj): 83 nic_list = [] 84 if isinstance(json_obj, list): 85 for x in json_obj: 86 nic_list.extend(extract_network_elements(x)) 87 elif isinstance(json_obj, dict): 88 if "children" in json_obj: 89 nic_list.extend(extract_network_elements(json_obj["children"])) 90 if "class" in json_obj.keys() and "network" in json_obj["class"]: 91 nic_list.append(json_obj) 92 return nic_list 93 94 self.local_nic_info = extract_network_elements(pci_info) 95 96 # pylint: disable=R0201 97 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 98 return "" 99 100 def configure_system(self): 101 self.configure_services() 102 self.configure_sysctl() 103 self.configure_tuned() 104 self.configure_cpu_governor() 105 self.configure_irq_affinity() 106 107 def configure_adq(self): 108 if self.mode == "kernel": 109 self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 110 return 111 self.adq_load_modules() 112 self.adq_configure_nic() 113 114 def adq_load_modules(self): 115 self.log_print("Modprobing ADQ-related Linux modules...") 116 adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"] 117 for module in adq_module_deps: 118 try: 119 self.exec_cmd(["sudo", "modprobe", module]) 120 self.log_print("%s loaded!" % module) 121 except CalledProcessError as e: 122 self.log_print("ERROR: failed to load module %s" % module) 123 self.log_print("%s resulted in error: %s" % (e.cmd, e.output)) 124 125 def adq_configure_tc(self): 126 self.log_print("Configuring ADQ Traffic classes and filters...") 127 128 if self.mode == "kernel": 129 self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 130 return 131 132 num_queues_tc0 = 2 # 2 is minimum number of queues for TC0 133 num_queues_tc1 = self.num_cores 134 port_param = "dst_port" if isinstance(self, Target) else "src_port" 135 port = "4420" 136 xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs") 137 138 for nic_ip in self.nic_ips: 139 nic_name = self.get_nic_name_by_ip(nic_ip) 140 tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, 141 "root", "mqprio", "num_tc", "2", "map", "0", "1", 142 "queues", "%s@0" % num_queues_tc0, 143 "%s@%s" % (num_queues_tc1, num_queues_tc0), 144 "hw", "1", "mode", "channel"] 145 self.log_print(" ".join(tc_qdisc_map_cmd)) 146 self.exec_cmd(tc_qdisc_map_cmd) 147 148 time.sleep(5) 149 tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"] 150 self.log_print(" ".join(tc_qdisc_ingress_cmd)) 151 self.exec_cmd(tc_qdisc_ingress_cmd) 152 153 tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name, 154 "protocol", "ip", "ingress", "prio", "1", "flower", 155 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port, 156 "skip_sw", "hw_tc", "1"] 157 self.log_print(" ".join(tc_filter_cmd)) 158 self.exec_cmd(tc_filter_cmd) 159 160 # show tc configuration 161 self.log_print("Show tc configuration for %s NIC..." % nic_name) 162 tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name]) 163 tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"]) 164 self.log_print("%s" % tc_disk_out) 165 self.log_print("%s" % tc_filter_out) 166 167 # Ethtool coalesce settings must be applied after configuring traffic classes 168 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"]) 169 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"]) 170 171 self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name) 172 xps_cmd = ["sudo", xps_script_path, nic_name] 173 self.log_print(xps_cmd) 174 self.exec_cmd(xps_cmd) 175 176 def reload_driver(self, driver): 177 try: 178 self.exec_cmd(["sudo", "rmmod", driver]) 179 self.exec_cmd(["sudo", "modprobe", driver]) 180 except CalledProcessError as e: 181 self.log_print("ERROR: failed to reload %s module!" % driver) 182 self.log_print("%s resulted in error: %s" % (e.cmd, e.output)) 183 184 def adq_configure_nic(self): 185 self.log_print("Configuring NIC port settings for ADQ testing...") 186 187 # Reload the driver first, to make sure any previous settings are re-set. 188 self.reload_driver("ice") 189 190 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 191 for nic in nic_names: 192 self.log_print(nic) 193 try: 194 self.exec_cmd(["sudo", "ethtool", "-K", nic, 195 "hw-tc-offload", "on"]) # Enable hardware TC offload 196 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 197 "channel-inline-flow-director", "on"]) # Enable Intel Flow Director 198 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"]) # Disable LLDP 199 # As temporary workaround for ADQ, channel packet inspection optimization is turned on during connection establishment. 200 # Then turned off before fio ramp_up expires in ethtool_after_fio_ramp(). 201 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 202 "channel-pkt-inspect-optimize", "on"]) 203 except CalledProcessError as e: 204 self.log_print("ERROR: failed to configure NIC port using ethtool!") 205 self.log_print("%s resulted in error: %s" % (e.cmd, e.output)) 206 self.log_print("Please update your NIC driver and firmware versions and try again.") 207 self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic])) 208 self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic])) 209 210 def configure_services(self): 211 self.log_print("Configuring active services...") 212 svc_config = configparser.ConfigParser(strict=False) 213 214 # Below list is valid only for RHEL / Fedora systems and might not 215 # contain valid names for other distributions. 216 svc_target_state = { 217 "firewalld": "inactive", 218 "irqbalance": "inactive", 219 "lldpad.service": "inactive", 220 "lldpad.socket": "inactive" 221 } 222 223 for service in svc_target_state: 224 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 225 out = "\n".join(["[%s]" % service, out]) 226 svc_config.read_string(out) 227 228 if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]: 229 continue 230 231 service_state = svc_config[service]["ActiveState"] 232 self.log_print("Current state of %s service is %s" % (service, service_state)) 233 self.svc_restore_dict.update({service: service_state}) 234 if service_state != "inactive": 235 self.log_print("Disabling %s. It will be restored after the test has finished." % service) 236 self.exec_cmd(["sudo", "systemctl", "stop", service]) 237 238 def configure_sysctl(self): 239 self.log_print("Tuning sysctl settings...") 240 241 busy_read = 0 242 if self.enable_adq and self.mode == "spdk": 243 busy_read = 1 244 245 sysctl_opts = { 246 "net.core.busy_poll": 0, 247 "net.core.busy_read": busy_read, 248 "net.core.somaxconn": 4096, 249 "net.core.netdev_max_backlog": 8192, 250 "net.ipv4.tcp_max_syn_backlog": 16384, 251 "net.core.rmem_max": 268435456, 252 "net.core.wmem_max": 268435456, 253 "net.ipv4.tcp_mem": "268435456 268435456 268435456", 254 "net.ipv4.tcp_rmem": "8192 1048576 33554432", 255 "net.ipv4.tcp_wmem": "8192 1048576 33554432", 256 "net.ipv4.route.flush": 1, 257 "vm.overcommit_memory": 1, 258 } 259 260 for opt, value in sysctl_opts.items(): 261 self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()}) 262 self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 263 264 def configure_tuned(self): 265 if not self.tuned_profile: 266 self.log_print("WARNING: Tuned profile not set in configuration file. Skipping configuration.") 267 return 268 269 self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile) 270 service = "tuned" 271 tuned_config = configparser.ConfigParser(strict=False) 272 273 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 274 out = "\n".join(["[%s]" % service, out]) 275 tuned_config.read_string(out) 276 tuned_state = tuned_config[service]["ActiveState"] 277 self.svc_restore_dict.update({service: tuned_state}) 278 279 if tuned_state != "inactive": 280 profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip() 281 profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip() 282 283 self.tuned_restore_dict = { 284 "profile": profile, 285 "mode": profile_mode 286 } 287 288 self.exec_cmd(["sudo", "systemctl", "start", service]) 289 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile]) 290 self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"])) 291 292 def configure_cpu_governor(self): 293 self.log_print("Setting CPU governor to performance...") 294 295 # This assumes that there is the same CPU scaling governor on each CPU 296 self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip() 297 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"]) 298 299 def configure_irq_affinity(self): 300 self.log_print("Setting NIC irq affinity for NICs...") 301 302 irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh") 303 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 304 for nic in nic_names: 305 irq_cmd = ["sudo", irq_script_path, nic] 306 self.log_print(irq_cmd) 307 self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir) 308 309 def restore_services(self): 310 self.log_print("Restoring services...") 311 for service, state in self.svc_restore_dict.items(): 312 cmd = "stop" if state == "inactive" else "start" 313 self.exec_cmd(["sudo", "systemctl", cmd, service]) 314 315 def restore_sysctl(self): 316 self.log_print("Restoring sysctl settings...") 317 for opt, value in self.sysctl_restore_dict.items(): 318 self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 319 320 def restore_tuned(self): 321 self.log_print("Restoring tuned-adm settings...") 322 323 if not self.tuned_restore_dict: 324 return 325 326 if self.tuned_restore_dict["mode"] == "auto": 327 self.exec_cmd(["sudo", "tuned-adm", "auto_profile"]) 328 self.log_print("Reverted tuned-adm to auto_profile.") 329 else: 330 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]]) 331 self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"]) 332 333 def restore_governor(self): 334 self.log_print("Restoring CPU governor setting...") 335 if self.governor_restore: 336 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore]) 337 self.log_print("Reverted CPU governor to %s." % self.governor_restore) 338 339 340class Target(Server): 341 def __init__(self, name, general_config, target_config): 342 super().__init__(name, general_config, target_config) 343 344 # Defaults 345 self.enable_sar = False 346 self.sar_delay = 0 347 self.sar_interval = 0 348 self.sar_count = 0 349 self.enable_pcm = False 350 self.pcm_dir = "" 351 self.pcm_delay = 0 352 self.pcm_interval = 0 353 self.pcm_count = 0 354 self.enable_bandwidth = 0 355 self.bandwidth_count = 0 356 self.enable_dpdk_memory = False 357 self.dpdk_wait_time = 0 358 self.enable_zcopy = False 359 self.scheduler_name = "static" 360 self.null_block = 0 361 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 362 self.subsystem_info_list = [] 363 364 if "null_block_devices" in target_config: 365 self.null_block = target_config["null_block_devices"] 366 if "sar_settings" in target_config: 367 self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"] 368 if "pcm_settings" in target_config: 369 self.enable_pcm = True 370 self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"] 371 if "enable_bandwidth" in target_config: 372 self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"] 373 if "enable_dpdk_memory" in target_config: 374 self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"] 375 if "scheduler_settings" in target_config: 376 self.scheduler_name = target_config["scheduler_settings"] 377 if "zcopy_settings" in target_config: 378 self.enable_zcopy = target_config["zcopy_settings"] 379 if "results_dir" in target_config: 380 self.results_dir = target_config["results_dir"] 381 382 self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) 383 self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../")) 384 self.set_local_nic_info(self.set_local_nic_info_helper()) 385 386 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 387 self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip") 388 389 self.configure_system() 390 if self.enable_adq: 391 self.configure_adq() 392 self.sys_config() 393 394 def set_local_nic_info_helper(self): 395 return json.loads(self.exec_cmd(["lshw", "-json"])) 396 397 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 398 stderr_opt = None 399 if stderr_redirect: 400 stderr_opt = subprocess.STDOUT 401 if change_dir: 402 old_cwd = os.getcwd() 403 os.chdir(change_dir) 404 self.log_print("Changing directory to %s" % change_dir) 405 406 out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8") 407 408 if change_dir: 409 os.chdir(old_cwd) 410 self.log_print("Changing directory to %s" % old_cwd) 411 return out 412 413 def zip_spdk_sources(self, spdk_dir, dest_file): 414 self.log_print("Zipping SPDK source directory") 415 fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) 416 for root, _directories, files in os.walk(spdk_dir, followlinks=True): 417 for file in files: 418 fh.write(os.path.relpath(os.path.join(root, file))) 419 fh.close() 420 self.log_print("Done zipping") 421 422 @staticmethod 423 def read_json_stats(file): 424 with open(file, "r") as json_data: 425 data = json.load(json_data) 426 job_pos = 0 # job_post = 0 because using aggregated results 427 428 # Check if latency is in nano or microseconds to choose correct dict key 429 def get_lat_unit(key_prefix, dict_section): 430 # key prefix - lat, clat or slat. 431 # dict section - portion of json containing latency bucket in question 432 # Return dict key to access the bucket and unit as string 433 for k, _ in dict_section.items(): 434 if k.startswith(key_prefix): 435 return k, k.split("_")[1] 436 437 def get_clat_percentiles(clat_dict_leaf): 438 if "percentile" in clat_dict_leaf: 439 p99_lat = float(clat_dict_leaf["percentile"]["99.000000"]) 440 p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"]) 441 p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"]) 442 p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"]) 443 444 return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat] 445 else: 446 # Latest fio versions do not provide "percentile" results if no 447 # measurements were done, so just return zeroes 448 return [0, 0, 0, 0] 449 450 read_iops = float(data["jobs"][job_pos]["read"]["iops"]) 451 read_bw = float(data["jobs"][job_pos]["read"]["bw"]) 452 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"]) 453 read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"]) 454 read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"]) 455 read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"]) 456 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"]) 457 read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles( 458 data["jobs"][job_pos]["read"][clat_key]) 459 460 if "ns" in lat_unit: 461 read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]] 462 if "ns" in clat_unit: 463 read_p99_lat = read_p99_lat / 1000 464 read_p99_9_lat = read_p99_9_lat / 1000 465 read_p99_99_lat = read_p99_99_lat / 1000 466 read_p99_999_lat = read_p99_999_lat / 1000 467 468 write_iops = float(data["jobs"][job_pos]["write"]["iops"]) 469 write_bw = float(data["jobs"][job_pos]["write"]["bw"]) 470 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"]) 471 write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"]) 472 write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"]) 473 write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"]) 474 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"]) 475 write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles( 476 data["jobs"][job_pos]["write"][clat_key]) 477 478 if "ns" in lat_unit: 479 write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]] 480 if "ns" in clat_unit: 481 write_p99_lat = write_p99_lat / 1000 482 write_p99_9_lat = write_p99_9_lat / 1000 483 write_p99_99_lat = write_p99_99_lat / 1000 484 write_p99_999_lat = write_p99_999_lat / 1000 485 486 return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat, 487 read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat, 488 write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat, 489 write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat] 490 491 def parse_results(self, results_dir, csv_file): 492 files = os.listdir(results_dir) 493 fio_files = filter(lambda x: ".fio" in x, files) 494 json_files = [x for x in files if ".json" in x] 495 496 headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us", 497 "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us", 498 "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us", 499 "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"] 500 501 aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us", 502 "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"] 503 504 header_line = ",".join(["Name", *headers]) 505 aggr_header_line = ",".join(["Name", *aggr_headers]) 506 507 # Create empty results file 508 with open(os.path.join(results_dir, csv_file), "w") as fh: 509 fh.write(aggr_header_line + "\n") 510 rows = set() 511 512 for fio_config in fio_files: 513 self.log_print("Getting FIO stats for %s" % fio_config) 514 job_name, _ = os.path.splitext(fio_config) 515 516 # Look in the filename for rwmixread value. Function arguments do 517 # not have that information. 518 # TODO: Improve this function by directly using workload params instead 519 # of regexing through filenames. 520 if "read" in job_name: 521 rw_mixread = 1 522 elif "write" in job_name: 523 rw_mixread = 0 524 else: 525 rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100 526 527 # If "_CPU" exists in name - ignore it 528 # Initiators for the same job could have different num_cores parameter 529 job_name = re.sub(r"_\d+CPU", "", job_name) 530 job_result_files = [x for x in json_files if x.startswith(job_name)] 531 self.log_print("Matching result files for current fio config:") 532 for j in job_result_files: 533 self.log_print("\t %s" % j) 534 535 # There may have been more than 1 initiator used in test, need to check that 536 # Result files are created so that string after last "_" separator is server name 537 inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files]) 538 inits_avg_results = [] 539 for i in inits_names: 540 self.log_print("\tGetting stats for initiator %s" % i) 541 # There may have been more than 1 test run for this job, calculate average results for initiator 542 i_results = [x for x in job_result_files if i in x] 543 i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv")) 544 545 separate_stats = [] 546 for r in i_results: 547 try: 548 stats = self.read_json_stats(os.path.join(results_dir, r)) 549 separate_stats.append(stats) 550 self.log_print(stats) 551 except JSONDecodeError: 552 self.log_print("ERROR: Failed to parse %s results! Results might be incomplete!" % r) 553 554 init_results = [sum(x) for x in zip(*separate_stats)] 555 init_results = [x / len(separate_stats) for x in init_results] 556 inits_avg_results.append(init_results) 557 558 self.log_print("\tAverage results for initiator %s" % i) 559 self.log_print(init_results) 560 with open(os.path.join(results_dir, i_results_filename), "w") as fh: 561 fh.write(header_line + "\n") 562 fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n") 563 564 # Sum results of all initiators running this FIO job. 565 # Latency results are an average of latencies from accros all initiators. 566 inits_avg_results = [sum(x) for x in zip(*inits_avg_results)] 567 inits_avg_results = OrderedDict(zip(headers, inits_avg_results)) 568 for key in inits_avg_results: 569 if "lat" in key: 570 inits_avg_results[key] /= len(inits_names) 571 572 # Aggregate separate read/write values into common labels 573 # Take rw_mixread into consideration for mixed read/write workloads. 574 aggregate_results = OrderedDict() 575 for h in aggr_headers: 576 read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key] 577 if "lat" in h: 578 _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat 579 else: 580 _ = read_stat + write_stat 581 aggregate_results[h] = "{0:.3f}".format(_) 582 583 rows.add(",".join([job_name, *aggregate_results.values()])) 584 585 # Save results to file 586 for row in rows: 587 with open(os.path.join(results_dir, csv_file), "a") as fh: 588 fh.write(row + "\n") 589 self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file)) 590 591 def measure_sar(self, results_dir, sar_file_prefix): 592 cpu_number = os.cpu_count() 593 sar_idle_sum = 0 594 sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt") 595 sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"])) 596 597 self.log_print("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % self.sar_delay) 598 time.sleep(self.sar_delay) 599 self.log_print("Starting SAR measurements") 600 601 out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count]) 602 with open(os.path.join(results_dir, sar_output_file), "w") as fh: 603 for line in out.split("\n"): 604 if "Average" in line: 605 if "CPU" in line: 606 self.log_print("Summary CPU utilization from SAR:") 607 self.log_print(line) 608 elif "all" in line: 609 self.log_print(line) 610 else: 611 sar_idle_sum += float(line.split()[7]) 612 fh.write(out) 613 sar_cpu_usage = cpu_number * 100 - sar_idle_sum 614 615 with open(os.path.join(results_dir, sar_cpu_util_file), "w") as f: 616 f.write("%0.2f" % sar_cpu_usage) 617 618 def ethtool_after_fio_ramp(self, fio_ramp_time): 619 time.sleep(fio_ramp_time//2) 620 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 621 for nic in nic_names: 622 self.log_print(nic) 623 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 624 "channel-pkt-inspect-optimize", "off"]) # Disable channel packet inspection optimization 625 626 def measure_pcm_memory(self, results_dir, pcm_file_name): 627 time.sleep(self.pcm_delay) 628 cmd = ["%s/pcm-memory.x" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)] 629 pcm_memory = subprocess.Popen(cmd) 630 time.sleep(self.pcm_count) 631 pcm_memory.terminate() 632 633 def measure_pcm(self, results_dir, pcm_file_name): 634 time.sleep(self.pcm_delay) 635 cmd = ["%s/pcm.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, "-csv=%s/%s" % (results_dir, pcm_file_name)] 636 subprocess.run(cmd) 637 df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1]) 638 df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x)) 639 skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})] 640 skt_pcm_file_name = "_".join(["skt", pcm_file_name]) 641 skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False) 642 643 def measure_pcm_power(self, results_dir, pcm_power_file_name): 644 time.sleep(self.pcm_delay) 645 out = self.exec_cmd(["%s/pcm-power.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count]) 646 with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh: 647 fh.write(out) 648 649 def measure_network_bandwidth(self, results_dir, bandwidth_file_name): 650 self.log_print("INFO: starting network bandwidth measure") 651 self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name), 652 "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)]) 653 654 def measure_dpdk_memory(self, results_dir): 655 self.log_print("INFO: waiting to generate DPDK memory usage") 656 time.sleep(self.dpdk_wait_time) 657 self.log_print("INFO: generating DPDK memory usage") 658 rpc.env.env_dpdk_get_mem_stats 659 os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir)) 660 661 def sys_config(self): 662 self.log_print("====Kernel release:====") 663 self.log_print(os.uname().release) 664 self.log_print("====Kernel command line:====") 665 with open('/proc/cmdline') as f: 666 cmdline = f.readlines() 667 self.log_print('\n'.join(self.get_uncommented_lines(cmdline))) 668 self.log_print("====sysctl conf:====") 669 with open('/etc/sysctl.conf') as f: 670 sysctl = f.readlines() 671 self.log_print('\n'.join(self.get_uncommented_lines(sysctl))) 672 self.log_print("====Cpu power info:====") 673 self.log_print(self.exec_cmd(["cpupower", "frequency-info"])) 674 self.log_print("====zcopy settings:====") 675 self.log_print("zcopy enabled: %s" % (self.enable_zcopy)) 676 self.log_print("====Scheduler settings:====") 677 self.log_print("SPDK scheduler: %s" % (self.scheduler_name)) 678 679 680class Initiator(Server): 681 def __init__(self, name, general_config, initiator_config): 682 super().__init__(name, general_config, initiator_config) 683 684 # Required fields 685 self.ip = initiator_config["ip"] 686 self.target_nic_ips = initiator_config["target_nic_ips"] 687 688 # Defaults 689 self.cpus_allowed = None 690 self.cpus_allowed_policy = "shared" 691 self.spdk_dir = "/tmp/spdk" 692 self.fio_bin = "/usr/src/fio/fio" 693 self.nvmecli_bin = "nvme" 694 self.cpu_frequency = None 695 self.subsystem_info_list = [] 696 697 if "spdk_dir" in initiator_config: 698 self.spdk_dir = initiator_config["spdk_dir"] 699 if "fio_bin" in initiator_config: 700 self.fio_bin = initiator_config["fio_bin"] 701 if "nvmecli_bin" in initiator_config: 702 self.nvmecli_bin = initiator_config["nvmecli_bin"] 703 if "cpus_allowed" in initiator_config: 704 self.cpus_allowed = initiator_config["cpus_allowed"] 705 if "cpus_allowed_policy" in initiator_config: 706 self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"] 707 if "cpu_frequency" in initiator_config: 708 self.cpu_frequency = initiator_config["cpu_frequency"] 709 if os.getenv('SPDK_WORKSPACE'): 710 self.spdk_dir = os.getenv('SPDK_WORKSPACE') 711 712 self.ssh_connection = paramiko.SSHClient() 713 self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 714 self.ssh_connection.connect(self.ip, username=self.username, password=self.password) 715 self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir]) 716 self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir]) 717 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 718 719 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 720 self.copy_spdk("/tmp/spdk.zip") 721 self.set_local_nic_info(self.set_local_nic_info_helper()) 722 self.set_cpu_frequency() 723 self.configure_system() 724 if self.enable_adq: 725 self.configure_adq() 726 self.sys_config() 727 728 def set_local_nic_info_helper(self): 729 return json.loads(self.exec_cmd(["lshw", "-json"])) 730 731 def stop(self): 732 self.ssh_connection.close() 733 734 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 735 if change_dir: 736 cmd = ["cd", change_dir, ";", *cmd] 737 738 # In case one of the command elements contains whitespace and is not 739 # already quoted, # (e.g. when calling sysctl) quote it again to prevent expansion 740 # when sending to remote system. 741 for i, c in enumerate(cmd): 742 if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")): 743 cmd[i] = '"%s"' % c 744 cmd = " ".join(cmd) 745 746 # Redirect stderr to stdout thanks using get_pty option if needed 747 _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect) 748 out = stdout.read().decode(encoding="utf-8") 749 750 # Check the return code 751 rc = stdout.channel.recv_exit_status() 752 if rc: 753 raise CalledProcessError(int(rc), cmd, out) 754 755 return out 756 757 def put_file(self, local, remote_dest): 758 ftp = self.ssh_connection.open_sftp() 759 ftp.put(local, remote_dest) 760 ftp.close() 761 762 def get_file(self, remote, local_dest): 763 ftp = self.ssh_connection.open_sftp() 764 ftp.get(remote, local_dest) 765 ftp.close() 766 767 def copy_spdk(self, local_spdk_zip): 768 self.log_print("Copying SPDK sources to initiator %s" % self.name) 769 self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip") 770 self.log_print("Copied sources zip from target") 771 self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir]) 772 self.log_print("Sources unpacked") 773 774 def copy_result_files(self, dest_dir): 775 self.log_print("Copying results") 776 777 if not os.path.exists(dest_dir): 778 os.mkdir(dest_dir) 779 780 # Get list of result files from initiator and copy them back to target 781 file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n") 782 783 for file in file_list: 784 self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file), 785 os.path.join(dest_dir, file)) 786 self.log_print("Done copying results") 787 788 def discover_subsystems(self, address_list, subsys_no): 789 num_nvmes = range(0, subsys_no) 790 nvme_discover_output = "" 791 for ip, subsys_no in itertools.product(address_list, num_nvmes): 792 self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no)) 793 nvme_discover_cmd = ["sudo", 794 "%s" % self.nvmecli_bin, 795 "discover", "-t", "%s" % self.transport, 796 "-s", "%s" % (4420 + subsys_no), 797 "-a", "%s" % ip] 798 799 try: 800 stdout = self.exec_cmd(nvme_discover_cmd) 801 if stdout: 802 nvme_discover_output = nvme_discover_output + stdout 803 except CalledProcessError: 804 # Do nothing. In case of discovering remote subsystems of kernel target 805 # we expect "nvme discover" to fail a bunch of times because we basically 806 # scan ports. 807 pass 808 809 subsystems = re.findall(r'trsvcid:\s(\d+)\s+' # get svcid number 810 r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+' # get NQN id 811 r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', # get IP address 812 nvme_discover_output) # from nvme discovery output 813 subsystems = filter(lambda x: x[-1] in address_list, subsystems) 814 subsystems = list(set(subsystems)) 815 subsystems.sort(key=lambda x: x[1]) 816 self.log_print("Found matching subsystems on target side:") 817 for s in subsystems: 818 self.log_print(s) 819 self.subsystem_info_list = subsystems 820 821 def gen_fio_filename_conf(self, *args, **kwargs): 822 # Logic implemented in SPDKInitiator and KernelInitiator classes 823 pass 824 825 def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10, rate_iops=0): 826 fio_conf_template = """ 827[global] 828ioengine={ioengine} 829{spdk_conf} 830thread=1 831group_reporting=1 832direct=1 833percentile_list=50:90:99:99.5:99.9:99.99:99.999 834 835norandommap=1 836rw={rw} 837rwmixread={rwmixread} 838bs={block_size} 839time_based=1 840ramp_time={ramp_time} 841runtime={run_time} 842rate_iops={rate_iops} 843""" 844 if "spdk" in self.mode: 845 bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list) 846 self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir]) 847 ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir 848 spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir 849 else: 850 ioengine = self.ioengine 851 spdk_conf = "" 852 out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'", 853 "|", "awk", "'{print $1}'"]) 854 subsystems = [x for x in out.split("\n") if "nvme" in x] 855 856 if self.cpus_allowed is not None: 857 self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed) 858 cpus_num = 0 859 cpus = self.cpus_allowed.split(",") 860 for cpu in cpus: 861 if "-" in cpu: 862 a, b = cpu.split("-") 863 a = int(a) 864 b = int(b) 865 cpus_num += len(range(a, b)) 866 else: 867 cpus_num += 1 868 self.num_cores = cpus_num 869 threads = range(0, self.num_cores) 870 elif hasattr(self, 'num_cores'): 871 self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores) 872 threads = range(0, int(self.num_cores)) 873 else: 874 self.num_cores = len(subsystems) 875 threads = range(0, len(subsystems)) 876 877 if "spdk" in self.mode: 878 filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs) 879 else: 880 filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs) 881 882 fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf, 883 rw=rw, rwmixread=rwmixread, block_size=block_size, 884 ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops) 885 886 # TODO: hipri disabled for now, as it causes fio errors: 887 # io_u error on file /dev/nvme2n1: Operation not supported 888 # See comment in KernelInitiator class, kernel_init_connect() function 889 if hasattr(self, "ioengine") and "io_uring" in self.ioengine: 890 fio_config = fio_config + """ 891fixedbufs=1 892registerfiles=1 893#hipri=1 894""" 895 if num_jobs: 896 fio_config = fio_config + "numjobs=%s \n" % num_jobs 897 if self.cpus_allowed is not None: 898 fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed 899 fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy 900 fio_config = fio_config + filename_section 901 902 fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread) 903 if hasattr(self, "num_cores"): 904 fio_config_filename += "_%sCPU" % self.num_cores 905 fio_config_filename += ".fio" 906 907 self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir]) 908 self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)]) 909 self.log_print("Created FIO Config:") 910 self.log_print(fio_config) 911 912 return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename) 913 914 def set_cpu_frequency(self): 915 if self.cpu_frequency is not None: 916 try: 917 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True) 918 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True) 919 self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"])) 920 except Exception: 921 self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!") 922 sys.exit() 923 else: 924 self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.") 925 926 def run_fio(self, fio_config_file, run_num=None): 927 job_name, _ = os.path.splitext(fio_config_file) 928 self.log_print("Starting FIO run for job: %s" % job_name) 929 self.log_print("Using FIO: %s" % self.fio_bin) 930 931 if run_num: 932 for i in range(1, run_num + 1): 933 output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json" 934 try: 935 output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json", 936 "--output=%s" % output_filename, "--eta=never"], True) 937 self.log_print(output) 938 except subprocess.CalledProcessError as e: 939 self.log_print("ERROR: Fio process failed!") 940 self.log_print(e.stdout) 941 else: 942 output_filename = job_name + "_" + self.name + ".json" 943 output = self.exec_cmd(["sudo", self.fio_bin, 944 fio_config_file, "--output-format=json", 945 "--output" % output_filename], True) 946 self.log_print(output) 947 self.log_print("FIO run finished. Results in: %s" % output_filename) 948 949 def sys_config(self): 950 self.log_print("====Kernel release:====") 951 self.log_print(self.exec_cmd(["uname", "-r"])) 952 self.log_print("====Kernel command line:====") 953 cmdline = self.exec_cmd(["cat", "/proc/cmdline"]) 954 self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines()))) 955 self.log_print("====sysctl conf:====") 956 sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"]) 957 self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines()))) 958 self.log_print("====Cpu power info:====") 959 self.log_print(self.exec_cmd(["cpupower", "frequency-info"])) 960 961 962class KernelTarget(Target): 963 def __init__(self, name, general_config, target_config): 964 super().__init__(name, general_config, target_config) 965 # Defaults 966 self.nvmet_bin = "nvmetcli" 967 968 if "nvmet_bin" in target_config: 969 self.nvmet_bin = target_config["nvmet_bin"] 970 971 def stop(self): 972 nvmet_command(self.nvmet_bin, "clear") 973 974 def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list): 975 976 nvmet_cfg = { 977 "ports": [], 978 "hosts": [], 979 "subsystems": [], 980 } 981 982 # Split disks between NIC IP's 983 disks_per_ip = int(len(nvme_list) / len(address_list)) 984 disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))] 985 986 # Add remaining drives 987 for i, disk in enumerate(nvme_list[disks_per_ip * len(address_list):]): 988 disk_chunks[i].append(disk) 989 990 subsys_no = 1 991 port_no = 0 992 for ip, chunk in zip(address_list, disk_chunks): 993 for disk in chunk: 994 nqn = "nqn.2018-09.io.spdk:cnode%s" % subsys_no 995 nvmet_cfg["subsystems"].append({ 996 "allowed_hosts": [], 997 "attr": { 998 "allow_any_host": "1", 999 "serial": "SPDK00%s" % subsys_no, 1000 "version": "1.3" 1001 }, 1002 "namespaces": [ 1003 { 1004 "device": { 1005 "path": disk, 1006 "uuid": "%s" % uuid.uuid4() 1007 }, 1008 "enable": 1, 1009 "nsid": subsys_no 1010 } 1011 ], 1012 "nqn": nqn 1013 }) 1014 1015 nvmet_cfg["ports"].append({ 1016 "addr": { 1017 "adrfam": "ipv4", 1018 "traddr": ip, 1019 "trsvcid": "%s" % (4420 + port_no), 1020 "trtype": "%s" % self.transport 1021 }, 1022 "portid": subsys_no, 1023 "referrals": [], 1024 "subsystems": [nqn] 1025 }) 1026 subsys_no += 1 1027 port_no += 1 1028 self.subsystem_info_list.append([port_no, nqn, ip]) 1029 1030 with open("kernel.conf", "w") as fh: 1031 fh.write(json.dumps(nvmet_cfg, indent=2)) 1032 1033 def tgt_start(self): 1034 self.log_print("Configuring kernel NVMeOF Target") 1035 1036 if self.null_block: 1037 print("Configuring with null block device.") 1038 null_blk_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)] 1039 self.kernel_tgt_gen_subsystem_conf(null_blk_list, self.nic_ips) 1040 self.subsys_no = len(null_blk_list) 1041 else: 1042 print("Configuring with NVMe drives.") 1043 nvme_list = get_nvme_devices() 1044 self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips) 1045 self.subsys_no = len(nvme_list) 1046 1047 nvmet_command(self.nvmet_bin, "clear") 1048 nvmet_command(self.nvmet_bin, "restore kernel.conf") 1049 1050 if self.enable_adq: 1051 self.adq_configure_tc() 1052 1053 self.log_print("Done configuring kernel NVMeOF Target") 1054 1055 1056class SPDKTarget(Target): 1057 def __init__(self, name, general_config, target_config): 1058 super().__init__(name, general_config, target_config) 1059 1060 # Required fields 1061 self.core_mask = target_config["core_mask"] 1062 self.num_cores = self.get_num_cores(self.core_mask) 1063 1064 # Defaults 1065 self.dif_insert_strip = False 1066 self.null_block_dif_type = 0 1067 self.num_shared_buffers = 4096 1068 self.max_queue_depth = 128 1069 self.bpf_proc = None 1070 self.bpf_scripts = [] 1071 self.enable_idxd = False 1072 1073 if "num_shared_buffers" in target_config: 1074 self.num_shared_buffers = target_config["num_shared_buffers"] 1075 if "max_queue_depth" in target_config: 1076 self.max_queue_depth = target_config["max_queue_depth"] 1077 if "null_block_dif_type" in target_config: 1078 self.null_block_dif_type = target_config["null_block_dif_type"] 1079 if "dif_insert_strip" in target_config: 1080 self.dif_insert_strip = target_config["dif_insert_strip"] 1081 if "bpf_scripts" in target_config: 1082 self.bpf_scripts = target_config["bpf_scripts"] 1083 if "idxd_settings" in target_config: 1084 self.enable_idxd = target_config["idxd_settings"] 1085 1086 self.log_print("====IDXD settings:====") 1087 self.log_print("IDXD enabled: %s" % (self.enable_idxd)) 1088 1089 @staticmethod 1090 def get_num_cores(core_mask): 1091 if "0x" in core_mask: 1092 return bin(int(core_mask, 16)).count("1") 1093 else: 1094 num_cores = 0 1095 core_mask = core_mask.replace("[", "") 1096 core_mask = core_mask.replace("]", "") 1097 for i in core_mask.split(","): 1098 if "-" in i: 1099 x, y = i.split("-") 1100 num_cores += len(range(int(x), int(y))) + 1 1101 else: 1102 num_cores += 1 1103 return num_cores 1104 1105 def spdk_tgt_configure(self): 1106 self.log_print("Configuring SPDK NVMeOF target via RPC") 1107 1108 if self.enable_adq: 1109 self.adq_configure_tc() 1110 1111 # Create transport layer 1112 rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, 1113 num_shared_buffers=self.num_shared_buffers, 1114 max_queue_depth=self.max_queue_depth, 1115 dif_insert_or_strip=self.dif_insert_strip, 1116 sock_priority=self.adq_priority) 1117 self.log_print("SPDK NVMeOF transport layer:") 1118 rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client)) 1119 1120 if self.null_block: 1121 self.spdk_tgt_add_nullblock(self.null_block) 1122 self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block) 1123 else: 1124 self.spdk_tgt_add_nvme_conf() 1125 self.spdk_tgt_add_subsystem_conf(self.nic_ips) 1126 1127 self.log_print("Done configuring SPDK NVMeOF Target") 1128 1129 def spdk_tgt_add_nullblock(self, null_block_count): 1130 md_size = 0 1131 block_size = 4096 1132 if self.null_block_dif_type != 0: 1133 md_size = 128 1134 1135 self.log_print("Adding null block bdevices to config via RPC") 1136 for i in range(null_block_count): 1137 self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type) 1138 rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i), 1139 dif_type=self.null_block_dif_type, md_size=md_size) 1140 self.log_print("SPDK Bdevs configuration:") 1141 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1142 1143 def spdk_tgt_add_nvme_conf(self, req_num_disks=None): 1144 self.log_print("Adding NVMe bdevs to config via RPC") 1145 1146 bdfs = get_nvme_devices_bdf() 1147 bdfs = [b.replace(":", ".") for b in bdfs] 1148 1149 if req_num_disks: 1150 if req_num_disks > len(bdfs): 1151 self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs)) 1152 sys.exit(1) 1153 else: 1154 bdfs = bdfs[0:req_num_disks] 1155 1156 for i, bdf in enumerate(bdfs): 1157 rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 1158 1159 self.log_print("SPDK Bdevs configuration:") 1160 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1161 1162 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 1163 self.log_print("Adding subsystems to config") 1164 port = "4420" 1165 if not req_num_disks: 1166 req_num_disks = get_nvme_devices_count() 1167 1168 # Distribute bdevs between provided NICs 1169 num_disks = range(0, req_num_disks) 1170 if len(num_disks) == 1: 1171 disks_per_ip = 1 1172 else: 1173 disks_per_ip = int(len(num_disks) / len(ips)) 1174 disk_chunks = [[*num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i]] for i in range(0, len(ips))] 1175 1176 # Add remaining drives 1177 for i, disk in enumerate(num_disks[disks_per_ip * len(ips):]): 1178 disk_chunks[i].append(disk) 1179 1180 # Create subsystems, add bdevs to namespaces, add listeners 1181 for ip, chunk in zip(ips, disk_chunks): 1182 for c in chunk: 1183 nqn = "nqn.2018-09.io.spdk:cnode%s" % c 1184 serial = "SPDK00%s" % c 1185 bdev_name = "Nvme%sn1" % c 1186 rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial, 1187 allow_any_host=True, max_namespaces=8) 1188 rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name) 1189 1190 rpc.nvmf.nvmf_subsystem_add_listener(self.client, 1191 nqn=nqn, 1192 trtype=self.transport, 1193 traddr=ip, 1194 trsvcid=port, 1195 adrfam="ipv4") 1196 1197 self.subsystem_info_list.append([port, nqn, ip]) 1198 self.log_print("SPDK NVMeOF subsystem configuration:") 1199 rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client)) 1200 1201 def bpf_start(self): 1202 self.log_print("Starting BPF Trace scripts: %s" % self.bpf_scripts) 1203 bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh") 1204 bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts] 1205 results_path = os.path.join(self.results_dir, "bpf_traces.txt") 1206 1207 with open(self.pid, "r") as fh: 1208 nvmf_pid = str(fh.readline()) 1209 1210 cmd = [bpf_script, nvmf_pid, *bpf_traces] 1211 self.log_print(cmd) 1212 self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path}) 1213 1214 def tgt_start(self): 1215 if self.null_block: 1216 self.subsys_no = 1 1217 else: 1218 self.subsys_no = get_nvme_devices_count() 1219 self.log_print("Starting SPDK NVMeOF Target process") 1220 nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt") 1221 proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask]) 1222 self.pid = os.path.join(self.spdk_dir, "nvmf.pid") 1223 1224 with open(self.pid, "w") as fh: 1225 fh.write(str(proc.pid)) 1226 self.nvmf_proc = proc 1227 self.log_print("SPDK NVMeOF Target PID=%s" % self.pid) 1228 self.log_print("Waiting for spdk to initialize...") 1229 while True: 1230 if os.path.exists("/var/tmp/spdk.sock"): 1231 break 1232 time.sleep(1) 1233 self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock") 1234 1235 if self.enable_zcopy: 1236 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", 1237 enable_zerocopy_send_server=True) 1238 self.log_print("Target socket options:") 1239 rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix")) 1240 1241 if self.enable_adq: 1242 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1) 1243 rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None, 1244 nvme_adminq_poll_period_us=100000, retry_count=4) 1245 1246 if self.enable_idxd: 1247 rpc.idxd.idxd_scan_accel_engine(self.client, config_kernel_mode=None) 1248 self.log_print("Target IDXD accel engine enabled") 1249 1250 rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name) 1251 rpc.framework_start_init(self.client) 1252 1253 if self.bpf_scripts: 1254 self.bpf_start() 1255 1256 self.spdk_tgt_configure() 1257 1258 def stop(self): 1259 if self.bpf_proc: 1260 self.log_print("Stopping BPF Trace script") 1261 self.bpf_proc.terminate() 1262 self.bpf_proc.wait() 1263 1264 if hasattr(self, "nvmf_proc"): 1265 try: 1266 self.nvmf_proc.terminate() 1267 self.nvmf_proc.wait() 1268 except Exception as e: 1269 self.log_print(e) 1270 self.nvmf_proc.kill() 1271 self.nvmf_proc.communicate() 1272 1273 1274class KernelInitiator(Initiator): 1275 def __init__(self, name, general_config, initiator_config): 1276 super().__init__(name, general_config, initiator_config) 1277 1278 # Defaults 1279 self.extra_params = "" 1280 self.ioengine = "libaio" 1281 1282 if "extra_params" in initiator_config: 1283 self.extra_params = initiator_config["extra_params"] 1284 1285 if "kernel_engine" in initiator_config: 1286 self.ioengine = initiator_config["kernel_engine"] 1287 if "io_uring" in self.ioengine: 1288 self.extra_params = "--nr-poll-queues=8" 1289 1290 def get_connected_nvme_list(self): 1291 json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"])) 1292 nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"] 1293 if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]] 1294 return nvme_list 1295 1296 def kernel_init_connect(self): 1297 self.log_print("Below connection attempts may result in error messages, this is expected!") 1298 for subsystem in self.subsystem_info_list: 1299 self.log_print("Trying to connect %s %s %s" % subsystem) 1300 self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport, 1301 "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params]) 1302 time.sleep(2) 1303 1304 if "io_uring" in self.ioengine: 1305 self.log_print("Setting block layer settings for io_uring.") 1306 1307 # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because 1308 # apparently it's not possible for connected subsystems. 1309 # Results in "error: Invalid argument" 1310 block_sysfs_settings = { 1311 "iostats": "0", 1312 "rq_affinity": "0", 1313 "nomerges": "2" 1314 } 1315 1316 for disk in self.get_connected_nvme_list(): 1317 sysfs = os.path.join("/sys/block", disk, "queue") 1318 for k, v in block_sysfs_settings.items(): 1319 sysfs_opt_path = os.path.join(sysfs, k) 1320 try: 1321 self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True) 1322 except subprocess.CalledProcessError as e: 1323 self.log_print("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v)) 1324 finally: 1325 _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)]) 1326 self.log_print("%s=%s" % (sysfs_opt_path, _)) 1327 1328 def kernel_init_disconnect(self): 1329 for subsystem in self.subsystem_info_list: 1330 self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]]) 1331 time.sleep(1) 1332 1333 def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1): 1334 nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()] 1335 1336 filename_section = "" 1337 nvme_per_split = int(len(nvme_list) / len(threads)) 1338 remainder = len(nvme_list) % len(threads) 1339 iterator = iter(nvme_list) 1340 result = [] 1341 for i in range(len(threads)): 1342 result.append([]) 1343 for _ in range(nvme_per_split): 1344 result[i].append(next(iterator)) 1345 if remainder: 1346 result[i].append(next(iterator)) 1347 remainder -= 1 1348 for i, r in enumerate(result): 1349 header = "[filename%s]" % i 1350 disks = "\n".join(["filename=%s" % x for x in r]) 1351 job_section_qd = round((io_depth * len(r)) / num_jobs) 1352 if job_section_qd == 0: 1353 job_section_qd = 1 1354 iodepth = "iodepth=%s" % job_section_qd 1355 filename_section = "\n".join([filename_section, header, disks, iodepth]) 1356 1357 return filename_section 1358 1359 1360class SPDKInitiator(Initiator): 1361 def __init__(self, name, general_config, initiator_config): 1362 super().__init__(name, general_config, initiator_config) 1363 1364 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 1365 self.install_spdk() 1366 1367 # Required fields 1368 self.num_cores = initiator_config["num_cores"] 1369 1370 # Optional fields 1371 self.enable_data_digest = False 1372 if "enable_data_digest" in initiator_config: 1373 self.enable_data_digest = initiator_config["enable_data_digest"] 1374 1375 def install_spdk(self): 1376 self.log_print("Using fio binary %s" % self.fio_bin) 1377 self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"]) 1378 self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"]) 1379 self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)]) 1380 self.exec_cmd(["make", "-C", self.spdk_dir, "clean"]) 1381 self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"]) 1382 1383 self.log_print("SPDK built") 1384 self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir]) 1385 1386 def gen_spdk_bdev_conf(self, remote_subsystem_list): 1387 bdev_cfg_section = { 1388 "subsystems": [ 1389 { 1390 "subsystem": "bdev", 1391 "config": [] 1392 } 1393 ] 1394 } 1395 1396 for i, subsys in enumerate(remote_subsystem_list): 1397 sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys) 1398 nvme_ctrl = { 1399 "method": "bdev_nvme_attach_controller", 1400 "params": { 1401 "name": "Nvme{}".format(i), 1402 "trtype": self.transport, 1403 "traddr": sub_addr, 1404 "trsvcid": sub_port, 1405 "subnqn": sub_nqn, 1406 "adrfam": "IPv4" 1407 } 1408 } 1409 1410 if self.enable_adq: 1411 nvme_ctrl["params"].update({"priority": "1"}) 1412 1413 if self.enable_data_digest: 1414 nvme_ctrl["params"].update({"ddgst": self.enable_data_digest}) 1415 1416 bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl) 1417 1418 return json.dumps(bdev_cfg_section, indent=2) 1419 1420 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1): 1421 filename_section = "" 1422 if len(threads) >= len(subsystems): 1423 threads = range(0, len(subsystems)) 1424 filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))] 1425 nvme_per_split = int(len(subsystems) / len(threads)) 1426 remainder = len(subsystems) % len(threads) 1427 iterator = iter(filenames) 1428 result = [] 1429 for i in range(len(threads)): 1430 result.append([]) 1431 for _ in range(nvme_per_split): 1432 result[i].append(next(iterator)) 1433 if remainder: 1434 result[i].append(next(iterator)) 1435 remainder -= 1 1436 for i, r in enumerate(result): 1437 header = "[filename%s]" % i 1438 disks = "\n".join(["filename=%s" % x for x in r]) 1439 job_section_qd = round((io_depth * len(r)) / num_jobs) 1440 if job_section_qd == 0: 1441 job_section_qd = 1 1442 iodepth = "iodepth=%s" % job_section_qd 1443 filename_section = "\n".join([filename_section, header, disks, iodepth]) 1444 1445 return filename_section 1446 1447 1448if __name__ == "__main__": 1449 script_full_dir = os.path.dirname(os.path.realpath(__file__)) 1450 default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json")) 1451 1452 parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) 1453 parser.add_argument('-c', '--config', type=str, default=default_config_file_path, 1454 help='Configuration file.') 1455 parser.add_argument('-r', '--results', type=str, default='/tmp/results', 1456 help='Results directory.') 1457 parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv', 1458 help='CSV results filename.') 1459 1460 args = parser.parse_args() 1461 1462 print("Using config file: %s" % args.config) 1463 with open(args.config, "r") as config: 1464 data = json.load(config) 1465 1466 initiators = [] 1467 fio_cases = [] 1468 1469 general_config = data["general"] 1470 target_config = data["target"] 1471 initiator_configs = [data[x] for x in data.keys() if "initiator" in x] 1472 1473 for k, v in data.items(): 1474 if "target" in k: 1475 v.update({"results_dir": args.results}) 1476 if data[k]["mode"] == "spdk": 1477 target_obj = SPDKTarget(k, data["general"], v) 1478 elif data[k]["mode"] == "kernel": 1479 target_obj = KernelTarget(k, data["general"], v) 1480 elif "initiator" in k: 1481 if data[k]["mode"] == "spdk": 1482 init_obj = SPDKInitiator(k, data["general"], v) 1483 elif data[k]["mode"] == "kernel": 1484 init_obj = KernelInitiator(k, data["general"], v) 1485 initiators.append(init_obj) 1486 elif "fio" in k: 1487 fio_workloads = itertools.product(data[k]["bs"], 1488 data[k]["qd"], 1489 data[k]["rw"]) 1490 1491 fio_run_time = data[k]["run_time"] 1492 fio_ramp_time = data[k]["ramp_time"] 1493 fio_rw_mix_read = data[k]["rwmixread"] 1494 fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None 1495 fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None 1496 1497 fio_rate_iops = 0 1498 if "rate_iops" in data[k]: 1499 fio_rate_iops = data[k]["rate_iops"] 1500 else: 1501 continue 1502 1503 try: 1504 os.mkdir(args.results) 1505 except FileExistsError: 1506 pass 1507 1508 # TODO: This try block is definietly too large. Need to break this up into separate 1509 # logical blocks to reduce size. 1510 try: 1511 target_obj.tgt_start() 1512 1513 for i in initiators: 1514 i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no) 1515 if i.enable_adq: 1516 i.adq_configure_tc() 1517 1518 # Poor mans threading 1519 # Run FIO tests 1520 for block_size, io_depth, rw in fio_workloads: 1521 threads = [] 1522 configs = [] 1523 for i in initiators: 1524 if i.mode == "kernel": 1525 i.kernel_init_connect() 1526 1527 cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no, 1528 fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops) 1529 configs.append(cfg) 1530 1531 for i, cfg in zip(initiators, configs): 1532 t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num)) 1533 threads.append(t) 1534 if target_obj.enable_sar: 1535 sar_file_prefix = "%s_%s_%s_sar" % (block_size, rw, io_depth) 1536 t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix)) 1537 threads.append(t) 1538 1539 if target_obj.enable_pcm: 1540 pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]] 1541 1542 pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],)) 1543 pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],)) 1544 pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],)) 1545 1546 threads.append(pcm_cpu_t) 1547 threads.append(pcm_mem_t) 1548 threads.append(pcm_pow_t) 1549 1550 if target_obj.enable_bandwidth: 1551 bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)]) 1552 bandwidth_file_name = ".".join([bandwidth_file_name, "csv"]) 1553 t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,)) 1554 threads.append(t) 1555 1556 if target_obj.enable_dpdk_memory: 1557 t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results)) 1558 threads.append(t) 1559 1560 if target_obj.enable_adq: 1561 ethtool_thread = threading.Thread(target=target_obj.ethtool_after_fio_ramp, args=(fio_ramp_time,)) 1562 threads.append(ethtool_thread) 1563 1564 for t in threads: 1565 t.start() 1566 for t in threads: 1567 t.join() 1568 1569 for i in initiators: 1570 if i.mode == "kernel": 1571 i.kernel_init_disconnect() 1572 i.copy_result_files(args.results) 1573 1574 target_obj.restore_governor() 1575 target_obj.restore_tuned() 1576 target_obj.restore_services() 1577 target_obj.restore_sysctl() 1578 if target_obj.enable_adq: 1579 target_obj.reload_driver("ice") 1580 for i in initiators: 1581 i.restore_governor() 1582 i.restore_tuned() 1583 i.restore_services() 1584 i.restore_sysctl() 1585 if i.enable_adq: 1586 i.reload_driver("ice") 1587 target_obj.parse_results(args.results, args.csv_filename) 1588 finally: 1589 for i in initiators: 1590 try: 1591 i.stop() 1592 except Exception as err: 1593 pass 1594 target_obj.stop() 1595