1#!/usr/bin/env python3 2 3from json.decoder import JSONDecodeError 4import os 5import re 6import sys 7import argparse 8import json 9import zipfile 10import threading 11import subprocess 12import itertools 13import configparser 14import time 15import uuid 16from collections import OrderedDict 17 18import paramiko 19import pandas as pd 20from common import * 21 22sys.path.append(os.path.dirname(__file__) + '/../../../python') 23 24import spdk.rpc as rpc # noqa 25import spdk.rpc.client as rpc_client # noqa 26 27 28class Server: 29 def __init__(self, name, general_config, server_config): 30 self.name = name 31 self.username = general_config["username"] 32 self.password = general_config["password"] 33 self.transport = general_config["transport"].lower() 34 self.nic_ips = server_config["nic_ips"] 35 self.mode = server_config["mode"] 36 37 self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts" 38 if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]: 39 self.irq_scripts_dir = server_config["irq_scripts_dir"] 40 41 self.local_nic_info = [] 42 self._nics_json_obj = {} 43 self.svc_restore_dict = {} 44 self.sysctl_restore_dict = {} 45 self.tuned_restore_dict = {} 46 self.governor_restore = "" 47 self.tuned_profile = "" 48 49 self.enable_adq = False 50 self.adq_priority = None 51 if "adq_enable" in server_config and server_config["adq_enable"]: 52 self.enable_adq = server_config["adq_enable"] 53 self.adq_priority = 1 54 55 if "tuned_profile" in server_config: 56 self.tuned_profile = server_config["tuned_profile"] 57 58 if not re.match("^[A-Za-z0-9]*$", name): 59 self.log_print("Please use a name which contains only letters or numbers") 60 sys.exit(1) 61 62 def log_print(self, msg): 63 print("[%s] %s" % (self.name, msg), flush=True) 64 65 @staticmethod 66 def get_uncommented_lines(lines): 67 return [line for line in lines if line and not line.startswith('#')] 68 69 def get_nic_name_by_ip(self, ip): 70 if not self._nics_json_obj: 71 nics_json_obj = 
self.exec_cmd(["ip", "-j", "address", "show"]) 72 self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj))) 73 for nic in self._nics_json_obj: 74 for addr in nic["addr_info"]: 75 if ip in addr["local"]: 76 return nic["ifname"] 77 78 def set_local_nic_info_helper(self): 79 pass 80 81 def set_local_nic_info(self, pci_info): 82 def extract_network_elements(json_obj): 83 nic_list = [] 84 if isinstance(json_obj, list): 85 for x in json_obj: 86 nic_list.extend(extract_network_elements(x)) 87 elif isinstance(json_obj, dict): 88 if "children" in json_obj: 89 nic_list.extend(extract_network_elements(json_obj["children"])) 90 if "class" in json_obj.keys() and "network" in json_obj["class"]: 91 nic_list.append(json_obj) 92 return nic_list 93 94 self.local_nic_info = extract_network_elements(pci_info) 95 96 # pylint: disable=R0201 97 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 98 return "" 99 100 def configure_system(self): 101 self.configure_services() 102 self.configure_sysctl() 103 self.configure_tuned() 104 self.configure_cpu_governor() 105 self.configure_irq_affinity() 106 107 def configure_adq(self): 108 if self.mode == "kernel": 109 self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 110 return 111 self.adq_load_modules() 112 self.adq_configure_nic() 113 114 def adq_load_modules(self): 115 self.log_print("Modprobing ADQ-related Linux modules...") 116 adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"] 117 for module in adq_module_deps: 118 try: 119 self.exec_cmd(["sudo", "modprobe", module]) 120 self.log_print("%s loaded!" 
% module) 121 except CalledProcessError as e: 122 self.log_print("ERROR: failed to load module %s" % module) 123 self.log_print("%s resulted in error: %s" % (e.cmd, e.output)) 124 125 def adq_configure_tc(self): 126 self.log_print("Configuring ADQ Traffic classes and filters...") 127 128 if self.mode == "kernel": 129 self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 130 return 131 132 num_queues_tc0 = 2 # 2 is minimum number of queues for TC0 133 num_queues_tc1 = self.num_cores 134 port_param = "dst_port" if isinstance(self, Target) else "src_port" 135 port = "4420" 136 xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs") 137 138 for nic_ip in self.nic_ips: 139 nic_name = self.get_nic_name_by_ip(nic_ip) 140 tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, 141 "root", "mqprio", "num_tc", "2", "map", "0", "1", 142 "queues", "%s@0" % num_queues_tc0, 143 "%s@%s" % (num_queues_tc1, num_queues_tc0), 144 "hw", "1", "mode", "channel"] 145 self.log_print(" ".join(tc_qdisc_map_cmd)) 146 self.exec_cmd(tc_qdisc_map_cmd) 147 148 time.sleep(5) 149 tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"] 150 self.log_print(" ".join(tc_qdisc_ingress_cmd)) 151 self.exec_cmd(tc_qdisc_ingress_cmd) 152 153 tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name, 154 "protocol", "ip", "ingress", "prio", "1", "flower", 155 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port, 156 "skip_sw", "hw_tc", "1"] 157 self.log_print(" ".join(tc_filter_cmd)) 158 self.exec_cmd(tc_filter_cmd) 159 160 # show tc configuration 161 self.log_print("Show tc configuration for %s NIC..." 
% nic_name) 162 tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name]) 163 tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"]) 164 self.log_print("%s" % tc_disk_out) 165 self.log_print("%s" % tc_filter_out) 166 167 # Ethtool coalesce settings must be applied after configuring traffic classes 168 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"]) 169 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"]) 170 171 self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name) 172 xps_cmd = ["sudo", xps_script_path, nic_name] 173 self.log_print(xps_cmd) 174 self.exec_cmd(xps_cmd) 175 176 def reload_driver(self, driver): 177 try: 178 self.exec_cmd(["sudo", "rmmod", driver]) 179 self.exec_cmd(["sudo", "modprobe", driver]) 180 except CalledProcessError as e: 181 self.log_print("ERROR: failed to reload %s module!" % driver) 182 self.log_print("%s resulted in error: %s" % (e.cmd, e.output)) 183 184 def adq_configure_nic(self): 185 self.log_print("Configuring NIC port settings for ADQ testing...") 186 187 # Reload the driver first, to make sure any previous settings are re-set. 188 self.reload_driver("ice") 189 190 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 191 for nic in nic_names: 192 self.log_print(nic) 193 try: 194 self.exec_cmd(["sudo", "ethtool", "-K", nic, 195 "hw-tc-offload", "on"]) # Enable hardware TC offload 196 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 197 "channel-inline-flow-director", "on"]) # Enable Intel Flow Director 198 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"]) # Disable LLDP 199 # As temporary workaround for ADQ, channel packet inspection optimization is turned on during connection establishment. 200 # Then turned off before fio ramp_up expires in ethtool_after_fio_ramp(). 
201 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 202 "channel-pkt-inspect-optimize", "on"]) 203 except CalledProcessError as e: 204 self.log_print("ERROR: failed to configure NIC port using ethtool!") 205 self.log_print("%s resulted in error: %s" % (e.cmd, e.output)) 206 self.log_print("Please update your NIC driver and firmware versions and try again.") 207 self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic])) 208 self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic])) 209 210 def configure_services(self): 211 self.log_print("Configuring active services...") 212 svc_config = configparser.ConfigParser(strict=False) 213 214 # Below list is valid only for RHEL / Fedora systems and might not 215 # contain valid names for other distributions. 216 svc_target_state = { 217 "firewalld": "inactive", 218 "irqbalance": "inactive", 219 "lldpad.service": "inactive", 220 "lldpad.socket": "inactive" 221 } 222 223 for service in svc_target_state: 224 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 225 out = "\n".join(["[%s]" % service, out]) 226 svc_config.read_string(out) 227 228 if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]: 229 continue 230 231 service_state = svc_config[service]["ActiveState"] 232 self.log_print("Current state of %s service is %s" % (service, service_state)) 233 self.svc_restore_dict.update({service: service_state}) 234 if service_state != "inactive": 235 self.log_print("Disabling %s. It will be restored after the test has finished." 
% service) 236 self.exec_cmd(["sudo", "systemctl", "stop", service]) 237 238 def configure_sysctl(self): 239 self.log_print("Tuning sysctl settings...") 240 241 busy_read = 0 242 if self.enable_adq and self.mode == "spdk": 243 busy_read = 1 244 245 sysctl_opts = { 246 "net.core.busy_poll": 0, 247 "net.core.busy_read": busy_read, 248 "net.core.somaxconn": 4096, 249 "net.core.netdev_max_backlog": 8192, 250 "net.ipv4.tcp_max_syn_backlog": 16384, 251 "net.core.rmem_max": 268435456, 252 "net.core.wmem_max": 268435456, 253 "net.ipv4.tcp_mem": "268435456 268435456 268435456", 254 "net.ipv4.tcp_rmem": "8192 1048576 33554432", 255 "net.ipv4.tcp_wmem": "8192 1048576 33554432", 256 "net.ipv4.route.flush": 1, 257 "vm.overcommit_memory": 1, 258 } 259 260 for opt, value in sysctl_opts.items(): 261 self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()}) 262 self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 263 264 def configure_tuned(self): 265 if not self.tuned_profile: 266 self.log_print("WARNING: Tuned profile not set in configuration file. Skipping configuration.") 267 return 268 269 self.log_print("Configuring tuned-adm profile to %s." 
% self.tuned_profile) 270 service = "tuned" 271 tuned_config = configparser.ConfigParser(strict=False) 272 273 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 274 out = "\n".join(["[%s]" % service, out]) 275 tuned_config.read_string(out) 276 tuned_state = tuned_config[service]["ActiveState"] 277 self.svc_restore_dict.update({service: tuned_state}) 278 279 if tuned_state != "inactive": 280 profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip() 281 profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip() 282 283 self.tuned_restore_dict = { 284 "profile": profile, 285 "mode": profile_mode 286 } 287 288 self.exec_cmd(["sudo", "systemctl", "start", service]) 289 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile]) 290 self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"])) 291 292 def configure_cpu_governor(self): 293 self.log_print("Setting CPU governor to performance...") 294 295 # This assumes that there is the same CPU scaling governor on each CPU 296 self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip() 297 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"]) 298 299 def configure_irq_affinity(self): 300 self.log_print("Setting NIC irq affinity for NICs...") 301 302 irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh") 303 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 304 for nic in nic_names: 305 irq_cmd = ["sudo", irq_script_path, nic] 306 self.log_print(irq_cmd) 307 self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir) 308 309 def restore_services(self): 310 self.log_print("Restoring services...") 311 for service, state in self.svc_restore_dict.items(): 312 cmd = "stop" if state == "inactive" else "start" 313 self.exec_cmd(["sudo", "systemctl", cmd, service]) 314 315 def restore_sysctl(self): 316 self.log_print("Restoring sysctl 
settings...") 317 for opt, value in self.sysctl_restore_dict.items(): 318 self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 319 320 def restore_tuned(self): 321 self.log_print("Restoring tuned-adm settings...") 322 323 if not self.tuned_restore_dict: 324 return 325 326 if self.tuned_restore_dict["mode"] == "auto": 327 self.exec_cmd(["sudo", "tuned-adm", "auto_profile"]) 328 self.log_print("Reverted tuned-adm to auto_profile.") 329 else: 330 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]]) 331 self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"]) 332 333 def restore_governor(self): 334 self.log_print("Restoring CPU governor setting...") 335 if self.governor_restore: 336 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore]) 337 self.log_print("Reverted CPU governor to %s." % self.governor_restore) 338 339 340class Target(Server): 341 def __init__(self, name, general_config, target_config): 342 super().__init__(name, general_config, target_config) 343 344 # Defaults 345 self.enable_sar = False 346 self.sar_delay = 0 347 self.sar_interval = 0 348 self.sar_count = 0 349 self.enable_pcm = False 350 self.pcm_dir = "" 351 self.pcm_delay = 0 352 self.pcm_interval = 0 353 self.pcm_count = 0 354 self.enable_bandwidth = 0 355 self.bandwidth_count = 0 356 self.enable_dpdk_memory = False 357 self.dpdk_wait_time = 0 358 self.enable_zcopy = False 359 self.scheduler_name = "static" 360 self.null_block = 0 361 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 362 self.subsystem_info_list = [] 363 self.initiator_info = [] 364 365 if "null_block_devices" in target_config: 366 self.null_block = target_config["null_block_devices"] 367 if "sar_settings" in target_config: 368 self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"] 369 if "pcm_settings" in target_config: 370 
self.enable_pcm = True 371 self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"] 372 if "enable_bandwidth" in target_config: 373 self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"] 374 if "enable_dpdk_memory" in target_config: 375 self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"] 376 if "scheduler_settings" in target_config: 377 self.scheduler_name = target_config["scheduler_settings"] 378 if "zcopy_settings" in target_config: 379 self.enable_zcopy = target_config["zcopy_settings"] 380 if "results_dir" in target_config: 381 self.results_dir = target_config["results_dir"] 382 383 self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) 384 self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../")) 385 self.set_local_nic_info(self.set_local_nic_info_helper()) 386 387 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 388 self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip") 389 390 self.configure_system() 391 if self.enable_adq: 392 self.configure_adq() 393 self.sys_config() 394 395 def set_local_nic_info_helper(self): 396 return json.loads(self.exec_cmd(["lshw", "-json"])) 397 398 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 399 stderr_opt = None 400 if stderr_redirect: 401 stderr_opt = subprocess.STDOUT 402 if change_dir: 403 old_cwd = os.getcwd() 404 os.chdir(change_dir) 405 self.log_print("Changing directory to %s" % change_dir) 406 407 out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8") 408 409 if change_dir: 410 os.chdir(old_cwd) 411 self.log_print("Changing directory to %s" % old_cwd) 412 return out 413 414 def zip_spdk_sources(self, spdk_dir, dest_file): 415 self.log_print("Zipping SPDK source directory") 416 fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) 417 for root, _directories, files in os.walk(spdk_dir, followlinks=True): 418 for file in 
files: 419 fh.write(os.path.relpath(os.path.join(root, file))) 420 fh.close() 421 self.log_print("Done zipping") 422 423 @staticmethod 424 def _chunks(input_list, chunks_no): 425 div, rem = divmod(len(input_list), chunks_no) 426 for i in range(chunks_no): 427 si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem) 428 yield input_list[si:si + (div + 1 if i < rem else div)] 429 430 def spread_bdevs(self, req_disks): 431 # Spread available block devices indexes: 432 # - evenly across available initiator systems 433 # - evenly across available NIC interfaces for 434 # each initiator 435 # Not NUMA aware. 436 ip_bdev_map = [] 437 initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info)) 438 439 for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)): 440 self.initiator_info[i]["bdev_range"] = init_chunk 441 init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"]))) 442 for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list): 443 for c in nic_chunk: 444 ip_bdev_map.append((ip, c)) 445 return ip_bdev_map 446 447 @staticmethod 448 def read_json_stats(file): 449 with open(file, "r") as json_data: 450 data = json.load(json_data) 451 job_pos = 0 # job_post = 0 because using aggregated results 452 453 # Check if latency is in nano or microseconds to choose correct dict key 454 def get_lat_unit(key_prefix, dict_section): 455 # key prefix - lat, clat or slat. 
456 # dict section - portion of json containing latency bucket in question 457 # Return dict key to access the bucket and unit as string 458 for k, _ in dict_section.items(): 459 if k.startswith(key_prefix): 460 return k, k.split("_")[1] 461 462 def get_clat_percentiles(clat_dict_leaf): 463 if "percentile" in clat_dict_leaf: 464 p99_lat = float(clat_dict_leaf["percentile"]["99.000000"]) 465 p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"]) 466 p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"]) 467 p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"]) 468 469 return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat] 470 else: 471 # Latest fio versions do not provide "percentile" results if no 472 # measurements were done, so just return zeroes 473 return [0, 0, 0, 0] 474 475 read_iops = float(data["jobs"][job_pos]["read"]["iops"]) 476 read_bw = float(data["jobs"][job_pos]["read"]["bw"]) 477 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"]) 478 read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"]) 479 read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"]) 480 read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"]) 481 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"]) 482 read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles( 483 data["jobs"][job_pos]["read"][clat_key]) 484 485 if "ns" in lat_unit: 486 read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]] 487 if "ns" in clat_unit: 488 read_p99_lat = read_p99_lat / 1000 489 read_p99_9_lat = read_p99_9_lat / 1000 490 read_p99_99_lat = read_p99_99_lat / 1000 491 read_p99_999_lat = read_p99_999_lat / 1000 492 493 write_iops = float(data["jobs"][job_pos]["write"]["iops"]) 494 write_bw = float(data["jobs"][job_pos]["write"]["bw"]) 495 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"]) 496 write_avg_lat = 
float(data["jobs"][job_pos]["write"][lat_key]["mean"]) 497 write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"]) 498 write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"]) 499 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"]) 500 write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles( 501 data["jobs"][job_pos]["write"][clat_key]) 502 503 if "ns" in lat_unit: 504 write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]] 505 if "ns" in clat_unit: 506 write_p99_lat = write_p99_lat / 1000 507 write_p99_9_lat = write_p99_9_lat / 1000 508 write_p99_99_lat = write_p99_99_lat / 1000 509 write_p99_999_lat = write_p99_999_lat / 1000 510 511 return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat, 512 read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat, 513 write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat, 514 write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat] 515 516 def parse_results(self, results_dir, csv_file): 517 files = os.listdir(results_dir) 518 fio_files = filter(lambda x: ".fio" in x, files) 519 json_files = [x for x in files if ".json" in x] 520 521 headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us", 522 "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us", 523 "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us", 524 "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"] 525 526 aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us", 527 "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"] 528 529 header_line = ",".join(["Name", *headers]) 530 aggr_header_line = ",".join(["Name", *aggr_headers]) 531 532 # Create empty results file 533 with open(os.path.join(results_dir, csv_file), "w") as fh: 534 
fh.write(aggr_header_line + "\n") 535 rows = set() 536 537 for fio_config in fio_files: 538 self.log_print("Getting FIO stats for %s" % fio_config) 539 job_name, _ = os.path.splitext(fio_config) 540 541 # Look in the filename for rwmixread value. Function arguments do 542 # not have that information. 543 # TODO: Improve this function by directly using workload params instead 544 # of regexing through filenames. 545 if "read" in job_name: 546 rw_mixread = 1 547 elif "write" in job_name: 548 rw_mixread = 0 549 else: 550 rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100 551 552 # If "_CPU" exists in name - ignore it 553 # Initiators for the same job could have different num_cores parameter 554 job_name = re.sub(r"_\d+CPU", "", job_name) 555 job_result_files = [x for x in json_files if x.startswith(job_name)] 556 self.log_print("Matching result files for current fio config:") 557 for j in job_result_files: 558 self.log_print("\t %s" % j) 559 560 # There may have been more than 1 initiator used in test, need to check that 561 # Result files are created so that string after last "_" separator is server name 562 inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files]) 563 inits_avg_results = [] 564 for i in inits_names: 565 self.log_print("\tGetting stats for initiator %s" % i) 566 # There may have been more than 1 test run for this job, calculate average results for initiator 567 i_results = [x for x in job_result_files if i in x] 568 i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv")) 569 570 separate_stats = [] 571 for r in i_results: 572 try: 573 stats = self.read_json_stats(os.path.join(results_dir, r)) 574 separate_stats.append(stats) 575 self.log_print(stats) 576 except JSONDecodeError: 577 self.log_print("ERROR: Failed to parse %s results! Results might be incomplete!" 
% r) 578 579 init_results = [sum(x) for x in zip(*separate_stats)] 580 init_results = [x / len(separate_stats) for x in init_results] 581 inits_avg_results.append(init_results) 582 583 self.log_print("\tAverage results for initiator %s" % i) 584 self.log_print(init_results) 585 with open(os.path.join(results_dir, i_results_filename), "w") as fh: 586 fh.write(header_line + "\n") 587 fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n") 588 589 # Sum results of all initiators running this FIO job. 590 # Latency results are an average of latencies from accros all initiators. 591 inits_avg_results = [sum(x) for x in zip(*inits_avg_results)] 592 inits_avg_results = OrderedDict(zip(headers, inits_avg_results)) 593 for key in inits_avg_results: 594 if "lat" in key: 595 inits_avg_results[key] /= len(inits_names) 596 597 # Aggregate separate read/write values into common labels 598 # Take rw_mixread into consideration for mixed read/write workloads. 599 aggregate_results = OrderedDict() 600 for h in aggr_headers: 601 read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key] 602 if "lat" in h: 603 _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat 604 else: 605 _ = read_stat + write_stat 606 aggregate_results[h] = "{0:.3f}".format(_) 607 608 rows.add(",".join([job_name, *aggregate_results.values()])) 609 610 # Save results to file 611 for row in rows: 612 with open(os.path.join(results_dir, csv_file), "a") as fh: 613 fh.write(row + "\n") 614 self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file)) 615 616 def measure_sar(self, results_dir, sar_file_prefix): 617 cpu_number = os.cpu_count() 618 sar_idle_sum = 0 619 sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt") 620 sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"])) 621 622 self.log_print("Waiting %d seconds for ramp-up to finish before 
measuring SAR stats" % self.sar_delay) 623 time.sleep(self.sar_delay) 624 self.log_print("Starting SAR measurements") 625 626 out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count]) 627 with open(os.path.join(results_dir, sar_output_file), "w") as fh: 628 for line in out.split("\n"): 629 if "Average" in line: 630 if "CPU" in line: 631 self.log_print("Summary CPU utilization from SAR:") 632 self.log_print(line) 633 elif "all" in line: 634 self.log_print(line) 635 else: 636 sar_idle_sum += float(line.split()[7]) 637 fh.write(out) 638 sar_cpu_usage = cpu_number * 100 - sar_idle_sum 639 640 with open(os.path.join(results_dir, sar_cpu_util_file), "w") as f: 641 f.write("%0.2f" % sar_cpu_usage) 642 643 def ethtool_after_fio_ramp(self, fio_ramp_time): 644 time.sleep(fio_ramp_time//2) 645 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 646 for nic in nic_names: 647 self.log_print(nic) 648 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 649 "channel-pkt-inspect-optimize", "off"]) # Disable channel packet inspection optimization 650 651 def measure_pcm_memory(self, results_dir, pcm_file_name): 652 time.sleep(self.pcm_delay) 653 cmd = ["%s/pcm-memory.x" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)] 654 pcm_memory = subprocess.Popen(cmd) 655 time.sleep(self.pcm_count) 656 pcm_memory.terminate() 657 658 def measure_pcm(self, results_dir, pcm_file_name): 659 time.sleep(self.pcm_delay) 660 cmd = ["%s/pcm.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, "-csv=%s/%s" % (results_dir, pcm_file_name)] 661 subprocess.run(cmd) 662 df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1]) 663 df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x)) 664 skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})] 665 skt_pcm_file_name = "_".join(["skt", pcm_file_name]) 666 skt.to_csv(os.path.join(results_dir, 
skt_pcm_file_name), index=False) 667 668 def measure_pcm_power(self, results_dir, pcm_power_file_name): 669 time.sleep(self.pcm_delay) 670 out = self.exec_cmd(["%s/pcm-power.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count]) 671 with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh: 672 fh.write(out) 673 674 def measure_network_bandwidth(self, results_dir, bandwidth_file_name): 675 self.log_print("INFO: starting network bandwidth measure") 676 self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name), 677 "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)]) 678 679 def measure_dpdk_memory(self, results_dir): 680 self.log_print("INFO: waiting to generate DPDK memory usage") 681 time.sleep(self.dpdk_wait_time) 682 self.log_print("INFO: generating DPDK memory usage") 683 rpc.env.env_dpdk_get_mem_stats 684 os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir)) 685 686 def sys_config(self): 687 self.log_print("====Kernel release:====") 688 self.log_print(os.uname().release) 689 self.log_print("====Kernel command line:====") 690 with open('/proc/cmdline') as f: 691 cmdline = f.readlines() 692 self.log_print('\n'.join(self.get_uncommented_lines(cmdline))) 693 self.log_print("====sysctl conf:====") 694 with open('/etc/sysctl.conf') as f: 695 sysctl = f.readlines() 696 self.log_print('\n'.join(self.get_uncommented_lines(sysctl))) 697 self.log_print("====Cpu power info:====") 698 self.log_print(self.exec_cmd(["cpupower", "frequency-info"])) 699 self.log_print("====zcopy settings:====") 700 self.log_print("zcopy enabled: %s" % (self.enable_zcopy)) 701 self.log_print("====Scheduler settings:====") 702 self.log_print("SPDK scheduler: %s" % (self.scheduler_name)) 703 704 705class Initiator(Server): 706 def __init__(self, name, general_config, initiator_config): 707 super().__init__(name, general_config, initiator_config) 708 709 # Required fields 710 self.ip = initiator_config["ip"] 711 
        self.target_nic_ips = initiator_config["target_nic_ips"]

        # Defaults
        self.cpus_allowed = None
        self.cpus_allowed_policy = "shared"
        self.spdk_dir = "/tmp/spdk"
        self.fio_bin = "/usr/src/fio/fio"
        self.nvmecli_bin = "nvme"
        self.cpu_frequency = None
        self.subsystem_info_list = []

        # Per-initiator overrides from the config file.
        if "spdk_dir" in initiator_config:
            self.spdk_dir = initiator_config["spdk_dir"]
        if "fio_bin" in initiator_config:
            self.fio_bin = initiator_config["fio_bin"]
        if "nvmecli_bin" in initiator_config:
            self.nvmecli_bin = initiator_config["nvmecli_bin"]
        if "cpus_allowed" in initiator_config:
            self.cpus_allowed = initiator_config["cpus_allowed"]
        if "cpus_allowed_policy" in initiator_config:
            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
        if "cpu_frequency" in initiator_config:
            self.cpu_frequency = initiator_config["cpu_frequency"]
        # Environment variable takes precedence over the config file value.
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        # All commands for this host run over a persistent SSH connection;
        # host keys are auto-accepted.
        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        # Start from a clean remote workspace.
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.copy_spdk("/tmp/spdk.zip")
        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        # Remote hardware description from lshw, parsed as JSON.
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        # Close the persistent SSH connection to this initiator.
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        # Run cmd (argument list) on the remote host over SSH and return
        # stdout as a string. Raises CalledProcessError on non-zero exit.
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl) quote it again to prevent
        # expansion when sending to remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout thanks using get_pty option if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        # Upload a single file to the remote host via SFTP.
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        # Download a single file from the remote host via SFTP.
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        # Ship the target's zipped SPDK sources to this initiator and
        # unpack them into its spdk_dir.
        self.log_print("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log_print("Sources unpacked")

    def copy_result_files(self, dest_dir):
        # Pull all files from the remote nvmf_perf directory into dest_dir
        # on the machine running this script.
        self.log_print("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log_print("Done copying results")

    def discover_subsystems(self, address_list, subsys_no):
        # Probe ports 4420..4420+subsys_no-1 on every target address with
        # "nvme discover" and store matching (svcid, nqn, ip) tuples, sorted
        # by NQN, in self.subsystem_info_list.
        num_nvmes = range(0, subsys_no)
        nvme_discover_output = ""
        for ip, subsys_no in itertools.product(address_list, num_nvmes):
            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no))
            nvme_discover_cmd = ["sudo",
                                 "%s" % self.nvmecli_bin,
                                 "discover", "-t", "%s" % self.transport,
                                 "-s", "%s" % (4420 + subsys_no),
                                 "-a", "%s" % ip]

            try:
                stdout = self.exec_cmd(nvme_discover_cmd)
                if stdout:
                    nvme_discover_output = nvme_discover_output + stdout
            except CalledProcessError:
                # Do nothing. In case of discovering remote subsystems of kernel target
                # we expect "nvme discover" to fail a bunch of times because we basically
                # scan ports.
                pass

        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                                nvme_discover_output)  # from nvme discovery output
        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
        subsystems = list(set(subsystems))
        subsystems.sort(key=lambda x: x[1])
        self.log_print("Found matching subsystems on target side:")
        for s in subsystems:
            self.log_print(s)
        self.subsystem_info_list = subsystems

    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10, rate_iops=0):
        # Build the fio job file for this workload and write it to the
        # remote host. The global section is shared; the filename section
        # comes from the mode-specific gen_fio_filename_conf().
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""
        if "spdk" in self.mode:
            bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
            self.exec_cmd(["echo", "'%s'" % bdev_conf, ">",
"%s/bdev.conf" % self.spdk_dir]) 872 ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir 873 spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir 874 else: 875 ioengine = self.ioengine 876 spdk_conf = "" 877 out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'", 878 "|", "awk", "'{print $1}'"]) 879 subsystems = [x for x in out.split("\n") if "nvme" in x] 880 881 if self.cpus_allowed is not None: 882 self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed) 883 cpus_num = 0 884 cpus = self.cpus_allowed.split(",") 885 for cpu in cpus: 886 if "-" in cpu: 887 a, b = cpu.split("-") 888 a = int(a) 889 b = int(b) 890 cpus_num += len(range(a, b)) 891 else: 892 cpus_num += 1 893 self.num_cores = cpus_num 894 threads = range(0, self.num_cores) 895 elif hasattr(self, 'num_cores'): 896 self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores) 897 threads = range(0, int(self.num_cores)) 898 else: 899 self.num_cores = len(subsystems) 900 threads = range(0, len(subsystems)) 901 902 if "spdk" in self.mode: 903 filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs) 904 else: 905 filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs) 906 907 fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf, 908 rw=rw, rwmixread=rwmixread, block_size=block_size, 909 ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops) 910 911 # TODO: hipri disabled for now, as it causes fio errors: 912 # io_u error on file /dev/nvme2n1: Operation not supported 913 # See comment in KernelInitiator class, kernel_init_connect() function 914 if hasattr(self, "ioengine") and "io_uring" in self.ioengine: 915 fio_config = fio_config + """ 916fixedbufs=1 917registerfiles=1 918#hipri=1 919""" 920 if num_jobs: 921 fio_config = fio_config + "numjobs=%s \n" % num_jobs 922 if self.cpus_allowed is not None: 923 fio_config = fio_config + 
"cpus_allowed=%s \n" % self.cpus_allowed 924 fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy 925 fio_config = fio_config + filename_section 926 927 fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread) 928 if hasattr(self, "num_cores"): 929 fio_config_filename += "_%sCPU" % self.num_cores 930 fio_config_filename += ".fio" 931 932 self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir]) 933 self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)]) 934 self.log_print("Created FIO Config:") 935 self.log_print(fio_config) 936 937 return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename) 938 939 def set_cpu_frequency(self): 940 if self.cpu_frequency is not None: 941 try: 942 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True) 943 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True) 944 self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"])) 945 except Exception: 946 self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!") 947 sys.exit() 948 else: 949 self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.") 950 951 def run_fio(self, fio_config_file, run_num=None): 952 job_name, _ = os.path.splitext(fio_config_file) 953 self.log_print("Starting FIO run for job: %s" % job_name) 954 self.log_print("Using FIO: %s" % self.fio_bin) 955 956 if run_num: 957 for i in range(1, run_num + 1): 958 output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json" 959 try: 960 output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json", 961 "--output=%s" % output_filename, "--eta=never"], True) 962 self.log_print(output) 963 except subprocess.CalledProcessError as e: 964 self.log_print("ERROR: Fio process failed!") 965 self.log_print(e.stdout) 966 else: 967 output_filename = job_name + "_" + 
self.name + ".json" 968 output = self.exec_cmd(["sudo", self.fio_bin, 969 fio_config_file, "--output-format=json", 970 "--output" % output_filename], True) 971 self.log_print(output) 972 self.log_print("FIO run finished. Results in: %s" % output_filename) 973 974 def sys_config(self): 975 self.log_print("====Kernel release:====") 976 self.log_print(self.exec_cmd(["uname", "-r"])) 977 self.log_print("====Kernel command line:====") 978 cmdline = self.exec_cmd(["cat", "/proc/cmdline"]) 979 self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines()))) 980 self.log_print("====sysctl conf:====") 981 sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"]) 982 self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines()))) 983 self.log_print("====Cpu power info:====") 984 self.log_print(self.exec_cmd(["cpupower", "frequency-info"])) 985 986 987class KernelTarget(Target): 988 def __init__(self, name, general_config, target_config): 989 super().__init__(name, general_config, target_config) 990 # Defaults 991 self.nvmet_bin = "nvmetcli" 992 993 if "nvmet_bin" in target_config: 994 self.nvmet_bin = target_config["nvmet_bin"] 995 996 def stop(self): 997 nvmet_command(self.nvmet_bin, "clear") 998 999 def kernel_tgt_gen_subsystem_conf(self, nvme_list): 1000 1001 nvmet_cfg = { 1002 "ports": [], 1003 "hosts": [], 1004 "subsystems": [], 1005 } 1006 1007 for ip, bdev_num in self.spread_bdevs(len(nvme_list)): 1008 port = str(4420 + bdev_num) 1009 nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num 1010 serial = "SPDK00%s" % bdev_num 1011 bdev_name = nvme_list[bdev_num] 1012 1013 nvmet_cfg["subsystems"].append({ 1014 "allowed_hosts": [], 1015 "attr": { 1016 "allow_any_host": "1", 1017 "serial": serial, 1018 "version": "1.3" 1019 }, 1020 "namespaces": [ 1021 { 1022 "device": { 1023 "path": bdev_name, 1024 "uuid": "%s" % uuid.uuid4() 1025 }, 1026 "enable": 1, 1027 "nsid": port 1028 } 1029 ], 1030 "nqn": nqn 1031 }) 1032 1033 nvmet_cfg["ports"].append({ 1034 "addr": { 1035 
            self.subsystem_info_list.append([port, nqn, ip])
        self.subsys_no = len(self.subsystem_info_list)

        # Written to the current working directory; consumed by tgt_start()
        # via "nvmetcli restore kernel.conf".
        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        """Configure and start the kernel NVMe-oF target via nvmetcli."""
        self.log_print("Configuring kernel NVMeOF Target")

        if self.null_block:
            print("Configuring with null block device.")
            nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
        else:
            print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()

        self.kernel_tgt_gen_subsystem_conf(nvme_list)
        self.subsys_no = len(nvme_list)

        # Clear any leftover configuration before restoring the new one.
        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log_print("Done configuring kernel NVMeOF Target")


