1#!/usr/bin/env python3 2 3from json.decoder import JSONDecodeError 4import os 5import re 6import sys 7import argparse 8import json 9import zipfile 10import threading 11import subprocess 12import itertools 13import configparser 14import time 15import uuid 16from collections import OrderedDict 17 18import paramiko 19import pandas as pd 20from common import * 21 22sys.path.append(os.path.dirname(__file__) + '/../../../python') 23 24import spdk.rpc as rpc # noqa 25import spdk.rpc.client as rpc_client # noqa 26 27 28class Server: 29 def __init__(self, name, general_config, server_config): 30 self.name = name 31 self.username = general_config["username"] 32 self.password = general_config["password"] 33 self.transport = general_config["transport"].lower() 34 self.nic_ips = server_config["nic_ips"] 35 self.mode = server_config["mode"] 36 37 self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts" 38 if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]: 39 self.irq_scripts_dir = server_config["irq_scripts_dir"] 40 41 self.local_nic_info = [] 42 self._nics_json_obj = {} 43 self.svc_restore_dict = {} 44 self.sysctl_restore_dict = {} 45 self.tuned_restore_dict = {} 46 self.governor_restore = "" 47 self.tuned_profile = "" 48 49 self.enable_adq = False 50 self.adq_priority = None 51 if "adq_enable" in server_config and server_config["adq_enable"]: 52 self.enable_adq = server_config["adq_enable"] 53 self.adq_priority = 1 54 55 if "tuned_profile" in server_config: 56 self.tuned_profile = server_config["tuned_profile"] 57 58 if not re.match("^[A-Za-z0-9]*$", name): 59 self.log_print("Please use a name which contains only letters or numbers") 60 sys.exit(1) 61 62 def log_print(self, msg): 63 print("[%s] %s" % (self.name, msg), flush=True) 64 65 @staticmethod 66 def get_uncommented_lines(lines): 67 return [line for line in lines if line and not line.startswith('#')] 68 69 def get_nic_name_by_ip(self, ip): 70 if not self._nics_json_obj: 71 nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"]) 72 self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj))) 73 for nic in self._nics_json_obj: 74 for addr in nic["addr_info"]: 75 if ip in addr["local"]: 76 return nic["ifname"] 77 78 def set_local_nic_info_helper(self): 79 pass 80 81 def set_local_nic_info(self, pci_info): 82 def extract_network_elements(json_obj): 83 nic_list = [] 84 if isinstance(json_obj, list): 85 for x in json_obj: 86 nic_list.extend(extract_network_elements(x)) 87 elif isinstance(json_obj, dict): 88 if "children" in json_obj: 89 nic_list.extend(extract_network_elements(json_obj["children"])) 90 if "class" in json_obj.keys() and "network" in json_obj["class"]: 91 nic_list.append(json_obj) 92 return nic_list 93 94 self.local_nic_info = extract_network_elements(pci_info) 95 96 # pylint: disable=R0201 97 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 98 return "" 99 100 def configure_system(self): 101 self.configure_services() 102 self.configure_sysctl() 103 self.configure_tuned() 104 self.configure_cpu_governor() 105 self.configure_irq_affinity() 106 107 def configure_adq(self): 108 if self.mode == "kernel": 109 self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 110 return 111 self.adq_load_modules() 112 self.adq_configure_nic() 113 114 def adq_load_modules(self): 115 self.log_print("Modprobing ADQ-related Linux modules...") 116 adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"] 117 for module in adq_module_deps: 118 try: 119 self.exec_cmd(["sudo", "modprobe", module]) 120 self.log_print("%s loaded!" % module) 121 except CalledProcessError as e: 122 self.log_print("ERROR: failed to load module %s" % module) 123 self.log_print("%s resulted in error: %s" % (e.cmd, e.output)) 124 125 def adq_configure_tc(self): 126 self.log_print("Configuring ADQ Traffic classes and filters...") 127 128 if self.mode == "kernel": 129 self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 130 return 131 132 num_queues_tc0 = 2 # 2 is minimum number of queues for TC0 133 num_queues_tc1 = self.num_cores 134 port_param = "dst_port" if isinstance(self, Target) else "src_port" 135 port = "4420" 136 xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs") 137 138 for nic_ip in self.nic_ips: 139 nic_name = self.get_nic_name_by_ip(nic_ip) 140 tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, 141 "root", "mqprio", "num_tc", "2", "map", "0", "1", 142 "queues", "%s@0" % num_queues_tc0, 143 "%s@%s" % (num_queues_tc1, num_queues_tc0), 144 "hw", "1", "mode", "channel"] 145 self.log_print(" ".join(tc_qdisc_map_cmd)) 146 self.exec_cmd(tc_qdisc_map_cmd) 147 148 time.sleep(5) 149 tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"] 150 self.log_print(" ".join(tc_qdisc_ingress_cmd)) 151 self.exec_cmd(tc_qdisc_ingress_cmd) 152 153 tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name, 154 "protocol", "ip", "ingress", "prio", "1", "flower", 155 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port, 156 "skip_sw", "hw_tc", "1"] 157 self.log_print(" ".join(tc_filter_cmd)) 158 self.exec_cmd(tc_filter_cmd) 159 160 # show tc configuration 161 self.log_print("Show tc configuration for %s NIC..." % nic_name) 162 tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name]) 163 tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"]) 164 self.log_print("%s" % tc_disk_out) 165 self.log_print("%s" % tc_filter_out) 166 167 # Ethtool coalesce settings must be applied after configuring traffic classes 168 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"]) 169 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"]) 170 171 self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name) 172 xps_cmd = ["sudo", xps_script_path, nic_name] 173 self.log_print(xps_cmd) 174 self.exec_cmd(xps_cmd) 175 176 def reload_driver(self, driver): 177 try: 178 self.exec_cmd(["sudo", "rmmod", driver]) 179 self.exec_cmd(["sudo", "modprobe", driver]) 180 except CalledProcessError as e: 181 self.log_print("ERROR: failed to reload %s module!" % driver) 182 self.log_print("%s resulted in error: %s" % (e.cmd, e.output)) 183 184 def adq_configure_nic(self): 185 self.log_print("Configuring NIC port settings for ADQ testing...") 186 187 # Reload the driver first, to make sure any previous settings are re-set. 188 self.reload_driver("ice") 189 190 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 191 for nic in nic_names: 192 self.log_print(nic) 193 try: 194 self.exec_cmd(["sudo", "ethtool", "-K", nic, 195 "hw-tc-offload", "on"]) # Enable hardware TC offload 196 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 197 "channel-inline-flow-director", "on"]) # Enable Intel Flow Director 198 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"]) # Disable LLDP 199 # As temporary workaround for ADQ, channel packet inspection optimization is turned on during connection establishment. 200 # Then turned off before fio ramp_up expires in ethtool_after_fio_ramp(). 201 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 202 "channel-pkt-inspect-optimize", "on"]) 203 except CalledProcessError as e: 204 self.log_print("ERROR: failed to configure NIC port using ethtool!") 205 self.log_print("%s resulted in error: %s" % (e.cmd, e.output)) 206 self.log_print("Please update your NIC driver and firmware versions and try again.") 207 self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic])) 208 self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic])) 209 210 def configure_services(self): 211 self.log_print("Configuring active services...") 212 svc_config = configparser.ConfigParser(strict=False) 213 214 # Below list is valid only for RHEL / Fedora systems and might not 215 # contain valid names for other distributions. 216 svc_target_state = { 217 "firewalld": "inactive", 218 "irqbalance": "inactive", 219 "lldpad.service": "inactive", 220 "lldpad.socket": "inactive" 221 } 222 223 for service in svc_target_state: 224 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 225 out = "\n".join(["[%s]" % service, out]) 226 svc_config.read_string(out) 227 228 if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]: 229 continue 230 231 service_state = svc_config[service]["ActiveState"] 232 self.log_print("Current state of %s service is %s" % (service, service_state)) 233 self.svc_restore_dict.update({service: service_state}) 234 if service_state != "inactive": 235 self.log_print("Disabling %s. It will be restored after the test has finished." % service) 236 self.exec_cmd(["sudo", "systemctl", "stop", service]) 237 238 def configure_sysctl(self): 239 self.log_print("Tuning sysctl settings...") 240 241 busy_read = 0 242 if self.enable_adq and self.mode == "spdk": 243 busy_read = 1 244 245 sysctl_opts = { 246 "net.core.busy_poll": 0, 247 "net.core.busy_read": busy_read, 248 "net.core.somaxconn": 4096, 249 "net.core.netdev_max_backlog": 8192, 250 "net.ipv4.tcp_max_syn_backlog": 16384, 251 "net.core.rmem_max": 268435456, 252 "net.core.wmem_max": 268435456, 253 "net.ipv4.tcp_mem": "268435456 268435456 268435456", 254 "net.ipv4.tcp_rmem": "8192 1048576 33554432", 255 "net.ipv4.tcp_wmem": "8192 1048576 33554432", 256 "net.ipv4.route.flush": 1, 257 "vm.overcommit_memory": 1, 258 } 259 260 for opt, value in sysctl_opts.items(): 261 self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()}) 262 self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 263 264 def configure_tuned(self): 265 if not self.tuned_profile: 266 self.log_print("WARNING: Tuned profile not set in configuration file. Skipping configuration.") 267 return 268 269 self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile) 270 service = "tuned" 271 tuned_config = configparser.ConfigParser(strict=False) 272 273 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 274 out = "\n".join(["[%s]" % service, out]) 275 tuned_config.read_string(out) 276 tuned_state = tuned_config[service]["ActiveState"] 277 self.svc_restore_dict.update({service: tuned_state}) 278 279 if tuned_state != "inactive": 280 profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip() 281 profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip() 282 283 self.tuned_restore_dict = { 284 "profile": profile, 285 "mode": profile_mode 286 } 287 288 self.exec_cmd(["sudo", "systemctl", "start", service]) 289 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile]) 290 self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"])) 291 292 def configure_cpu_governor(self): 293 self.log_print("Setting CPU governor to performance...") 294 295 # This assumes that there is the same CPU scaling governor on each CPU 296 self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip() 297 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"]) 298 299 def configure_irq_affinity(self): 300 self.log_print("Setting NIC irq affinity for NICs...") 301 302 irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh") 303 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 304 for nic in nic_names: 305 irq_cmd = ["sudo", irq_script_path, nic] 306 self.log_print(irq_cmd) 307 self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir) 308 309 def restore_services(self): 310 self.log_print("Restoring services...") 311 for service, state in self.svc_restore_dict.items(): 312 cmd = "stop" if state == "inactive" else "start" 313 self.exec_cmd(["sudo", "systemctl", cmd, service]) 314 315 def restore_sysctl(self): 316 self.log_print("Restoring sysctl settings...") 317 for opt, value in self.sysctl_restore_dict.items(): 318 self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 319 320 def restore_tuned(self): 321 self.log_print("Restoring tuned-adm settings...") 322 323 if not self.tuned_restore_dict: 324 return 325 326 if self.tuned_restore_dict["mode"] == "auto": 327 self.exec_cmd(["sudo", "tuned-adm", "auto_profile"]) 328 self.log_print("Reverted tuned-adm to auto_profile.") 329 else: 330 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]]) 331 self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"]) 332 333 def restore_governor(self): 334 self.log_print("Restoring CPU governor setting...") 335 if self.governor_restore: 336 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore]) 337 self.log_print("Reverted CPU governor to %s." % self.governor_restore) 338 339 340class Target(Server): 341 def __init__(self, name, general_config, target_config): 342 super().__init__(name, general_config, target_config) 343 344 # Defaults 345 self.enable_sar = False 346 self.sar_delay = 0 347 self.sar_interval = 0 348 self.sar_count = 0 349 self.enable_pcm = False 350 self.pcm_dir = "" 351 self.pcm_delay = 0 352 self.pcm_interval = 0 353 self.pcm_count = 0 354 self.enable_bandwidth = 0 355 self.bandwidth_count = 0 356 self.enable_dpdk_memory = False 357 self.dpdk_wait_time = 0 358 self.enable_zcopy = False 359 self.scheduler_name = "static" 360 self.null_block = 0 361 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 362 self.subsystem_info_list = [] 363 self.initiator_info = [] 364 365 if "null_block_devices" in target_config: 366 self.null_block = target_config["null_block_devices"] 367 if "sar_settings" in target_config: 368 self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"] 369 if "pcm_settings" in target_config: 370 self.enable_pcm = True 371 self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"] 372 if "enable_bandwidth" in target_config: 373 self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"] 374 if "enable_dpdk_memory" in target_config: 375 self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"] 376 if "scheduler_settings" in target_config: 377 self.scheduler_name = target_config["scheduler_settings"] 378 if "zcopy_settings" in target_config: 379 self.enable_zcopy = target_config["zcopy_settings"] 380 if "results_dir" in target_config: 381 self.results_dir = target_config["results_dir"] 382 383 self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) 384 self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../")) 385 self.set_local_nic_info(self.set_local_nic_info_helper()) 386 387 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 388 self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip") 389 390 self.configure_system() 391 if self.enable_adq: 392 self.configure_adq() 393 self.sys_config() 394 395 def set_local_nic_info_helper(self): 396 return json.loads(self.exec_cmd(["lshw", "-json"])) 397 398 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 399 stderr_opt = None 400 if stderr_redirect: 401 stderr_opt = subprocess.STDOUT 402 if change_dir: 403 old_cwd = os.getcwd() 404 os.chdir(change_dir) 405 self.log_print("Changing directory to %s" % change_dir) 406 407 out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8") 408 409 if change_dir: 410 os.chdir(old_cwd) 411 self.log_print("Changing directory to %s" % old_cwd) 412 return out 413 414 def zip_spdk_sources(self, spdk_dir, dest_file): 415 self.log_print("Zipping SPDK source directory") 416 fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) 417 for root, _directories, files in os.walk(spdk_dir, followlinks=True): 418 for file in files: 419 fh.write(os.path.relpath(os.path.join(root, file))) 420 fh.close() 421 self.log_print("Done zipping") 422 423 @staticmethod 424 def _chunks(input_list, chunks_no): 425 div, rem = divmod(len(input_list), chunks_no) 426 for i in range(chunks_no): 427 si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem) 428 yield input_list[si:si + (div + 1 if i < rem else div)] 429 430 def spread_bdevs(self, req_disks): 431 # Spread available block devices indexes: 432 # - evenly across available initiator systems 433 # - evenly across available NIC interfaces for 434 # each initiator 435 # Not NUMA aware. 436 ip_bdev_map = [] 437 initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info)) 438 439 for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)): 440 self.initiator_info[i]["bdev_range"] = init_chunk 441 init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"]))) 442 for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list): 443 for c in nic_chunk: 444 ip_bdev_map.append((ip, c)) 445 return ip_bdev_map 446 447 @staticmethod 448 def read_json_stats(file): 449 with open(file, "r") as json_data: 450 data = json.load(json_data) 451 job_pos = 0 # job_post = 0 because using aggregated results 452 453 # Check if latency is in nano or microseconds to choose correct dict key 454 def get_lat_unit(key_prefix, dict_section): 455 # key prefix - lat, clat or slat. 456 # dict section - portion of json containing latency bucket in question 457 # Return dict key to access the bucket and unit as string 458 for k, _ in dict_section.items(): 459 if k.startswith(key_prefix): 460 return k, k.split("_")[1] 461 462 def get_clat_percentiles(clat_dict_leaf): 463 if "percentile" in clat_dict_leaf: 464 p99_lat = float(clat_dict_leaf["percentile"]["99.000000"]) 465 p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"]) 466 p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"]) 467 p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"]) 468 469 return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat] 470 else: 471 # Latest fio versions do not provide "percentile" results if no 472 # measurements were done, so just return zeroes 473 return [0, 0, 0, 0] 474 475 read_iops = float(data["jobs"][job_pos]["read"]["iops"]) 476 read_bw = float(data["jobs"][job_pos]["read"]["bw"]) 477 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"]) 478 read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"]) 479 read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"]) 480 read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"]) 481 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"]) 482 read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles( 483 data["jobs"][job_pos]["read"][clat_key]) 484 485 if "ns" in lat_unit: 486 read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]] 487 if "ns" in clat_unit: 488 read_p99_lat = read_p99_lat / 1000 489 read_p99_9_lat = read_p99_9_lat / 1000 490 read_p99_99_lat = read_p99_99_lat / 1000 491 read_p99_999_lat = read_p99_999_lat / 1000 492 493 write_iops = float(data["jobs"][job_pos]["write"]["iops"]) 494 write_bw = float(data["jobs"][job_pos]["write"]["bw"]) 495 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"]) 496 write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"]) 497 write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"]) 498 write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"]) 499 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"]) 500 write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles( 501 data["jobs"][job_pos]["write"][clat_key]) 502 503 if "ns" in lat_unit: 504 write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]] 505 if "ns" in clat_unit: 506 write_p99_lat = write_p99_lat / 1000 507 write_p99_9_lat = write_p99_9_lat / 1000 508 write_p99_99_lat = write_p99_99_lat / 1000 509 write_p99_999_lat = write_p99_999_lat / 1000 510 511 return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat, 512 read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat, 513 write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat, 514 write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat] 515 516 def parse_results(self, results_dir, csv_file): 517 files = os.listdir(results_dir) 518 fio_files = filter(lambda x: ".fio" in x, files) 519 json_files = [x for x in files if ".json" in x] 520 521 headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us", 522 "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us", 523 "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us", 524 "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"] 525 526 aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us", 527 "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"] 528 529 header_line = ",".join(["Name", *headers]) 530 aggr_header_line = ",".join(["Name", *aggr_headers]) 531 532 # Create empty results file 533 with open(os.path.join(results_dir, csv_file), "w") as fh: 534 fh.write(aggr_header_line + "\n") 535 rows = set() 536 537 for fio_config in fio_files: 538 self.log_print("Getting FIO stats for %s" % fio_config) 539 job_name, _ = os.path.splitext(fio_config) 540 541 # Look in the filename for rwmixread value. Function arguments do 542 # not have that information. 543 # TODO: Improve this function by directly using workload params instead 544 # of regexing through filenames. 545 if "read" in job_name: 546 rw_mixread = 1 547 elif "write" in job_name: 548 rw_mixread = 0 549 else: 550 rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100 551 552 # If "_CPU" exists in name - ignore it 553 # Initiators for the same job could have different num_cores parameter 554 job_name = re.sub(r"_\d+CPU", "", job_name) 555 job_result_files = [x for x in json_files if x.startswith(job_name)] 556 self.log_print("Matching result files for current fio config:") 557 for j in job_result_files: 558 self.log_print("\t %s" % j) 559 560 # There may have been more than 1 initiator used in test, need to check that 561 # Result files are created so that string after last "_" separator is server name 562 inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files]) 563 inits_avg_results = [] 564 for i in inits_names: 565 self.log_print("\tGetting stats for initiator %s" % i) 566 # There may have been more than 1 test run for this job, calculate average results for initiator 567 i_results = [x for x in job_result_files if i in x] 568 i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv")) 569 570 separate_stats = [] 571 for r in i_results: 572 try: 573 stats = self.read_json_stats(os.path.join(results_dir, r)) 574 separate_stats.append(stats) 575 self.log_print(stats) 576 except JSONDecodeError: 577 self.log_print("ERROR: Failed to parse %s results! Results might be incomplete!" % r) 578 579 init_results = [sum(x) for x in zip(*separate_stats)] 580 init_results = [x / len(separate_stats) for x in init_results] 581 inits_avg_results.append(init_results) 582 583 self.log_print("\tAverage results for initiator %s" % i) 584 self.log_print(init_results) 585 with open(os.path.join(results_dir, i_results_filename), "w") as fh: 586 fh.write(header_line + "\n") 587 fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n") 588 589 # Sum results of all initiators running this FIO job. 590 # Latency results are an average of latencies from accros all initiators. 591 inits_avg_results = [sum(x) for x in zip(*inits_avg_results)] 592 inits_avg_results = OrderedDict(zip(headers, inits_avg_results)) 593 for key in inits_avg_results: 594 if "lat" in key: 595 inits_avg_results[key] /= len(inits_names) 596 597 # Aggregate separate read/write values into common labels 598 # Take rw_mixread into consideration for mixed read/write workloads. 599 aggregate_results = OrderedDict() 600 for h in aggr_headers: 601 read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key] 602 if "lat" in h: 603 _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat 604 else: 605 _ = read_stat + write_stat 606 aggregate_results[h] = "{0:.3f}".format(_) 607 608 rows.add(",".join([job_name, *aggregate_results.values()])) 609 610 # Save results to file 611 for row in rows: 612 with open(os.path.join(results_dir, csv_file), "a") as fh: 613 fh.write(row + "\n") 614 self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file)) 615 616 def measure_sar(self, results_dir, sar_file_prefix): 617 cpu_number = os.cpu_count() 618 sar_idle_sum = 0 619 sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt") 620 sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"])) 621 622 self.log_print("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % self.sar_delay) 623 time.sleep(self.sar_delay) 624 self.log_print("Starting SAR measurements") 625 626 out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count]) 627 with open(os.path.join(results_dir, sar_output_file), "w") as fh: 628 for line in out.split("\n"): 629 if "Average" in line: 630 if "CPU" in line: 631 self.log_print("Summary CPU utilization from SAR:") 632 self.log_print(line) 633 elif "all" in line: 634 self.log_print(line) 635 else: 636 sar_idle_sum += float(line.split()[7]) 637 fh.write(out) 638 sar_cpu_usage = cpu_number * 100 - sar_idle_sum 639 640 with open(os.path.join(results_dir, sar_cpu_util_file), "w") as f: 641 f.write("%0.2f" % sar_cpu_usage) 642 643 def ethtool_after_fio_ramp(self, fio_ramp_time): 644 time.sleep(fio_ramp_time//2) 645 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 646 for nic in nic_names: 647 self.log_print(nic) 648 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 649 "channel-pkt-inspect-optimize", "off"]) # Disable channel packet inspection optimization 650 651 def measure_pcm_memory(self, results_dir, pcm_file_name): 652 time.sleep(self.pcm_delay) 653 cmd = ["%s/pcm-memory.x" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)] 654 pcm_memory = subprocess.Popen(cmd) 655 time.sleep(self.pcm_count) 656 pcm_memory.terminate() 657 658 def measure_pcm(self, results_dir, pcm_file_name): 659 time.sleep(self.pcm_delay) 660 cmd = ["%s/pcm.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, "-csv=%s/%s" % (results_dir, pcm_file_name)] 661 subprocess.run(cmd) 662 df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1]) 663 df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x)) 664 skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})] 665 skt_pcm_file_name = "_".join(["skt", pcm_file_name]) 666 skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False) 667 668 def measure_pcm_power(self, results_dir, pcm_power_file_name): 669 time.sleep(self.pcm_delay) 670 out = self.exec_cmd(["%s/pcm-power.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count]) 671 with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh: 672 fh.write(out) 673 674 def measure_network_bandwidth(self, results_dir, bandwidth_file_name): 675 self.log_print("INFO: starting network bandwidth measure") 676 self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name), 677 "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)]) 678 679 def measure_dpdk_memory(self, results_dir): 680 self.log_print("INFO: waiting to generate DPDK memory usage") 681 time.sleep(self.dpdk_wait_time) 682 self.log_print("INFO: generating DPDK memory usage") 683 rpc.env.env_dpdk_get_mem_stats 684 os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir)) 685 686 def sys_config(self): 687 self.log_print("====Kernel release:====") 688 self.log_print(os.uname().release) 689 self.log_print("====Kernel command line:====") 690 with open('/proc/cmdline') as f: 691 cmdline = f.readlines() 692 self.log_print('\n'.join(self.get_uncommented_lines(cmdline))) 693 self.log_print("====sysctl conf:====") 694 with open('/etc/sysctl.conf') as f: 695 sysctl = f.readlines() 696 self.log_print('\n'.join(self.get_uncommented_lines(sysctl))) 697 self.log_print("====Cpu power info:====") 698 self.log_print(self.exec_cmd(["cpupower", "frequency-info"])) 699 self.log_print("====zcopy settings:====") 700 self.log_print("zcopy enabled: %s" % (self.enable_zcopy)) 701 self.log_print("====Scheduler settings:====") 702 self.log_print("SPDK scheduler: %s" % (self.scheduler_name)) 703 704 705class Initiator(Server): 706 def __init__(self, name, general_config, initiator_config): 707 super().__init__(name, general_config, initiator_config) 708 709 # Required fields 710 self.ip = initiator_config["ip"] 711 self.target_nic_ips = initiator_config["target_nic_ips"] 712 713 # Defaults 714 self.cpus_allowed = None 715 self.cpus_allowed_policy = "shared" 716 self.spdk_dir = "/tmp/spdk" 717 self.fio_bin = "/usr/src/fio/fio" 718 self.nvmecli_bin = "nvme" 719 self.cpu_frequency = None 720 self.subsystem_info_list = [] 721 722 if "spdk_dir" in initiator_config: 723 self.spdk_dir = initiator_config["spdk_dir"] 724 if "fio_bin" in initiator_config: 725 self.fio_bin = initiator_config["fio_bin"] 726 if "nvmecli_bin" in initiator_config: 727 self.nvmecli_bin = initiator_config["nvmecli_bin"] 728 if "cpus_allowed" in initiator_config: 729 self.cpus_allowed = initiator_config["cpus_allowed"] 730 if "cpus_allowed_policy" in initiator_config: 731 self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"] 732 if "cpu_frequency" in initiator_config: 733 self.cpu_frequency = initiator_config["cpu_frequency"] 734 if os.getenv('SPDK_WORKSPACE'): 735 self.spdk_dir = os.getenv('SPDK_WORKSPACE') 736 737 self.ssh_connection = paramiko.SSHClient() 738 self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 739 self.ssh_connection.connect(self.ip, username=self.username, password=self.password) 740 self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir]) 741 self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir]) 742 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 743 744 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 745 self.copy_spdk("/tmp/spdk.zip") 746 self.set_local_nic_info(self.set_local_nic_info_helper()) 747 self.set_cpu_frequency() 748 self.configure_system() 749 if self.enable_adq: 750 self.configure_adq() 751 self.sys_config() 752 753 def set_local_nic_info_helper(self): 754 return json.loads(self.exec_cmd(["lshw", "-json"])) 755 756 def stop(self): 757 self.ssh_connection.close() 758 759 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 760 if change_dir: 761 cmd = ["cd", change_dir, ";", *cmd] 762 763 # In case one of the command elements contains whitespace and is not 764 # already quoted, # (e.g. when calling sysctl) quote it again to prevent expansion 765 # when sending to remote system. 766 for i, c in enumerate(cmd): 767 if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")): 768 cmd[i] = '"%s"' % c 769 cmd = " ".join(cmd) 770 771 # Redirect stderr to stdout thanks using get_pty option if needed 772 _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect) 773 out = stdout.read().decode(encoding="utf-8") 774 775 # Check the return code 776 rc = stdout.channel.recv_exit_status() 777 if rc: 778 raise CalledProcessError(int(rc), cmd, out) 779 780 return out 781 782 def put_file(self, local, remote_dest): 783 ftp = self.ssh_connection.open_sftp() 784 ftp.put(local, remote_dest) 785 ftp.close() 786 787 def get_file(self, remote, local_dest): 788 ftp = self.ssh_connection.open_sftp() 789 ftp.get(remote, local_dest) 790 ftp.close() 791 792 def copy_spdk(self, local_spdk_zip): 793 self.log_print("Copying SPDK sources to initiator %s" % self.name) 794 self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip") 795 self.log_print("Copied sources zip from target") 796 self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir]) 797 self.log_print("Sources unpacked") 798 799 def copy_result_files(self, dest_dir): 800 self.log_print("Copying results") 801 802 if not os.path.exists(dest_dir): 803 os.mkdir(dest_dir) 804 805 # Get list of result files from initiator and copy them back to target 806 file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n") 807 808 for file in file_list: 809 self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file), 810 os.path.join(dest_dir, file)) 811 self.log_print("Done copying results") 812 813 def discover_subsystems(self, address_list, subsys_no): 814 num_nvmes = range(0, subsys_no) 815 nvme_discover_output = "" 816 for ip, subsys_no in itertools.product(address_list, num_nvmes): 817 self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no)) 818 nvme_discover_cmd = ["sudo", 819 "%s" % self.nvmecli_bin, 820 "discover", "-t", "%s" % self.transport, 821 "-s", "%s" % (4420 + subsys_no), 822 "-a", "%s" % ip] 823 824 try: 825 stdout = self.exec_cmd(nvme_discover_cmd) 826 if stdout: 827 nvme_discover_output = nvme_discover_output + stdout 828 except CalledProcessError: 829 # Do nothing. In case of discovering remote subsystems of kernel target 830 # we expect "nvme discover" to fail a bunch of times because we basically 831 # scan ports. 832 pass 833 834 subsystems = re.findall(r'trsvcid:\s(\d+)\s+' # get svcid number 835 r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+' # get NQN id 836 r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', # get IP address 837 nvme_discover_output) # from nvme discovery output 838 subsystems = filter(lambda x: x[-1] in address_list, subsystems) 839 subsystems = filter(lambda x: "discovery" not in x[1], subsystems) 840 subsystems = list(set(subsystems)) 841 subsystems.sort(key=lambda x: x[1]) 842 self.log_print("Found matching subsystems on target side:") 843 for s in subsystems: 844 self.log_print(s) 845 self.subsystem_info_list = subsystems 846 847 def gen_fio_filename_conf(self, *args, **kwargs): 848 # Logic implemented in SPDKInitiator and KernelInitiator classes 849 pass 850 851 def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10, rate_iops=0): 852 fio_conf_template = """ 853[global] 854ioengine={ioengine} 855{spdk_conf} 856thread=1 857group_reporting=1 858direct=1 859percentile_list=50:90:99:99.5:99.9:99.99:99.999 860 861norandommap=1 862rw={rw} 863rwmixread={rwmixread} 864bs={block_size} 865time_based=1 866ramp_time={ramp_time} 867runtime={run_time} 868rate_iops={rate_iops} 869""" 870 if "spdk" in self.mode: 871 bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list) 872 self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir]) 873 ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir 874 spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir 875 else: 876 ioengine = self.ioengine 877 spdk_conf = "" 878 out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'", 879 "|", "awk", "'{print $1}'"]) 880 subsystems = [x for x in out.split("\n") if "nvme" in x] 881 882 if self.cpus_allowed is not None: 883 self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed) 884 cpus_num = 0 885 cpus = self.cpus_allowed.split(",") 886 for cpu in cpus: 887 if "-" in cpu: 888 a, b = cpu.split("-") 889 a = int(a) 890 b = int(b) 891 cpus_num += len(range(a, b)) 892 else: 893 cpus_num += 1 894 self.num_cores = cpus_num 895 threads = range(0, self.num_cores) 896 elif hasattr(self, 'num_cores'): 897 self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores) 898 threads = range(0, int(self.num_cores)) 899 else: 900 self.num_cores = len(subsystems) 901 threads = range(0, len(subsystems)) 902 903 if "spdk" in self.mode: 904 filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs) 905 else: 906 filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs) 907 908 fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf, 909 rw=rw, rwmixread=rwmixread, block_size=block_size, 910 ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops) 911 912 # TODO: hipri disabled for now, as it causes fio errors: 913 # io_u error on file /dev/nvme2n1: Operation not supported 914 # See comment in KernelInitiator class, kernel_init_connect() function 915 if hasattr(self, "ioengine") and "io_uring" in self.ioengine: 916 fio_config = fio_config + """ 917fixedbufs=1 918registerfiles=1 919#hipri=1 920""" 921 if num_jobs: 922 fio_config = fio_config + "numjobs=%s \n" % num_jobs 923 if self.cpus_allowed is not None: 924 fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed 925 fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy 926 fio_config = fio_config + filename_section 927 928 fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread) 929 if hasattr(self, "num_cores"): 930 fio_config_filename += "_%sCPU" % self.num_cores 931 fio_config_filename += ".fio" 932 933 self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir]) 934 self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)]) 935 self.log_print("Created FIO Config:") 936 self.log_print(fio_config) 937 938 return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename) 939 940 def set_cpu_frequency(self): 941 if self.cpu_frequency is not None: 942 try: 943 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True) 944 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True) 945 self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"])) 946 except Exception: 947 self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!") 948 sys.exit() 949 else: 950 self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.") 951 952 def run_fio(self, fio_config_file, run_num=None): 953 job_name, _ = os.path.splitext(fio_config_file) 954 self.log_print("Starting FIO run for job: %s" % job_name) 955 self.log_print("Using FIO: %s" % self.fio_bin) 956 957 if run_num: 958 for i in range(1, run_num + 1): 959 output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json" 960 try: 961 output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json", 962 "--output=%s" % output_filename, "--eta=never"], True) 963 self.log_print(output) 964 except subprocess.CalledProcessError as e: 965 self.log_print("ERROR: Fio process failed!") 966 self.log_print(e.stdout) 967 else: 968 output_filename = job_name + "_" + self.name + ".json" 969 output = self.exec_cmd(["sudo", self.fio_bin, 970 fio_config_file, "--output-format=json", 971 "--output" % output_filename], True) 972 self.log_print(output) 973 self.log_print("FIO run finished. Results in: %s" % output_filename) 974 975 def sys_config(self): 976 self.log_print("====Kernel release:====") 977 self.log_print(self.exec_cmd(["uname", "-r"])) 978 self.log_print("====Kernel command line:====") 979 cmdline = self.exec_cmd(["cat", "/proc/cmdline"]) 980 self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines()))) 981 self.log_print("====sysctl conf:====") 982 sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"]) 983 self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines()))) 984 self.log_print("====Cpu power info:====") 985 self.log_print(self.exec_cmd(["cpupower", "frequency-info"])) 986 987 988class KernelTarget(Target): 989 def __init__(self, name, general_config, target_config): 990 super().__init__(name, general_config, target_config) 991 # Defaults 992 self.nvmet_bin = "nvmetcli" 993 994 if "nvmet_bin" in target_config: 995 self.nvmet_bin = target_config["nvmet_bin"] 996 997 def stop(self): 998 nvmet_command(self.nvmet_bin, "clear") 999 1000 def kernel_tgt_gen_subsystem_conf(self, nvme_list): 1001 1002 nvmet_cfg = { 1003 "ports": [], 1004 "hosts": [], 1005 "subsystems": [], 1006 } 1007 1008 for ip, bdev_num in self.spread_bdevs(len(nvme_list)): 1009 port = str(4420 + bdev_num) 1010 nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num 1011 serial = "SPDK00%s" % bdev_num 1012 bdev_name = nvme_list[bdev_num] 1013 1014 nvmet_cfg["subsystems"].append({ 1015 "allowed_hosts": [], 1016 "attr": { 1017 "allow_any_host": "1", 1018 "serial": serial, 1019 "version": "1.3" 1020 }, 1021 "namespaces": [ 1022 { 1023 "device": { 1024 "path": bdev_name, 1025 "uuid": "%s" % uuid.uuid4() 1026 }, 1027 "enable": 1, 1028 "nsid": port 1029 } 1030 ], 1031 "nqn": nqn 1032 }) 1033 1034 nvmet_cfg["ports"].append({ 1035 "addr": { 1036 "adrfam": "ipv4", 1037 "traddr": ip, 1038 "trsvcid": port, 1039 "trtype": self.transport 1040 }, 1041 "portid": bdev_num, 1042 "referrals": [], 1043 "subsystems": [nqn] 1044 }) 1045 1046 self.subsystem_info_list.append([port, nqn, ip]) 1047 self.subsys_no = len(self.subsystem_info_list) 1048 1049 with open("kernel.conf", "w") as fh: 1050 fh.write(json.dumps(nvmet_cfg, indent=2)) 1051 1052 def tgt_start(self): 1053 self.log_print("Configuring kernel NVMeOF Target") 1054 1055 if self.null_block: 1056 print("Configuring with null block device.") 1057 nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)] 1058 else: 1059 print("Configuring with NVMe drives.") 1060 nvme_list = get_nvme_devices() 1061 1062 self.kernel_tgt_gen_subsystem_conf(nvme_list) 1063 self.subsys_no = len(nvme_list) 1064 1065 nvmet_command(self.nvmet_bin, "clear") 1066 nvmet_command(self.nvmet_bin, "restore kernel.conf") 1067 1068 if self.enable_adq: 1069 self.adq_configure_tc() 1070 1071 self.log_print("Done configuring kernel NVMeOF Target") 1072 1073 1074class SPDKTarget(Target): 1075 def __init__(self, name, general_config, target_config): 1076 super().__init__(name, general_config, target_config) 1077 1078 # Required fields 1079 self.core_mask = target_config["core_mask"] 1080 self.num_cores = self.get_num_cores(self.core_mask) 1081 1082 # Defaults 1083 self.dif_insert_strip = False 1084 self.null_block_dif_type = 0 1085 self.num_shared_buffers = 4096 1086 self.max_queue_depth = 128 1087 self.bpf_proc = None 1088 self.bpf_scripts = [] 1089 self.enable_dsa = False 1090 1091 if "num_shared_buffers" in target_config: 1092 self.num_shared_buffers = target_config["num_shared_buffers"] 1093 if "max_queue_depth" in target_config: 1094 self.max_queue_depth = target_config["max_queue_depth"] 1095 if "null_block_dif_type" in target_config: 1096 self.null_block_dif_type = target_config["null_block_dif_type"] 1097 if "dif_insert_strip" in target_config: 1098 self.dif_insert_strip = target_config["dif_insert_strip"] 1099 if "bpf_scripts" in target_config: 1100 self.bpf_scripts = target_config["bpf_scripts"] 1101 if "dsa_settings" in target_config: 1102 self.enable_dsa = target_config["dsa_settings"] 1103 1104 self.log_print("====DSA settings:====") 1105 self.log_print("DSA enabled: %s" % (self.enable_dsa)) 1106 1107 @staticmethod 1108 def get_num_cores(core_mask): 1109 if "0x" in core_mask: 1110 return bin(int(core_mask, 16)).count("1") 1111 else: 1112 num_cores = 0 1113 core_mask = core_mask.replace("[", "") 1114 core_mask = core_mask.replace("]", "") 1115 for i in core_mask.split(","): 1116 if "-" in i: 1117 x, y = i.split("-") 1118 num_cores += len(range(int(x), int(y))) + 1 1119 else: 1120 num_cores += 1 1121 return num_cores 1122 1123 def spdk_tgt_configure(self): 1124 self.log_print("Configuring SPDK NVMeOF target via RPC") 1125 1126 if self.enable_adq: 1127 self.adq_configure_tc() 1128 1129 # Create transport layer 1130 rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, 1131 num_shared_buffers=self.num_shared_buffers, 1132 max_queue_depth=self.max_queue_depth, 1133 dif_insert_or_strip=self.dif_insert_strip, 1134 sock_priority=self.adq_priority) 1135 self.log_print("SPDK NVMeOF transport layer:") 1136 rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client)) 1137 1138 if self.null_block: 1139 self.spdk_tgt_add_nullblock(self.null_block) 1140 self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block) 1141 else: 1142 self.spdk_tgt_add_nvme_conf() 1143 self.spdk_tgt_add_subsystem_conf(self.nic_ips) 1144 1145 self.log_print("Done configuring SPDK NVMeOF Target") 1146 1147 def spdk_tgt_add_nullblock(self, null_block_count): 1148 md_size = 0 1149 block_size = 4096 1150 if self.null_block_dif_type != 0: 1151 md_size = 128 1152 1153 self.log_print("Adding null block bdevices to config via RPC") 1154 for i in range(null_block_count): 1155 self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type) 1156 rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i), 1157 dif_type=self.null_block_dif_type, md_size=md_size) 1158 self.log_print("SPDK Bdevs configuration:") 1159 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1160 1161 def spdk_tgt_add_nvme_conf(self, req_num_disks=None): 1162 self.log_print("Adding NVMe bdevs to config via RPC") 1163 1164 bdfs = get_nvme_devices_bdf() 1165 bdfs = [b.replace(":", ".") for b in bdfs] 1166 1167 if req_num_disks: 1168 if req_num_disks > len(bdfs): 1169 self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs)) 1170 sys.exit(1) 1171 else: 1172 bdfs = bdfs[0:req_num_disks] 1173 1174 for i, bdf in enumerate(bdfs): 1175 rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 1176 1177 self.log_print("SPDK Bdevs configuration:") 1178 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1179 1180 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 1181 self.log_print("Adding subsystems to config") 1182 if not req_num_disks: 1183 req_num_disks = get_nvme_devices_count() 1184 1185 for ip, bdev_num in self.spread_bdevs(req_num_disks): 1186 port = str(4420 + bdev_num) 1187 nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num 1188 serial = "SPDK00%s" % bdev_num 1189 bdev_name = "Nvme%sn1" % bdev_num 1190 1191 rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial, 1192 allow_any_host=True, max_namespaces=8) 1193 rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name) 1194 for nqn_name in [nqn, "discovery"]: 1195 rpc.nvmf.nvmf_subsystem_add_listener(self.client, 1196 nqn=nqn_name, 1197 trtype=self.transport, 1198 traddr=ip, 1199 trsvcid=port, 1200 adrfam="ipv4") 1201 self.subsystem_info_list.append([port, nqn, ip]) 1202 self.subsys_no = len(self.subsystem_info_list) 1203 1204 self.log_print("SPDK NVMeOF subsystem configuration:") 1205 rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client)) 1206 1207 def bpf_start(self): 1208 self.log_print("Starting BPF Trace scripts: %s" % self.bpf_scripts) 1209 bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh") 1210 bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts] 1211 results_path = os.path.join(self.results_dir, "bpf_traces.txt") 1212 1213 with open(self.pid, "r") as fh: 1214 nvmf_pid = str(fh.readline()) 1215 1216 cmd = [bpf_script, nvmf_pid, *bpf_traces] 1217 self.log_print(cmd) 1218 self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path}) 1219 1220 def tgt_start(self): 1221 if self.null_block: 1222 self.subsys_no = 1 1223 else: 1224 self.subsys_no = get_nvme_devices_count() 1225 self.log_print("Starting SPDK NVMeOF Target process") 1226 nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt") 1227 proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask]) 1228 self.pid = os.path.join(self.spdk_dir, "nvmf.pid") 1229 1230 with open(self.pid, "w") as fh: 1231 fh.write(str(proc.pid)) 1232 self.nvmf_proc = proc 1233 self.log_print("SPDK NVMeOF Target PID=%s" % self.pid) 1234 self.log_print("Waiting for spdk to initialize...") 1235 while True: 1236 if os.path.exists("/var/tmp/spdk.sock"): 1237 break 1238 time.sleep(1) 1239 self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock") 1240 1241 if self.enable_zcopy: 1242 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", 1243 enable_zerocopy_send_server=True) 1244 self.log_print("Target socket options:") 1245 rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix")) 1246 1247 if self.enable_adq: 1248 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1) 1249 rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None, 1250 nvme_adminq_poll_period_us=100000, retry_count=4) 1251 1252 if self.enable_dsa: 1253 rpc.dsa.dsa_scan_accel_engine(self.client, config_kernel_mode=None) 1254 self.log_print("Target DSA accel engine enabled") 1255 1256 rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name) 1257 rpc.framework_start_init(self.client) 1258 1259 if self.bpf_scripts: 1260 self.bpf_start() 1261 1262 self.spdk_tgt_configure() 1263 1264 def stop(self): 1265 if self.bpf_proc: 1266 self.log_print("Stopping BPF Trace script") 1267 self.bpf_proc.terminate() 1268 self.bpf_proc.wait() 1269 1270 if hasattr(self, "nvmf_proc"): 1271 try: 1272 self.nvmf_proc.terminate() 1273 self.nvmf_proc.wait() 1274 except Exception as e: 1275 self.log_print(e) 1276 self.nvmf_proc.kill() 1277 self.nvmf_proc.communicate() 1278 1279 1280class KernelInitiator(Initiator): 1281 def __init__(self, name, general_config, initiator_config): 1282 super().__init__(name, general_config, initiator_config) 1283 1284 # Defaults 1285 self.extra_params = "" 1286 self.ioengine = "libaio" 1287 1288 if "extra_params" in initiator_config: 1289 self.extra_params = initiator_config["extra_params"] 1290 1291 if "kernel_engine" in initiator_config: 1292 self.ioengine = initiator_config["kernel_engine"] 1293 if "io_uring" in self.ioengine: 1294 self.extra_params = "--nr-poll-queues=8" 1295 1296 def get_connected_nvme_list(self): 1297 json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"])) 1298 nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"] 1299 if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]] 1300 return nvme_list 1301 1302 def kernel_init_connect(self): 1303 self.log_print("Below connection attempts may result in error messages, this is expected!") 1304 for subsystem in self.subsystem_info_list: 1305 self.log_print("Trying to connect %s %s %s" % subsystem) 1306 self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport, 1307 "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params]) 1308 time.sleep(2) 1309 1310 if "io_uring" in self.ioengine: 1311 self.log_print("Setting block layer settings for io_uring.") 1312 1313 # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because 1314 # apparently it's not possible for connected subsystems. 1315 # Results in "error: Invalid argument" 1316 block_sysfs_settings = { 1317 "iostats": "0", 1318 "rq_affinity": "0", 1319 "nomerges": "2" 1320 } 1321 1322 for disk in self.get_connected_nvme_list(): 1323 sysfs = os.path.join("/sys/block", disk, "queue") 1324 for k, v in block_sysfs_settings.items(): 1325 sysfs_opt_path = os.path.join(sysfs, k) 1326 try: 1327 self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True) 1328 except subprocess.CalledProcessError as e: 1329 self.log_print("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v)) 1330 finally: 1331 _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)]) 1332 self.log_print("%s=%s" % (sysfs_opt_path, _)) 1333 1334 def kernel_init_disconnect(self): 1335 for subsystem in self.subsystem_info_list: 1336 self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]]) 1337 time.sleep(1) 1338 1339 def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1): 1340 nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()] 1341 1342 filename_section = "" 1343 nvme_per_split = int(len(nvme_list) / len(threads)) 1344 remainder = len(nvme_list) % len(threads) 1345 iterator = iter(nvme_list) 1346 result = [] 1347 for i in range(len(threads)): 1348 result.append([]) 1349 for _ in range(nvme_per_split): 1350 result[i].append(next(iterator)) 1351 if remainder: 1352 result[i].append(next(iterator)) 1353 remainder -= 1 1354 for i, r in enumerate(result): 1355 header = "[filename%s]" % i 1356 disks = "\n".join(["filename=%s" % x for x in r]) 1357 job_section_qd = round((io_depth * len(r)) / num_jobs) 1358 if job_section_qd == 0: 1359 job_section_qd = 1 1360 iodepth = "iodepth=%s" % job_section_qd 1361 filename_section = "\n".join([filename_section, header, disks, iodepth]) 1362 1363 return filename_section 1364 1365 1366class SPDKInitiator(Initiator): 1367 def __init__(self, name, general_config, initiator_config): 1368 super().__init__(name, general_config, initiator_config) 1369 1370 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 1371 self.install_spdk() 1372 1373 # Required fields 1374 self.num_cores = initiator_config["num_cores"] 1375 1376 # Optional fields 1377 self.enable_data_digest = False 1378 if "enable_data_digest" in initiator_config: 1379 self.enable_data_digest = initiator_config["enable_data_digest"] 1380 1381 def install_spdk(self): 1382 self.log_print("Using fio binary %s" % self.fio_bin) 1383 self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"]) 1384 self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"]) 1385 self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)]) 1386 self.exec_cmd(["make", "-C", self.spdk_dir, "clean"]) 1387 self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"]) 1388 1389 self.log_print("SPDK built") 1390 self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir]) 1391 1392 def gen_spdk_bdev_conf(self, remote_subsystem_list): 1393 bdev_cfg_section = { 1394 "subsystems": [ 1395 { 1396 "subsystem": "bdev", 1397 "config": [] 1398 } 1399 ] 1400 } 1401 1402 for i, subsys in enumerate(remote_subsystem_list): 1403 sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys) 1404 nvme_ctrl = { 1405 "method": "bdev_nvme_attach_controller", 1406 "params": { 1407 "name": "Nvme{}".format(i), 1408 "trtype": self.transport, 1409 "traddr": sub_addr, 1410 "trsvcid": sub_port, 1411 "subnqn": sub_nqn, 1412 "adrfam": "IPv4" 1413 } 1414 } 1415 1416 if self.enable_adq: 1417 nvme_ctrl["params"].update({"priority": "1"}) 1418 1419 if self.enable_data_digest: 1420 nvme_ctrl["params"].update({"ddgst": self.enable_data_digest}) 1421 1422 bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl) 1423 1424 return json.dumps(bdev_cfg_section, indent=2) 1425 1426 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1): 1427 filename_section = "" 1428 if len(threads) >= len(subsystems): 1429 threads = range(0, len(subsystems)) 1430 filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))] 1431 nvme_per_split = int(len(subsystems) / len(threads)) 1432 remainder = len(subsystems) % len(threads) 1433 iterator = iter(filenames) 1434 result = [] 1435 for i in range(len(threads)): 1436 result.append([]) 1437 for _ in range(nvme_per_split): 1438 result[i].append(next(iterator)) 1439 if remainder: 1440 result[i].append(next(iterator)) 1441 remainder -= 1 1442 for i, r in enumerate(result): 1443 header = "[filename%s]" % i 1444 disks = "\n".join(["filename=%s" % x for x in r]) 1445 job_section_qd = round((io_depth * len(r)) / num_jobs) 1446 if job_section_qd == 0: 1447 job_section_qd = 1 1448 iodepth = "iodepth=%s" % job_section_qd 1449 filename_section = "\n".join([filename_section, header, disks, iodepth]) 1450 1451 return filename_section 1452 1453 1454if __name__ == "__main__": 1455 script_full_dir = os.path.dirname(os.path.realpath(__file__)) 1456 default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json")) 1457 1458 parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) 1459 parser.add_argument('-c', '--config', type=str, default=default_config_file_path, 1460 help='Configuration file.') 1461 parser.add_argument('-r', '--results', type=str, default='/tmp/results', 1462 help='Results directory.') 1463 parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv', 1464 help='CSV results filename.') 1465 1466 args = parser.parse_args() 1467 1468 print("Using config file: %s" % args.config) 1469 with open(args.config, "r") as config: 1470 data = json.load(config) 1471 1472 initiators = [] 1473 fio_cases = [] 1474 1475 general_config = data["general"] 1476 target_config = data["target"] 1477 initiator_configs = [data[x] for x in data.keys() if "initiator" in x] 1478 1479 for k, v in data.items(): 1480 if "target" in k: 1481 v.update({"results_dir": args.results}) 1482 if data[k]["mode"] == "spdk": 1483 target_obj = SPDKTarget(k, data["general"], v) 1484 elif data[k]["mode"] == "kernel": 1485 target_obj = KernelTarget(k, data["general"], v) 1486 elif "initiator" in k: 1487 if data[k]["mode"] == "spdk": 1488 init_obj = SPDKInitiator(k, data["general"], v) 1489 elif data[k]["mode"] == "kernel": 1490 init_obj = KernelInitiator(k, data["general"], v) 1491 initiators.append(init_obj) 1492 elif "fio" in k: 1493 fio_workloads = itertools.product(data[k]["bs"], 1494 data[k]["qd"], 1495 data[k]["rw"]) 1496 1497 fio_run_time = data[k]["run_time"] 1498 fio_ramp_time = data[k]["ramp_time"] 1499 fio_rw_mix_read = data[k]["rwmixread"] 1500 fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None 1501 fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None 1502 1503 fio_rate_iops = 0 1504 if "rate_iops" in data[k]: 1505 fio_rate_iops = data[k]["rate_iops"] 1506 else: 1507 continue 1508 1509 try: 1510 os.mkdir(args.results) 1511 except FileExistsError: 1512 pass 1513 1514 for i in initiators: 1515 target_obj.initiator_info.append( 1516 {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips} 1517 ) 1518 1519 # TODO: This try block is definietly too large. Need to break this up into separate 1520 # logical blocks to reduce size. 1521 try: 1522 target_obj.tgt_start() 1523 1524 for i in initiators: 1525 i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no) 1526 if i.enable_adq: 1527 i.adq_configure_tc() 1528 1529 # Poor mans threading 1530 # Run FIO tests 1531 for block_size, io_depth, rw in fio_workloads: 1532 threads = [] 1533 configs = [] 1534 for i in initiators: 1535 if i.mode == "kernel": 1536 i.kernel_init_connect() 1537 1538 cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no, 1539 fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops) 1540 configs.append(cfg) 1541 1542 for i, cfg in zip(initiators, configs): 1543 t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num)) 1544 threads.append(t) 1545 if target_obj.enable_sar: 1546 sar_file_prefix = "%s_%s_%s_sar" % (block_size, rw, io_depth) 1547 t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix)) 1548 threads.append(t) 1549 1550 if target_obj.enable_pcm: 1551 pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]] 1552 1553 pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],)) 1554 pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],)) 1555 pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],)) 1556 1557 threads.append(pcm_cpu_t) 1558 threads.append(pcm_mem_t) 1559 threads.append(pcm_pow_t) 1560 1561 if target_obj.enable_bandwidth: 1562 bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)]) 1563 bandwidth_file_name = ".".join([bandwidth_file_name, "csv"]) 1564 t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,)) 1565 threads.append(t) 1566 1567 if target_obj.enable_dpdk_memory: 1568 t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results)) 1569 threads.append(t) 1570 1571 if target_obj.enable_adq: 1572 ethtool_thread = threading.Thread(target=target_obj.ethtool_after_fio_ramp, args=(fio_ramp_time,)) 1573 threads.append(ethtool_thread) 1574 1575 for t in threads: 1576 t.start() 1577 for t in threads: 1578 t.join() 1579 1580 for i in initiators: 1581 if i.mode == "kernel": 1582 i.kernel_init_disconnect() 1583 i.copy_result_files(args.results) 1584 1585 target_obj.restore_governor() 1586 target_obj.restore_tuned() 1587 target_obj.restore_services() 1588 target_obj.restore_sysctl() 1589 if target_obj.enable_adq: 1590 target_obj.reload_driver("ice") 1591 for i in initiators: 1592 i.restore_governor() 1593 i.restore_tuned() 1594 i.restore_services() 1595 i.restore_sysctl() 1596 if i.enable_adq: 1597 i.reload_driver("ice") 1598 target_obj.parse_results(args.results, args.csv_filename) 1599 finally: 1600 for i in initiators: 1601 try: 1602 i.stop() 1603 except Exception as err: 1604 pass 1605 target_obj.stop() 1606