class SPDKTarget(Target):
    """SPDK userspace NVMe-oF target, configured over JSON-RPC."""

    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Required fields
        self.core_mask = target_config["core_mask"]
        self.num_cores = self.get_num_cores(self.core_mask)

        # Defaults
        self.dif_insert_strip = False
        self.null_block_dif_type = 0
        self.num_shared_buffers = 4096
        self.max_queue_depth = 128
        self.bpf_proc = None
        self.bpf_scripts = []
        self.enable_idxd = False

        if "num_shared_buffers" in target_config:
            self.num_shared_buffers = target_config["num_shared_buffers"]
        if "max_queue_depth" in target_config:
            self.max_queue_depth = target_config["max_queue_depth"]
        if "null_block_dif_type" in target_config:
            self.null_block_dif_type = target_config["null_block_dif_type"]
        if "dif_insert_strip" in target_config:
            self.dif_insert_strip = target_config["dif_insert_strip"]
        if "bpf_scripts" in target_config:
            self.bpf_scripts = target_config["bpf_scripts"]
        if "idxd_settings" in target_config:
            self.enable_idxd = target_config["idxd_settings"]

        self.log_print("====IDXD settings:====")
        self.log_print("IDXD enabled: %s" % (self.enable_idxd))

    @staticmethod
    def get_num_cores(core_mask):
        """Count CPU cores in a core mask.

        Accepts either a hex bitmask ("0xF0") or a bracketed core list
        ("[0-3,5]"). Ranges like "x-y" are inclusive on both ends.
        """
        if "0x" in core_mask:
            # Hex mask: number of set bits == number of cores.
            return bin(int(core_mask, 16)).count("1")
        else:
            num_cores = 0
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")
            for i in core_mask.split(","):
                if "-" in i:
                    x, y = i.split("-")
                    # Inclusive range, hence the +1.
                    num_cores += len(range(int(x), int(y))) + 1
                else:
                    num_cores += 1
            return num_cores

    def spdk_tgt_configure(self):
        """Configure transport, bdevs and subsystems of a running nvmf_tgt."""
        self.log_print("Configuring SPDK NVMeOF target via RPC")

        if self.enable_adq:
            self.adq_configure_tc()

        # Create transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
                                       num_shared_buffers=self.num_shared_buffers,
                                       max_queue_depth=self.max_queue_depth,
                                       dif_insert_or_strip=self.dif_insert_strip,
                                       sock_priority=self.adq_priority)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

        self.log_print("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self, null_block_count):
        """Create null_block_count null bdevs (with optional DIF metadata)."""
        md_size = 0
        block_size = 4096
        if self.null_block_dif_type != 0:
            # 128B metadata per block is needed to carry DIF information.
            md_size = 128

        self.log_print("Adding null block bdevices to config via RPC")
        for i in range(null_block_count):
            self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type)
            # Bdevs are named NvmeXn1 to match the naming used for real drives.
            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
                                      dif_type=self.null_block_dif_type, md_size=md_size)
        self.log_print("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        """Attach local PCIe NVMe drives as bdevs (all, or req_num_disks)."""
        self.log_print("Adding NVMe bdevs to config via RPC")

        bdfs = get_nvme_devices_bdf()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create one NVMe-oF subsystem + listener per bdev, spread over ips."""
        self.log_print("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        for ip, bdev_num in self.spread_bdevs(req_num_disks):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = "Nvme%sn1" % bdev_num

            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                           allow_any_host=True, max_namespaces=8)
            rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
            rpc.nvmf.nvmf_subsystem_add_listener(self.client,
                                                 nqn=nqn,
                                                 trtype=self.transport,
                                                 traddr=ip,
                                                 trsvcid=port,
                                                 adrfam="ipv4")
            self.subsystem_info_list.append([port, nqn, ip])
        self.subsys_no = len(self.subsystem_info_list)

        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def bpf_start(self):
        """Launch bpftrace scripts attached to the running nvmf_tgt process."""
        self.log_print("Starting BPF Trace scripts: %s" % self.bpf_scripts)
        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
        results_path = os.path.join(self.results_dir, "bpf_traces.txt")

        # self.pid is the path of the file holding nvmf_tgt's PID (see tgt_start).
        with open(self.pid, "r") as fh:
            nvmf_pid = str(fh.readline())

        cmd = [bpf_script, nvmf_pid, *bpf_traces]
        self.log_print(cmd)
        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})

    def tgt_start(self):
        """Start the nvmf_tgt app, wait for its RPC socket and configure it."""
        if self.null_block:
            self.subsys_no = 1
        else:
            self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        self.log_print("SPDK NVMeOF Target PID=%s" % self.pid)
        self.log_print("Waiting for spdk to initialize...")
        # Poll until the app creates its RPC socket.
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")

        if self.enable_zcopy:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
                                           enable_zerocopy_send_server=True)
            self.log_print("Target socket options:")
            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))

        if self.enable_adq:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
                                           nvme_adminq_poll_period_us=100000, retry_count=4)

        if self.enable_idxd:
            rpc.idxd.idxd_scan_accel_engine(self.client, config_kernel_mode=None)
            self.log_print("Target IDXD accel engine enabled")
        # Scheduler must be set before framework init; init unblocks the
        # --wait-for-rpc startup.
        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name)
        rpc.framework_start_init(self.client)

        if self.bpf_scripts:
            self.bpf_start()

        self.spdk_tgt_configure()

    def stop(self):
        """Stop BPF tracing (if running) and terminate the nvmf_tgt process."""
        if self.bpf_proc:
            self.log_print("Stopping BPF Trace script")
            self.bpf_proc.terminate()
            self.bpf_proc.wait()

        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait()
            except Exception as e:
                # Fall back to SIGKILL if graceful termination fails.
                self.log_print(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()


class KernelInitiator(Initiator):
    """Initiator using the Linux kernel NVMe-oF host stack (nvme-cli connect)."""

    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Defaults
        self.extra_params = ""
        self.ioengine = "libaio"

        if "extra_params" in initiator_config:
            self.extra_params = initiator_config["extra_params"]

        if "kernel_engine" in initiator_config:
            self.ioengine = initiator_config["kernel_engine"]
            if "io_uring" in self.ioengine:
                self.extra_params = "--nr-poll-queues=8"

    def get_connected_nvme_list(self):
        """Return device names (e.g. "nvme0n1") of connected SPDK/Linux subsystems."""
        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
        return nvme_list

    def kernel_init_connect(self):
        """Connect to all discovered subsystems with nvme-cli and tune block
        layer sysfs settings when using io_uring."""
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for subsystem in self.subsystem_info_list:
            self.log_print("Trying to connect %s %s %s" % subsystem)
            # subsystem is [trsvcid, nqn, traddr] (see discover_subsystems).
            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
            time.sleep(2)

        if "io_uring" in self.ioengine:
            self.log_print("Setting block layer settings for io_uring.")

            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
            # apparently it's not possible for connected subsystems.
            # Results in "error: Invalid argument"
            block_sysfs_settings = {
                "iostats": "0",
                "rq_affinity": "0",
                "nomerges": "2"
            }

            for disk in self.get_connected_nvme_list():
                sysfs = os.path.join("/sys/block", disk, "queue")
                for k, v in block_sysfs_settings.items():
                    sysfs_opt_path = os.path.join(sysfs, k)
                    try:
                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
                    except subprocess.CalledProcessError as e:
                        self.log_print("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
                    finally:
                        # Log the effective value regardless of success.
                        _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)])
                        self.log_print("%s=%s" % (sysfs_opt_path, _))

    def kernel_init_disconnect(self):
        """Disconnect from every subsystem connected by kernel_init_connect()."""
        for subsystem in self.subsystem_info_list:
            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
            time.sleep(1)

    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1):
        """Build FIO [filenameX] job sections distributing the connected
        /dev/nvme* devices across the given threads.

        Per-section iodepth is scaled so that total queue depth per device
        stays at io_depth regardless of num_jobs.
        """
        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        # Round-robin the devices into len(threads) buckets; the first
        # `remainder` buckets get one extra device.
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section


class SPDKInitiator(Initiator):
    """Initiator using the SPDK bdev FIO plugin as the NVMe-oF host."""

    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.install_spdk()

        # Required fields
        self.num_cores = initiator_config["num_cores"]

        # Optional fields
        self.enable_data_digest = False
        if "enable_data_digest" in initiator_config:
            self.enable_data_digest = initiator_config["enable_data_digest"]

    def install_spdk(self):
        """Build SPDK (with RDMA and the FIO plugin) on the initiator host."""
        self.log_print("Using fio binary %s" % self.fio_bin)
        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)])
        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])

        self.log_print("SPDK built")
        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        """Build a bdev subsystem JSON config attaching a controller per
        remote subsystem; returns the config as a JSON string."""
        bdev_cfg_section = {
            "subsystems": [
                {
                    "subsystem": "bdev",
                    "config": []
                }
            ]
        }

        for i, subsys in enumerate(remote_subsystem_list):
            sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys)
            nvme_ctrl = {
                "method": "bdev_nvme_attach_controller",
                "params": {
                    "name": "Nvme{}".format(i),
                    "trtype": self.transport,
                    "traddr": sub_addr,
                    "trsvcid": sub_port,
                    "subnqn": sub_nqn,
                    "adrfam": "IPv4"
                }
            }

            if self.enable_adq:
                nvme_ctrl["params"].update({"priority": "1"})

            if self.enable_data_digest:
                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})
bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl) 1421 1422 return json.dumps(bdev_cfg_section, indent=2) 1423 1424 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1): 1425 filename_section = "" 1426 if len(threads) >= len(subsystems): 1427 threads = range(0, len(subsystems)) 1428 filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))] 1429 nvme_per_split = int(len(subsystems) / len(threads)) 1430 remainder = len(subsystems) % len(threads) 1431 iterator = iter(filenames) 1432 result = [] 1433 for i in range(len(threads)): 1434 result.append([]) 1435 for _ in range(nvme_per_split): 1436 result[i].append(next(iterator)) 1437 if remainder: 1438 result[i].append(next(iterator)) 1439 remainder -= 1 1440 for i, r in enumerate(result): 1441 header = "[filename%s]" % i 1442 disks = "\n".join(["filename=%s" % x for x in r]) 1443 job_section_qd = round((io_depth * len(r)) / num_jobs) 1444 if job_section_qd == 0: 1445 job_section_qd = 1 1446 iodepth = "iodepth=%s" % job_section_qd 1447 filename_section = "\n".join([filename_section, header, disks, iodepth]) 1448 1449 return filename_section 1450 1451 1452if __name__ == "__main__": 1453 script_full_dir = os.path.dirname(os.path.realpath(__file__)) 1454 default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json")) 1455 1456 parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) 1457 parser.add_argument('-c', '--config', type=str, default=default_config_file_path, 1458 help='Configuration file.') 1459 parser.add_argument('-r', '--results', type=str, default='/tmp/results', 1460 help='Results directory.') 1461 parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv', 1462 help='CSV results filename.') 1463 1464 args = parser.parse_args() 1465 1466 print("Using config file: %s" % args.config) 1467 with open(args.config, "r") as config: 1468 data = json.load(config) 1469 1470 initiators = [] 1471 
fio_cases = [] 1472 1473 general_config = data["general"] 1474 target_config = data["target"] 1475 initiator_configs = [data[x] for x in data.keys() if "initiator" in x] 1476 1477 for k, v in data.items(): 1478 if "target" in k: 1479 v.update({"results_dir": args.results}) 1480 if data[k]["mode"] == "spdk": 1481 target_obj = SPDKTarget(k, data["general"], v) 1482 elif data[k]["mode"] == "kernel": 1483 target_obj = KernelTarget(k, data["general"], v) 1484 elif "initiator" in k: 1485 if data[k]["mode"] == "spdk": 1486 init_obj = SPDKInitiator(k, data["general"], v) 1487 elif data[k]["mode"] == "kernel": 1488 init_obj = KernelInitiator(k, data["general"], v) 1489 initiators.append(init_obj) 1490 elif "fio" in k: 1491 fio_workloads = itertools.product(data[k]["bs"], 1492 data[k]["qd"], 1493 data[k]["rw"]) 1494 1495 fio_run_time = data[k]["run_time"] 1496 fio_ramp_time = data[k]["ramp_time"] 1497 fio_rw_mix_read = data[k]["rwmixread"] 1498 fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None 1499 fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None 1500 1501 fio_rate_iops = 0 1502 if "rate_iops" in data[k]: 1503 fio_rate_iops = data[k]["rate_iops"] 1504 else: 1505 continue 1506 1507 try: 1508 os.mkdir(args.results) 1509 except FileExistsError: 1510 pass 1511 1512 for i in initiators: 1513 target_obj.initiator_info.append( 1514 {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips} 1515 ) 1516 1517 # TODO: This try block is definietly too large. Need to break this up into separate 1518 # logical blocks to reduce size. 
1519 try: 1520 target_obj.tgt_start() 1521 1522 for i in initiators: 1523 i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no) 1524 if i.enable_adq: 1525 i.adq_configure_tc() 1526 1527 # Poor mans threading 1528 # Run FIO tests 1529 for block_size, io_depth, rw in fio_workloads: 1530 threads = [] 1531 configs = [] 1532 for i in initiators: 1533 if i.mode == "kernel": 1534 i.kernel_init_connect() 1535 1536 cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no, 1537 fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops) 1538 configs.append(cfg) 1539 1540 for i, cfg in zip(initiators, configs): 1541 t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num)) 1542 threads.append(t) 1543 if target_obj.enable_sar: 1544 sar_file_prefix = "%s_%s_%s_sar" % (block_size, rw, io_depth) 1545 t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix)) 1546 threads.append(t) 1547 1548 if target_obj.enable_pcm: 1549 pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]] 1550 1551 pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],)) 1552 pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],)) 1553 pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],)) 1554 1555 threads.append(pcm_cpu_t) 1556 threads.append(pcm_mem_t) 1557 threads.append(pcm_pow_t) 1558 1559 if target_obj.enable_bandwidth: 1560 bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)]) 1561 bandwidth_file_name = ".".join([bandwidth_file_name, "csv"]) 1562 t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,)) 1563 threads.append(t) 1564 1565 if target_obj.enable_dpdk_memory: 1566 t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results)) 1567 
threads.append(t) 1568 1569 if target_obj.enable_adq: 1570 ethtool_thread = threading.Thread(target=target_obj.ethtool_after_fio_ramp, args=(fio_ramp_time,)) 1571 threads.append(ethtool_thread) 1572 1573 for t in threads: 1574 t.start() 1575 for t in threads: 1576 t.join() 1577 1578 for i in initiators: 1579 if i.mode == "kernel": 1580 i.kernel_init_disconnect() 1581 i.copy_result_files(args.results) 1582 1583 target_obj.restore_governor() 1584 target_obj.restore_tuned() 1585 target_obj.restore_services() 1586 target_obj.restore_sysctl() 1587 if target_obj.enable_adq: 1588 target_obj.reload_driver("ice") 1589 for i in initiators: 1590 i.restore_governor() 1591 i.restore_tuned() 1592 i.restore_services() 1593 i.restore_sysctl() 1594 if i.enable_adq: 1595 i.reload_driver("ice") 1596 target_obj.parse_results(args.results, args.csv_filename) 1597 finally: 1598 for i in initiators: 1599 try: 1600 i.stop() 1601 except Exception as err: 1602 pass 1603 target_obj.stop() 1604