1#!/usr/bin/env python3 2 3from json.decoder import JSONDecodeError 4import os 5import re 6import sys 7import argparse 8import json 9import zipfile 10import threading 11import subprocess 12import itertools 13import configparser 14import time 15import uuid 16from collections import OrderedDict 17 18import paramiko 19import pandas as pd 20from common import * 21 22sys.path.append(os.path.dirname(__file__) + '/../../../python') 23 24import spdk.rpc as rpc # noqa 25import spdk.rpc.client as rpc_client # noqa 26 27 28class Server: 29 def __init__(self, name, general_config, server_config): 30 self.name = name 31 self.username = general_config["username"] 32 self.password = general_config["password"] 33 self.transport = general_config["transport"].lower() 34 self.nic_ips = server_config["nic_ips"] 35 self.mode = server_config["mode"] 36 37 self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts" 38 if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]: 39 self.irq_scripts_dir = server_config["irq_scripts_dir"] 40 41 self.local_nic_info = [] 42 self._nics_json_obj = {} 43 self.svc_restore_dict = {} 44 self.sysctl_restore_dict = {} 45 self.tuned_restore_dict = {} 46 self.governor_restore = "" 47 self.tuned_profile = "" 48 49 self.enable_adq = False 50 self.adq_priority = None 51 if "adq_enable" in server_config and server_config["adq_enable"]: 52 self.enable_adq = server_config["adq_enable"] 53 self.adq_priority = 1 54 55 if "tuned_profile" in server_config: 56 self.tuned_profile = server_config["tuned_profile"] 57 58 if not re.match("^[A-Za-z0-9]*$", name): 59 self.log_print("Please use a name which contains only letters or numbers") 60 sys.exit(1) 61 62 def log_print(self, msg): 63 print("[%s] %s" % (self.name, msg), flush=True) 64 65 @staticmethod 66 def get_uncommented_lines(lines): 67 return [line for line in lines if line and not line.startswith('#')] 68 69 def get_nic_name_by_ip(self, ip): 70 if not self._nics_json_obj: 71 nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"]) 72 self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj))) 73 for nic in self._nics_json_obj: 74 for addr in nic["addr_info"]: 75 if ip in addr["local"]: 76 return nic["ifname"] 77 78 def set_local_nic_info_helper(self): 79 pass 80 81 def set_local_nic_info(self, pci_info): 82 def extract_network_elements(json_obj): 83 nic_list = [] 84 if isinstance(json_obj, list): 85 for x in json_obj: 86 nic_list.extend(extract_network_elements(x)) 87 elif isinstance(json_obj, dict): 88 if "children" in json_obj: 89 nic_list.extend(extract_network_elements(json_obj["children"])) 90 if "class" in json_obj.keys() and "network" in json_obj["class"]: 91 nic_list.append(json_obj) 92 return nic_list 93 94 self.local_nic_info = extract_network_elements(pci_info) 95 96 # pylint: disable=R0201 97 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 98 return "" 99 100 def configure_system(self): 101 self.configure_services() 102 self.configure_sysctl() 103 self.configure_tuned() 104 self.configure_cpu_governor() 105 self.configure_irq_affinity() 106 107 def configure_adq(self): 108 if self.mode == "kernel": 109 self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 110 return 111 self.adq_load_modules() 112 self.adq_configure_nic() 113 114 def adq_load_modules(self): 115 self.log_print("Modprobing ADQ-related Linux modules...") 116 adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"] 117 for module in adq_module_deps: 118 try: 119 self.exec_cmd(["sudo", "modprobe", module]) 120 self.log_print("%s loaded!" % module) 121 except CalledProcessError as e: 122 self.log_print("ERROR: failed to load module %s" % module) 123 self.log_print("%s resulted in error: %s" % (e.cmd, e.output)) 124 125 def adq_configure_tc(self): 126 self.log_print("Configuring ADQ Traffic classes and filters...") 127 128 if self.mode == "kernel": 129 self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 130 return 131 132 num_queues_tc0 = 2 # 2 is minimum number of queues for TC0 133 num_queues_tc1 = self.num_cores 134 port_param = "dst_port" if isinstance(self, Target) else "src_port" 135 port = "4420" 136 xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs") 137 138 for nic_ip in self.nic_ips: 139 nic_name = self.get_nic_name_by_ip(nic_ip) 140 tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, 141 "root", "mqprio", "num_tc", "2", "map", "0", "1", 142 "queues", "%s@0" % num_queues_tc0, 143 "%s@%s" % (num_queues_tc1, num_queues_tc0), 144 "hw", "1", "mode", "channel"] 145 self.log_print(" ".join(tc_qdisc_map_cmd)) 146 self.exec_cmd(tc_qdisc_map_cmd) 147 148 time.sleep(5) 149 tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"] 150 self.log_print(" ".join(tc_qdisc_ingress_cmd)) 151 self.exec_cmd(tc_qdisc_ingress_cmd) 152 153 tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name, 154 "protocol", "ip", "ingress", "prio", "1", "flower", 155 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port, 156 "skip_sw", "hw_tc", "1"] 157 self.log_print(" ".join(tc_filter_cmd)) 158 self.exec_cmd(tc_filter_cmd) 159 160 # show tc configuration 161 self.log_print("Show tc configuration for %s NIC..." % nic_name) 162 tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name]) 163 tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"]) 164 self.log_print("%s" % tc_disk_out) 165 self.log_print("%s" % tc_filter_out) 166 167 # Ethtool coalesce settings must be applied after configuring traffic classes 168 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"]) 169 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"]) 170 171 self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name) 172 xps_cmd = ["sudo", xps_script_path, nic_name] 173 self.log_print(xps_cmd) 174 self.exec_cmd(xps_cmd) 175 176 def reload_driver(self, driver): 177 try: 178 self.exec_cmd(["sudo", "rmmod", driver]) 179 self.exec_cmd(["sudo", "modprobe", driver]) 180 except CalledProcessError as e: 181 self.log_print("ERROR: failed to reload %s module!" % driver) 182 self.log_print("%s resulted in error: %s" % (e.cmd, e.output)) 183 184 def adq_configure_nic(self): 185 self.log_print("Configuring NIC port settings for ADQ testing...") 186 187 # Reload the driver first, to make sure any previous settings are re-set. 188 self.reload_driver("ice") 189 190 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 191 for nic in nic_names: 192 self.log_print(nic) 193 try: 194 self.exec_cmd(["sudo", "ethtool", "-K", nic, 195 "hw-tc-offload", "on"]) # Enable hardware TC offload 196 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 197 "channel-inline-flow-director", "on"]) # Enable Intel Flow Director 198 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"]) # Disable LLDP 199 # As temporary workaround for ADQ, channel packet inspection optimization is turned on during connection establishment. 200 # Then turned off before fio ramp_up expires in ethtool_after_fio_ramp(). 201 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 202 "channel-pkt-inspect-optimize", "on"]) 203 except CalledProcessError as e: 204 self.log_print("ERROR: failed to configure NIC port using ethtool!") 205 self.log_print("%s resulted in error: %s" % (e.cmd, e.output)) 206 self.log_print("Please update your NIC driver and firmware versions and try again.") 207 self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic])) 208 self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic])) 209 210 def configure_services(self): 211 self.log_print("Configuring active services...") 212 svc_config = configparser.ConfigParser(strict=False) 213 214 # Below list is valid only for RHEL / Fedora systems and might not 215 # contain valid names for other distributions. 216 svc_target_state = { 217 "firewalld": "inactive", 218 "irqbalance": "inactive", 219 "lldpad.service": "inactive", 220 "lldpad.socket": "inactive" 221 } 222 223 for service in svc_target_state: 224 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 225 out = "\n".join(["[%s]" % service, out]) 226 svc_config.read_string(out) 227 228 if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]: 229 continue 230 231 service_state = svc_config[service]["ActiveState"] 232 self.log_print("Current state of %s service is %s" % (service, service_state)) 233 self.svc_restore_dict.update({service: service_state}) 234 if service_state != "inactive": 235 self.log_print("Disabling %s. It will be restored after the test has finished." % service) 236 self.exec_cmd(["sudo", "systemctl", "stop", service]) 237 238 def configure_sysctl(self): 239 self.log_print("Tuning sysctl settings...") 240 241 busy_read = 0 242 if self.enable_adq and self.mode == "spdk": 243 busy_read = 1 244 245 sysctl_opts = { 246 "net.core.busy_poll": 0, 247 "net.core.busy_read": busy_read, 248 "net.core.somaxconn": 4096, 249 "net.core.netdev_max_backlog": 8192, 250 "net.ipv4.tcp_max_syn_backlog": 16384, 251 "net.core.rmem_max": 268435456, 252 "net.core.wmem_max": 268435456, 253 "net.ipv4.tcp_mem": "268435456 268435456 268435456", 254 "net.ipv4.tcp_rmem": "8192 1048576 33554432", 255 "net.ipv4.tcp_wmem": "8192 1048576 33554432", 256 "net.ipv4.route.flush": 1, 257 "vm.overcommit_memory": 1, 258 } 259 260 for opt, value in sysctl_opts.items(): 261 self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()}) 262 self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 263 264 def configure_tuned(self): 265 if not self.tuned_profile: 266 self.log_print("WARNING: Tuned profile not set in configuration file. Skipping configuration.") 267 return 268 269 self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile) 270 service = "tuned" 271 tuned_config = configparser.ConfigParser(strict=False) 272 273 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 274 out = "\n".join(["[%s]" % service, out]) 275 tuned_config.read_string(out) 276 tuned_state = tuned_config[service]["ActiveState"] 277 self.svc_restore_dict.update({service: tuned_state}) 278 279 if tuned_state != "inactive": 280 profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip() 281 profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip() 282 283 self.tuned_restore_dict = { 284 "profile": profile, 285 "mode": profile_mode 286 } 287 288 self.exec_cmd(["sudo", "systemctl", "start", service]) 289 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile]) 290 self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"])) 291 292 def configure_cpu_governor(self): 293 self.log_print("Setting CPU governor to performance...") 294 295 # This assumes that there is the same CPU scaling governor on each CPU 296 self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip() 297 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"]) 298 299 def configure_irq_affinity(self): 300 self.log_print("Setting NIC irq affinity for NICs...") 301 302 irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh") 303 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 304 for nic in nic_names: 305 irq_cmd = ["sudo", irq_script_path, nic] 306 self.log_print(irq_cmd) 307 self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir) 308 309 def restore_services(self): 310 self.log_print("Restoring services...") 311 for service, state in self.svc_restore_dict.items(): 312 cmd = "stop" if state == "inactive" else "start" 313 self.exec_cmd(["sudo", "systemctl", cmd, service]) 314 315 def restore_sysctl(self): 316 self.log_print("Restoring sysctl settings...") 317 for opt, value in self.sysctl_restore_dict.items(): 318 self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 319 320 def restore_tuned(self): 321 self.log_print("Restoring tuned-adm settings...") 322 323 if not self.tuned_restore_dict: 324 return 325 326 if self.tuned_restore_dict["mode"] == "auto": 327 self.exec_cmd(["sudo", "tuned-adm", "auto_profile"]) 328 self.log_print("Reverted tuned-adm to auto_profile.") 329 else: 330 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]]) 331 self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"]) 332 333 def restore_governor(self): 334 self.log_print("Restoring CPU governor setting...") 335 if self.governor_restore: 336 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore]) 337 self.log_print("Reverted CPU governor to %s." % self.governor_restore) 338 339 340class Target(Server): 341 def __init__(self, name, general_config, target_config): 342 super().__init__(name, general_config, target_config) 343 344 # Defaults 345 self.enable_sar = False 346 self.sar_delay = 0 347 self.sar_interval = 0 348 self.sar_count = 0 349 self.enable_pcm = False 350 self.pcm_dir = "" 351 self.pcm_delay = 0 352 self.pcm_interval = 0 353 self.pcm_count = 0 354 self.enable_bandwidth = 0 355 self.bandwidth_count = 0 356 self.enable_dpdk_memory = False 357 self.dpdk_wait_time = 0 358 self.enable_zcopy = False 359 self.scheduler_name = "static" 360 self.null_block = 0 361 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 362 self.subsystem_info_list = [] 363 self.initiator_info = [] 364 365 if "null_block_devices" in target_config: 366 self.null_block = target_config["null_block_devices"] 367 if "sar_settings" in target_config: 368 self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"] 369 if "pcm_settings" in target_config: 370 self.enable_pcm = True 371 self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"] 372 if "enable_bandwidth" in target_config: 373 self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"] 374 if "enable_dpdk_memory" in target_config: 375 self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"] 376 if "scheduler_settings" in target_config: 377 self.scheduler_name = target_config["scheduler_settings"] 378 if "zcopy_settings" in target_config: 379 self.enable_zcopy = target_config["zcopy_settings"] 380 if "results_dir" in target_config: 381 self.results_dir = target_config["results_dir"] 382 383 self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) 384 self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../")) 385 self.set_local_nic_info(self.set_local_nic_info_helper()) 386 387 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 388 self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip") 389 390 self.configure_system() 391 if self.enable_adq: 392 self.configure_adq() 393 self.sys_config() 394 395 def set_local_nic_info_helper(self): 396 return json.loads(self.exec_cmd(["lshw", "-json"])) 397 398 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 399 stderr_opt = None 400 if stderr_redirect: 401 stderr_opt = subprocess.STDOUT 402 if change_dir: 403 old_cwd = os.getcwd() 404 os.chdir(change_dir) 405 self.log_print("Changing directory to %s" % change_dir) 406 407 out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8") 408 409 if change_dir: 410 os.chdir(old_cwd) 411 self.log_print("Changing directory to %s" % old_cwd) 412 return out 413 414 def zip_spdk_sources(self, spdk_dir, dest_file): 415 self.log_print("Zipping SPDK source directory") 416 fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) 417 for root, _directories, files in os.walk(spdk_dir, followlinks=True): 418 for file in files: 419 fh.write(os.path.relpath(os.path.join(root, file))) 420 fh.close() 421 self.log_print("Done zipping") 422 423 @staticmethod 424 def _chunks(input_list, chunks_no): 425 div, rem = divmod(len(input_list), chunks_no) 426 for i in range(chunks_no): 427 si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem) 428 yield input_list[si:si + (div + 1 if i < rem else div)] 429 430 def spread_bdevs(self, req_disks): 431 # Spread available block devices indexes: 432 # - evenly across available initiator systems 433 # - evenly across available NIC interfaces for 434 # each initiator 435 # Not NUMA aware. 436 ip_bdev_map = [] 437 initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info)) 438 439 for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)): 440 self.initiator_info[i]["bdev_range"] = init_chunk 441 init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"]))) 442 for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list): 443 for c in nic_chunk: 444 ip_bdev_map.append((ip, c)) 445 return ip_bdev_map 446 447 @staticmethod 448 def read_json_stats(file): 449 with open(file, "r") as json_data: 450 data = json.load(json_data) 451 job_pos = 0 # job_post = 0 because using aggregated results 452 453 # Check if latency is in nano or microseconds to choose correct dict key 454 def get_lat_unit(key_prefix, dict_section): 455 # key prefix - lat, clat or slat. 456 # dict section - portion of json containing latency bucket in question 457 # Return dict key to access the bucket and unit as string 458 for k, _ in dict_section.items(): 459 if k.startswith(key_prefix): 460 return k, k.split("_")[1] 461 462 def get_clat_percentiles(clat_dict_leaf): 463 if "percentile" in clat_dict_leaf: 464 p99_lat = float(clat_dict_leaf["percentile"]["99.000000"]) 465 p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"]) 466 p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"]) 467 p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"]) 468 469 return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat] 470 else: 471 # Latest fio versions do not provide "percentile" results if no 472 # measurements were done, so just return zeroes 473 return [0, 0, 0, 0] 474 475 read_iops = float(data["jobs"][job_pos]["read"]["iops"]) 476 read_bw = float(data["jobs"][job_pos]["read"]["bw"]) 477 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"]) 478 read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"]) 479 read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"]) 480 read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"]) 481 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"]) 482 read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles( 483 data["jobs"][job_pos]["read"][clat_key]) 484 485 if "ns" in lat_unit: 486 read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]] 487 if "ns" in clat_unit: 488 read_p99_lat = read_p99_lat / 1000 489 read_p99_9_lat = read_p99_9_lat / 1000 490 read_p99_99_lat = read_p99_99_lat / 1000 491 read_p99_999_lat = read_p99_999_lat / 1000 492 493 write_iops = float(data["jobs"][job_pos]["write"]["iops"]) 494 write_bw = float(data["jobs"][job_pos]["write"]["bw"]) 495 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"]) 496 write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"]) 497 write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"]) 498 write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"]) 499 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"]) 500 write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles( 501 data["jobs"][job_pos]["write"][clat_key]) 502 503 if "ns" in lat_unit: 504 write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]] 505 if "ns" in clat_unit: 506 write_p99_lat = write_p99_lat / 1000 507 write_p99_9_lat = write_p99_9_lat / 1000 508 write_p99_99_lat = write_p99_99_lat / 1000 509 write_p99_999_lat = write_p99_999_lat / 1000 510 511 return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat, 512 read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat, 513 write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat, 514 write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat] 515 516 def parse_results(self, results_dir, csv_file): 517 files = os.listdir(results_dir) 518 fio_files = filter(lambda x: ".fio" in x, files) 519 json_files = [x for x in files if ".json" in x] 520 521 headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us", 522 "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us", 523 "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us", 524 "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"] 525 526 aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us", 527 "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"] 528 529 header_line = ",".join(["Name", *headers]) 530 aggr_header_line = ",".join(["Name", *aggr_headers]) 531 532 # Create empty results file 533 with open(os.path.join(results_dir, csv_file), "w") as fh: 534 fh.write(aggr_header_line + "\n") 535 rows = set() 536 537 for fio_config in fio_files: 538 self.log_print("Getting FIO stats for %s" % fio_config) 539 job_name, _ = os.path.splitext(fio_config) 540 541 # Look in the filename for rwmixread value. Function arguments do 542 # not have that information. 543 # TODO: Improve this function by directly using workload params instead 544 # of regexing through filenames. 545 if "read" in job_name: 546 rw_mixread = 1 547 elif "write" in job_name: 548 rw_mixread = 0 549 else: 550 rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100 551 552 # If "_CPU" exists in name - ignore it 553 # Initiators for the same job could have different num_cores parameter 554 job_name = re.sub(r"_\d+CPU", "", job_name) 555 job_result_files = [x for x in json_files if x.startswith(job_name)] 556 self.log_print("Matching result files for current fio config:") 557 for j in job_result_files: 558 self.log_print("\t %s" % j) 559 560 # There may have been more than 1 initiator used in test, need to check that 561 # Result files are created so that string after last "_" separator is server name 562 inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files]) 563 inits_avg_results = [] 564 for i in inits_names: 565 self.log_print("\tGetting stats for initiator %s" % i) 566 # There may have been more than 1 test run for this job, calculate average results for initiator 567 i_results = [x for x in job_result_files if i in x] 568 i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv")) 569 570 separate_stats = [] 571 for r in i_results: 572 try: 573 stats = self.read_json_stats(os.path.join(results_dir, r)) 574 separate_stats.append(stats) 575 self.log_print(stats) 576 except JSONDecodeError: 577 self.log_print("ERROR: Failed to parse %s results! Results might be incomplete!" % r) 578 579 init_results = [sum(x) for x in zip(*separate_stats)] 580 init_results = [x / len(separate_stats) for x in init_results] 581 inits_avg_results.append(init_results) 582 583 self.log_print("\tAverage results for initiator %s" % i) 584 self.log_print(init_results) 585 with open(os.path.join(results_dir, i_results_filename), "w") as fh: 586 fh.write(header_line + "\n") 587 fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n") 588 589 # Sum results of all initiators running this FIO job. 590 # Latency results are an average of latencies from accros all initiators. 591 inits_avg_results = [sum(x) for x in zip(*inits_avg_results)] 592 inits_avg_results = OrderedDict(zip(headers, inits_avg_results)) 593 for key in inits_avg_results: 594 if "lat" in key: 595 inits_avg_results[key] /= len(inits_names) 596 597 # Aggregate separate read/write values into common labels 598 # Take rw_mixread into consideration for mixed read/write workloads. 599 aggregate_results = OrderedDict() 600 for h in aggr_headers: 601 read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key] 602 if "lat" in h: 603 _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat 604 else: 605 _ = read_stat + write_stat 606 aggregate_results[h] = "{0:.3f}".format(_) 607 608 rows.add(",".join([job_name, *aggregate_results.values()])) 609 610 # Save results to file 611 for row in rows: 612 with open(os.path.join(results_dir, csv_file), "a") as fh: 613 fh.write(row + "\n") 614 self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file)) 615 616 def measure_sar(self, results_dir, sar_file_prefix): 617 cpu_number = os.cpu_count() 618 sar_idle_sum = 0 619 sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt") 620 sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"])) 621 622 self.log_print("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % self.sar_delay) 623 time.sleep(self.sar_delay) 624 self.log_print("Starting SAR measurements") 625 626 out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count]) 627 with open(os.path.join(results_dir, sar_output_file), "w") as fh: 628 for line in out.split("\n"): 629 if "Average" in line: 630 if "CPU" in line: 631 self.log_print("Summary CPU utilization from SAR:") 632 self.log_print(line) 633 elif "all" in line: 634 self.log_print(line) 635 else: 636 sar_idle_sum += float(line.split()[7]) 637 fh.write(out) 638 sar_cpu_usage = cpu_number * 100 - sar_idle_sum 639 640 with open(os.path.join(results_dir, sar_cpu_util_file), "w") as f: 641 f.write("%0.2f" % sar_cpu_usage) 642 643 def ethtool_after_fio_ramp(self, fio_ramp_time): 644 time.sleep(fio_ramp_time//2) 645 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 646 for nic in nic_names: 647 self.log_print(nic) 648 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 649 "channel-pkt-inspect-optimize", "off"]) # Disable channel packet inspection optimization 650 651 def measure_pcm_memory(self, results_dir, pcm_file_name): 652 time.sleep(self.pcm_delay) 653 cmd = ["%s/pcm-memory.x" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)] 654 pcm_memory = subprocess.Popen(cmd) 655 time.sleep(self.pcm_count) 656 pcm_memory.terminate() 657 658 def measure_pcm(self, results_dir, pcm_file_name): 659 time.sleep(self.pcm_delay) 660 cmd = ["%s/pcm.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, "-csv=%s/%s" % (results_dir, pcm_file_name)] 661 subprocess.run(cmd) 662 df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1]) 663 df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x)) 664 skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})] 665 skt_pcm_file_name = "_".join(["skt", pcm_file_name]) 666 skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False) 667 668 def measure_pcm_power(self, results_dir, pcm_power_file_name): 669 time.sleep(self.pcm_delay) 670 out = self.exec_cmd(["%s/pcm-power.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count]) 671 with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh: 672 fh.write(out) 673 674 def measure_network_bandwidth(self, results_dir, bandwidth_file_name): 675 self.log_print("INFO: starting network bandwidth measure") 676 self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name), 677 "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)]) 678 679 def measure_dpdk_memory(self, results_dir): 680 self.log_print("INFO: waiting to generate DPDK memory usage") 681 time.sleep(self.dpdk_wait_time) 682 self.log_print("INFO: generating DPDK memory usage") 683 rpc.env.env_dpdk_get_mem_stats 684 os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir)) 685 686 def sys_config(self): 687 self.log_print("====Kernel release:====") 688 self.log_print(os.uname().release) 689 self.log_print("====Kernel command line:====") 690 with open('/proc/cmdline') as f: 691 cmdline = f.readlines() 692 self.log_print('\n'.join(self.get_uncommented_lines(cmdline))) 693 self.log_print("====sysctl conf:====") 694 with open('/etc/sysctl.conf') as f: 695 sysctl = f.readlines() 696 self.log_print('\n'.join(self.get_uncommented_lines(sysctl))) 697 self.log_print("====Cpu power info:====") 698 self.log_print(self.exec_cmd(["cpupower", "frequency-info"])) 699 self.log_print("====zcopy settings:====") 700 self.log_print("zcopy enabled: %s" % (self.enable_zcopy)) 701 self.log_print("====Scheduler settings:====") 702 self.log_print("SPDK scheduler: %s" % (self.scheduler_name)) 703 704 705class Initiator(Server): 706 def __init__(self, name, general_config, initiator_config): 707 super().__init__(name, general_config, initiator_config) 708 709 # Required fields 710 self.ip = initiator_config["ip"] 711 self.target_nic_ips = initiator_config["target_nic_ips"] 712 713 # Defaults 714 self.cpus_allowed = None 715 self.cpus_allowed_policy = "shared" 716 self.spdk_dir = "/tmp/spdk" 717 self.fio_bin = "/usr/src/fio/fio" 718 self.nvmecli_bin = "nvme" 719 self.cpu_frequency = None 720 self.subsystem_info_list = [] 721 722 if "spdk_dir" in initiator_config: 723 self.spdk_dir = initiator_config["spdk_dir"] 724 if "fio_bin" in initiator_config: 725 self.fio_bin = initiator_config["fio_bin"] 726 if "nvmecli_bin" in initiator_config: 727 self.nvmecli_bin = initiator_config["nvmecli_bin"] 728 if "cpus_allowed" in initiator_config: 729 self.cpus_allowed = initiator_config["cpus_allowed"] 730 if "cpus_allowed_policy" in initiator_config: 731 self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"] 732 if "cpu_frequency" in initiator_config: 733 self.cpu_frequency = initiator_config["cpu_frequency"] 734 if os.getenv('SPDK_WORKSPACE'): 735 self.spdk_dir = os.getenv('SPDK_WORKSPACE') 736 737 self.ssh_connection = paramiko.SSHClient() 738 self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 739 self.ssh_connection.connect(self.ip, username=self.username, password=self.password) 740 self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir]) 741 self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir]) 742 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 743 744 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 745 self.copy_spdk("/tmp/spdk.zip") 746 self.set_local_nic_info(self.set_local_nic_info_helper()) 747 self.set_cpu_frequency() 748 self.configure_system() 749 if self.enable_adq: 750 self.configure_adq() 751 self.sys_config() 752 753 def set_local_nic_info_helper(self): 754 return json.loads(self.exec_cmd(["lshw", "-json"])) 755 756 def stop(self): 757 self.ssh_connection.close() 758 759 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 760 if change_dir: 761 cmd = ["cd", change_dir, ";", *cmd] 762 763 # In case one of the command elements contains whitespace and is not 764 # already quoted, # (e.g. when calling sysctl) quote it again to prevent expansion 765 # when sending to remote system. 766 for i, c in enumerate(cmd): 767 if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")): 768 cmd[i] = '"%s"' % c 769 cmd = " ".join(cmd) 770 771 # Redirect stderr to stdout thanks using get_pty option if needed 772 _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect) 773 out = stdout.read().decode(encoding="utf-8") 774 775 # Check the return code 776 rc = stdout.channel.recv_exit_status() 777 if rc: 778 raise CalledProcessError(int(rc), cmd, out) 779 780 return out 781 782 def put_file(self, local, remote_dest): 783 ftp = self.ssh_connection.open_sftp() 784 ftp.put(local, remote_dest) 785 ftp.close() 786 787 def get_file(self, remote, local_dest): 788 ftp = self.ssh_connection.open_sftp() 789 ftp.get(remote, local_dest) 790 ftp.close() 791 792 def copy_spdk(self, local_spdk_zip): 793 self.log_print("Copying SPDK sources to initiator %s" % self.name) 794 self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip") 795 self.log_print("Copied sources zip from target") 796 self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir]) 797 self.log_print("Sources unpacked") 798 799 def copy_result_files(self, dest_dir): 800 self.log_print("Copying results") 801 802 if not os.path.exists(dest_dir): 803 os.mkdir(dest_dir) 804 805 # Get list of result files from initiator and copy them back to target 806 file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n") 807 808 for file in file_list: 809 self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file), 810 os.path.join(dest_dir, file)) 811 self.log_print("Done copying results") 812 813 def discover_subsystems(self, address_list, subsys_no): 814 num_nvmes = range(0, subsys_no) 815 nvme_discover_output = "" 816 for ip, subsys_no in itertools.product(address_list, num_nvmes): 817 self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no)) 818 nvme_discover_cmd = ["sudo", 819 "%s" % self.nvmecli_bin, 820 "discover", "-t", "%s" % self.transport, 821 "-s", "%s" % (4420 + subsys_no), 822 "-a", "%s" % ip] 823 824 try: 825 stdout = self.exec_cmd(nvme_discover_cmd) 826 if stdout: 827 nvme_discover_output = nvme_discover_output + stdout 828 except CalledProcessError: 829 # Do nothing. In case of discovering remote subsystems of kernel target 830 # we expect "nvme discover" to fail a bunch of times because we basically 831 # scan ports. 832 pass 833 834 subsystems = re.findall(r'trsvcid:\s(\d+)\s+' # get svcid number 835 r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+' # get NQN id 836 r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', # get IP address 837 nvme_discover_output) # from nvme discovery output 838 subsystems = filter(lambda x: x[-1] in address_list, subsystems) 839 subsystems = filter(lambda x: "discovery" not in x[1], subsystems) 840 subsystems = list(set(subsystems)) 841 subsystems.sort(key=lambda x: x[1]) 842 self.log_print("Found matching subsystems on target side:") 843 for s in subsystems: 844 self.log_print(s) 845 self.subsystem_info_list = subsystems 846 847 def gen_fio_filename_conf(self, *args, **kwargs): 848 # Logic implemented in SPDKInitiator and KernelInitiator classes 849 pass 850 851 def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10, rate_iops=0): 852 fio_conf_template = """ 853[global] 854ioengine={ioengine} 855{spdk_conf} 856thread=1 857group_reporting=1 858direct=1 859percentile_list=50:90:99:99.5:99.9:99.99:99.999 860 861norandommap=1 862rw={rw} 863rwmixread={rwmixread} 864bs={block_size} 865time_based=1 866ramp_time={ramp_time} 867runtime={run_time} 868rate_iops={rate_iops} 869""" 870 if "spdk" in self.mode: 871 bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list) 872 self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir]) 873 ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir 874 spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir 875 else: 876 ioengine = self.ioengine 877 spdk_conf = "" 878 out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'", 879 "|", "awk", "'{print $1}'"]) 880 subsystems = [x for x in out.split("\n") if "nvme" in x] 881 882 if self.cpus_allowed is not None: 883 self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed) 884 cpus_num = 0 885 cpus = self.cpus_allowed.split(",") 886 for cpu in cpus: 887 if "-" in cpu: 888 a, b = cpu.split("-") 889 a = int(a) 890 b = int(b) 891 cpus_num += len(range(a, b)) 892 else: 893 cpus_num += 1 894 self.num_cores = cpus_num 895 threads = range(0, self.num_cores) 896 elif hasattr(self, 'num_cores'): 897 self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores) 898 threads = range(0, int(self.num_cores)) 899 else: 900 self.num_cores = len(subsystems) 901 threads = range(0, len(subsystems)) 902 903 if "spdk" in self.mode: 904 filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs) 905 else: 906 filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs) 907 908 fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf, 909 rw=rw, rwmixread=rwmixread, block_size=block_size, 910 ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops) 911 912 # TODO: hipri disabled for now, as it causes fio errors: 913 # io_u error on file /dev/nvme2n1: Operation not supported 914 # See comment in KernelInitiator class, kernel_init_connect() function 915 if hasattr(self, "ioengine") and "io_uring" in self.ioengine: 916 fio_config = fio_config + """ 917fixedbufs=1 918registerfiles=1 919#hipri=1 920""" 921 if num_jobs: 922 fio_config = fio_config + "numjobs=%s \n" % num_jobs 923 if self.cpus_allowed is not None: 924 fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed 925 fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy 926 fio_config = fio_config + filename_section 927 928 fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread) 929 if hasattr(self, "num_cores"): 930 fio_config_filename += "_%sCPU" % self.num_cores 931 fio_config_filename += ".fio" 932 933 self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir]) 934 self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)]) 935 self.log_print("Created FIO Config:") 936 self.log_print(fio_config) 937 938 return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename) 939 940 def set_cpu_frequency(self): 941 if self.cpu_frequency is not None: 942 try: 943 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True) 944 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True) 945 self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"])) 946 except Exception: 947 self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!") 948 sys.exit() 949 else: 950 self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.") 951 952 def run_fio(self, fio_config_file, run_num=None): 953 job_name, _ = os.path.splitext(fio_config_file) 954 self.log_print("Starting FIO run for job: %s" % job_name) 955 self.log_print("Using FIO: %s" % self.fio_bin) 956 957 if run_num: 958 for i in range(1, run_num + 1): 959 output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json" 960 try: 961 output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json", 962 "--output=%s" % output_filename, "--eta=never"], True) 963 self.log_print(output) 964 except subprocess.CalledProcessError as e: 965 self.log_print("ERROR: Fio process failed!") 966 self.log_print(e.stdout) 967 else: 968 output_filename = job_name + "_" + self.name + ".json" 969 output = self.exec_cmd(["sudo", self.fio_bin, 970 fio_config_file, "--output-format=json", 971 "--output" % output_filename], True) 972 self.log_print(output) 973 self.log_print("FIO run finished. Results in: %s" % output_filename) 974 975 def sys_config(self): 976 self.log_print("====Kernel release:====") 977 self.log_print(self.exec_cmd(["uname", "-r"])) 978 self.log_print("====Kernel command line:====") 979 cmdline = self.exec_cmd(["cat", "/proc/cmdline"]) 980 self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines()))) 981 self.log_print("====sysctl conf:====") 982 sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"]) 983 self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines()))) 984 self.log_print("====Cpu power info:====") 985 self.log_print(self.exec_cmd(["cpupower", "frequency-info"])) 986 987 988class KernelTarget(Target): 989 def __init__(self, name, general_config, target_config): 990 super().__init__(name, general_config, target_config) 991 # Defaults 992 self.nvmet_bin = "nvmetcli" 993 994 if "nvmet_bin" in target_config: 995 self.nvmet_bin = target_config["nvmet_bin"] 996 997 def stop(self): 998 nvmet_command(self.nvmet_bin, "clear") 999 1000 def kernel_tgt_gen_subsystem_conf(self, nvme_list): 1001 1002 nvmet_cfg = { 1003 "ports": [], 1004 "hosts": [], 1005 "subsystems": [], 1006 } 1007 1008 for ip, bdev_num in self.spread_bdevs(len(nvme_list)): 1009 port = str(4420 + bdev_num) 1010 nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num 1011 serial = "SPDK00%s" % bdev_num 1012 bdev_name = nvme_list[bdev_num] 1013 1014 nvmet_cfg["subsystems"].append({ 1015 "allowed_hosts": [], 1016 "attr": { 1017 "allow_any_host": "1", 1018 "serial": serial, 1019 "version": "1.3" 1020 }, 1021 "namespaces": [ 1022 { 1023 "device": { 1024 "path": bdev_name, 1025 "uuid": "%s" % uuid.uuid4() 1026 }, 1027 "enable": 1, 1028 "nsid": port 1029 } 1030 ], 1031 "nqn": nqn 1032 }) 1033 1034 nvmet_cfg["ports"].append({ 1035 "addr": { 1036 "adrfam": "ipv4", 1037 "traddr": ip, 1038 "trsvcid": port, 1039 "trtype": self.transport 1040 }, 1041 "portid": bdev_num, 1042 "referrals": [], 1043 "subsystems": [nqn] 1044 }) 1045 1046 self.subsystem_info_list.append([port, nqn, ip]) 1047 self.subsys_no = len(self.subsystem_info_list) 1048 1049 with open("kernel.conf", "w") as fh: 1050 fh.write(json.dumps(nvmet_cfg, indent=2)) 1051 1052 def tgt_start(self): 1053 self.log_print("Configuring kernel NVMeOF Target") 1054 1055 if self.null_block: 1056 print("Configuring with null block device.") 1057 nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)] 1058 else: 1059 print("Configuring with NVMe drives.") 1060 nvme_list = get_nvme_devices() 1061 1062 self.kernel_tgt_gen_subsystem_conf(nvme_list) 1063 self.subsys_no = len(nvme_list) 1064 1065 nvmet_command(self.nvmet_bin, "clear") 1066 nvmet_command(self.nvmet_bin, "restore kernel.conf") 1067 1068 if self.enable_adq: 1069 self.adq_configure_tc() 1070 1071 self.log_print("Done configuring kernel NVMeOF Target") 1072 1073 1074class SPDKTarget(Target): 1075 def __init__(self, name, general_config, target_config): 1076 super().__init__(name, general_config, target_config) 1077 1078 # Required fields 1079 self.core_mask = target_config["core_mask"] 1080 self.num_cores = self.get_num_cores(self.core_mask) 1081 1082 # Defaults 1083 self.dif_insert_strip = False 1084 self.null_block_dif_type = 0 1085 self.num_shared_buffers = 4096 1086 self.max_queue_depth = 128 1087 self.bpf_proc = None 1088 self.bpf_scripts = [] 1089 self.enable_dsa = False 1090 self.scheduler_core_limit = None 1091 1092 if "num_shared_buffers" in target_config: 1093 self.num_shared_buffers = target_config["num_shared_buffers"] 1094 if "max_queue_depth" in target_config: 1095 self.max_queue_depth = target_config["max_queue_depth"] 1096 if "null_block_dif_type" in target_config: 1097 self.null_block_dif_type = target_config["null_block_dif_type"] 1098 if "dif_insert_strip" in target_config: 1099 self.dif_insert_strip = target_config["dif_insert_strip"] 1100 if "bpf_scripts" in target_config: 1101 self.bpf_scripts = target_config["bpf_scripts"] 1102 if "dsa_settings" in target_config: 1103 self.enable_dsa = target_config["dsa_settings"] 1104 if "scheduler_core_limit" in target_config: 1105 self.scheduler_core_limit = target_config["scheduler_core_limit"] 1106 1107 self.log_print("====DSA settings:====") 1108 self.log_print("DSA enabled: %s" % (self.enable_dsa)) 1109 1110 @staticmethod 1111 def get_num_cores(core_mask): 1112 if "0x" in core_mask: 1113 return bin(int(core_mask, 16)).count("1") 1114 else: 1115 num_cores = 0 1116 core_mask = core_mask.replace("[", "") 1117 core_mask = core_mask.replace("]", "") 1118 for i in core_mask.split(","): 1119 if "-" in i: 1120 x, y = i.split("-") 1121 num_cores += len(range(int(x), int(y))) + 1 1122 else: 1123 num_cores += 1 1124 return num_cores 1125 1126 def spdk_tgt_configure(self): 1127 self.log_print("Configuring SPDK NVMeOF target via RPC") 1128 1129 if self.enable_adq: 1130 self.adq_configure_tc() 1131 1132 # Create transport layer 1133 rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, 1134 num_shared_buffers=self.num_shared_buffers, 1135 max_queue_depth=self.max_queue_depth, 1136 dif_insert_or_strip=self.dif_insert_strip, 1137 sock_priority=self.adq_priority) 1138 self.log_print("SPDK NVMeOF transport layer:") 1139 rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client)) 1140 1141 if self.null_block: 1142 self.spdk_tgt_add_nullblock(self.null_block) 1143 self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block) 1144 else: 1145 self.spdk_tgt_add_nvme_conf() 1146 self.spdk_tgt_add_subsystem_conf(self.nic_ips) 1147 1148 self.log_print("Done configuring SPDK NVMeOF Target") 1149 1150 def spdk_tgt_add_nullblock(self, null_block_count): 1151 md_size = 0 1152 block_size = 4096 1153 if self.null_block_dif_type != 0: 1154 md_size = 128 1155 1156 self.log_print("Adding null block bdevices to config via RPC") 1157 for i in range(null_block_count): 1158 self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type) 1159 rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i), 1160 dif_type=self.null_block_dif_type, md_size=md_size) 1161 self.log_print("SPDK Bdevs configuration:") 1162 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1163 1164 def spdk_tgt_add_nvme_conf(self, req_num_disks=None): 1165 self.log_print("Adding NVMe bdevs to config via RPC") 1166 1167 bdfs = get_nvme_devices_bdf() 1168 bdfs = [b.replace(":", ".") for b in bdfs] 1169 1170 if req_num_disks: 1171 if req_num_disks > len(bdfs): 1172 self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs)) 1173 sys.exit(1) 1174 else: 1175 bdfs = bdfs[0:req_num_disks] 1176 1177 for i, bdf in enumerate(bdfs): 1178 rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 1179 1180 self.log_print("SPDK Bdevs configuration:") 1181 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1182 1183 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 1184 self.log_print("Adding subsystems to config") 1185 if not req_num_disks: 1186 req_num_disks = get_nvme_devices_count() 1187 1188 for ip, bdev_num in self.spread_bdevs(req_num_disks): 1189 port = str(4420 + bdev_num) 1190 nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num 1191 serial = "SPDK00%s" % bdev_num 1192 bdev_name = "Nvme%sn1" % bdev_num 1193 1194 rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial, 1195 allow_any_host=True, max_namespaces=8) 1196 rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name) 1197 for nqn_name in [nqn, "discovery"]: 1198 rpc.nvmf.nvmf_subsystem_add_listener(self.client, 1199 nqn=nqn_name, 1200 trtype=self.transport, 1201 traddr=ip, 1202 trsvcid=port, 1203 adrfam="ipv4") 1204 self.subsystem_info_list.append([port, nqn, ip]) 1205 self.subsys_no = len(self.subsystem_info_list) 1206 1207 self.log_print("SPDK NVMeOF subsystem configuration:") 1208 rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client)) 1209 1210 def bpf_start(self): 1211 self.log_print("Starting BPF Trace scripts: %s" % self.bpf_scripts) 1212 bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh") 1213 bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts] 1214 results_path = os.path.join(self.results_dir, "bpf_traces.txt") 1215 1216 with open(self.pid, "r") as fh: 1217 nvmf_pid = str(fh.readline()) 1218 1219 cmd = [bpf_script, nvmf_pid, *bpf_traces] 1220 self.log_print(cmd) 1221 self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path}) 1222 1223 def tgt_start(self): 1224 if self.null_block: 1225 self.subsys_no = 1 1226 else: 1227 self.subsys_no = get_nvme_devices_count() 1228 self.log_print("Starting SPDK NVMeOF Target process") 1229 nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt") 1230 proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask]) 1231 self.pid = os.path.join(self.spdk_dir, "nvmf.pid") 1232 1233 with open(self.pid, "w") as fh: 1234 fh.write(str(proc.pid)) 1235 self.nvmf_proc = proc 1236 self.log_print("SPDK NVMeOF Target PID=%s" % self.pid) 1237 self.log_print("Waiting for spdk to initialize...") 1238 while True: 1239 if os.path.exists("/var/tmp/spdk.sock"): 1240 break 1241 time.sleep(1) 1242 self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock") 1243 1244 rpc.sock.sock_set_default_impl(self.client, impl_name="posix") 1245 1246 if self.enable_zcopy: 1247 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", 1248 enable_zerocopy_send_server=True) 1249 self.log_print("Target socket options:") 1250 rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix")) 1251 1252 if self.enable_adq: 1253 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1) 1254 rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None, 1255 nvme_adminq_poll_period_us=100000, retry_count=4) 1256 1257 if self.enable_dsa: 1258 rpc.dsa.dsa_scan_accel_engine(self.client, config_kernel_mode=None) 1259 self.log_print("Target DSA accel engine enabled") 1260 1261 rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit) 1262 rpc.framework_start_init(self.client) 1263 1264 if self.bpf_scripts: 1265 self.bpf_start() 1266 1267 self.spdk_tgt_configure() 1268 1269 def stop(self): 1270 if self.bpf_proc: 1271 self.log_print("Stopping BPF Trace script") 1272 self.bpf_proc.terminate() 1273 self.bpf_proc.wait() 1274 1275 if hasattr(self, "nvmf_proc"): 1276 try: 1277 self.nvmf_proc.terminate() 1278 self.nvmf_proc.wait() 1279 except Exception as e: 1280 self.log_print(e) 1281 self.nvmf_proc.kill() 1282 self.nvmf_proc.communicate() 1283 1284 1285class KernelInitiator(Initiator): 1286 def __init__(self, name, general_config, initiator_config): 1287 super().__init__(name, general_config, initiator_config) 1288 1289 # Defaults 1290 self.extra_params = "" 1291 self.ioengine = "libaio" 1292 1293 if "extra_params" in initiator_config: 1294 self.extra_params = initiator_config["extra_params"] 1295 1296 if "kernel_engine" in initiator_config: 1297 self.ioengine = initiator_config["kernel_engine"] 1298 if "io_uring" in self.ioengine: 1299 self.extra_params = "--nr-poll-queues=8" 1300 1301 def get_connected_nvme_list(self): 1302 json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"])) 1303 nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"] 1304 if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]] 1305 return nvme_list 1306 1307 def kernel_init_connect(self): 1308 self.log_print("Below connection attempts may result in error messages, this is expected!") 1309 for subsystem in self.subsystem_info_list: 1310 self.log_print("Trying to connect %s %s %s" % subsystem) 1311 self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport, 1312 "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params]) 1313 time.sleep(2) 1314 1315 if "io_uring" in self.ioengine: 1316 self.log_print("Setting block layer settings for io_uring.") 1317 1318 # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because 1319 # apparently it's not possible for connected subsystems. 1320 # Results in "error: Invalid argument" 1321 block_sysfs_settings = { 1322 "iostats": "0", 1323 "rq_affinity": "0", 1324 "nomerges": "2" 1325 } 1326 1327 for disk in self.get_connected_nvme_list(): 1328 sysfs = os.path.join("/sys/block", disk, "queue") 1329 for k, v in block_sysfs_settings.items(): 1330 sysfs_opt_path = os.path.join(sysfs, k) 1331 try: 1332 self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True) 1333 except subprocess.CalledProcessError as e: 1334 self.log_print("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v)) 1335 finally: 1336 _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)]) 1337 self.log_print("%s=%s" % (sysfs_opt_path, _)) 1338 1339 def kernel_init_disconnect(self): 1340 for subsystem in self.subsystem_info_list: 1341 self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]]) 1342 time.sleep(1) 1343 1344 def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1): 1345 nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()] 1346 1347 filename_section = "" 1348 nvme_per_split = int(len(nvme_list) / len(threads)) 1349 remainder = len(nvme_list) % len(threads) 1350 iterator = iter(nvme_list) 1351 result = [] 1352 for i in range(len(threads)): 1353 result.append([]) 1354 for _ in range(nvme_per_split): 1355 result[i].append(next(iterator)) 1356 if remainder: 1357 result[i].append(next(iterator)) 1358 remainder -= 1 1359 for i, r in enumerate(result): 1360 header = "[filename%s]" % i 1361 disks = "\n".join(["filename=%s" % x for x in r]) 1362 job_section_qd = round((io_depth * len(r)) / num_jobs) 1363 if job_section_qd == 0: 1364 job_section_qd = 1 1365 iodepth = "iodepth=%s" % job_section_qd 1366 filename_section = "\n".join([filename_section, header, disks, iodepth]) 1367 1368 return filename_section 1369 1370 1371class SPDKInitiator(Initiator): 1372 def __init__(self, name, general_config, initiator_config): 1373 super().__init__(name, general_config, initiator_config) 1374 1375 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 1376 self.install_spdk() 1377 1378 # Required fields 1379 self.num_cores = initiator_config["num_cores"] 1380 1381 # Optional fields 1382 self.enable_data_digest = False 1383 if "enable_data_digest" in initiator_config: 1384 self.enable_data_digest = initiator_config["enable_data_digest"] 1385 1386 def install_spdk(self): 1387 self.log_print("Using fio binary %s" % self.fio_bin) 1388 self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"]) 1389 self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"]) 1390 self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)]) 1391 self.exec_cmd(["make", "-C", self.spdk_dir, "clean"]) 1392 self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"]) 1393 1394 self.log_print("SPDK built") 1395 self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir]) 1396 1397 def gen_spdk_bdev_conf(self, remote_subsystem_list): 1398 bdev_cfg_section = { 1399 "subsystems": [ 1400 { 1401 "subsystem": "bdev", 1402 "config": [] 1403 } 1404 ] 1405 } 1406 1407 for i, subsys in enumerate(remote_subsystem_list): 1408 sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys) 1409 nvme_ctrl = { 1410 "method": "bdev_nvme_attach_controller", 1411 "params": { 1412 "name": "Nvme{}".format(i), 1413 "trtype": self.transport, 1414 "traddr": sub_addr, 1415 "trsvcid": sub_port, 1416 "subnqn": sub_nqn, 1417 "adrfam": "IPv4" 1418 } 1419 } 1420 1421 if self.enable_adq: 1422 nvme_ctrl["params"].update({"priority": "1"}) 1423 1424 if self.enable_data_digest: 1425 nvme_ctrl["params"].update({"ddgst": self.enable_data_digest}) 1426 1427 bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl) 1428 1429 return json.dumps(bdev_cfg_section, indent=2) 1430 1431 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1): 1432 filename_section = "" 1433 if len(threads) >= len(subsystems): 1434 threads = range(0, len(subsystems)) 1435 filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))] 1436 nvme_per_split = int(len(subsystems) / len(threads)) 1437 remainder = len(subsystems) % len(threads) 1438 iterator = iter(filenames) 1439 result = [] 1440 for i in range(len(threads)): 1441 result.append([]) 1442 for _ in range(nvme_per_split): 1443 result[i].append(next(iterator)) 1444 if remainder: 1445 result[i].append(next(iterator)) 1446 remainder -= 1 1447 for i, r in enumerate(result): 1448 header = "[filename%s]" % i 1449 disks = "\n".join(["filename=%s" % x for x in r]) 1450 job_section_qd = round((io_depth * len(r)) / num_jobs) 1451 if job_section_qd == 0: 1452 job_section_qd = 1 1453 iodepth = "iodepth=%s" % job_section_qd 1454 filename_section = "\n".join([filename_section, header, disks, iodepth]) 1455 1456 return filename_section 1457 1458 1459if __name__ == "__main__": 1460 script_full_dir = os.path.dirname(os.path.realpath(__file__)) 1461 default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json")) 1462 1463 parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) 1464 parser.add_argument('-c', '--config', type=str, default=default_config_file_path, 1465 help='Configuration file.') 1466 parser.add_argument('-r', '--results', type=str, default='/tmp/results', 1467 help='Results directory.') 1468 parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv', 1469 help='CSV results filename.') 1470 1471 args = parser.parse_args() 1472 1473 print("Using config file: %s" % args.config) 1474 with open(args.config, "r") as config: 1475 data = json.load(config) 1476 1477 initiators = [] 1478 fio_cases = [] 1479 1480 general_config = data["general"] 1481 target_config = data["target"] 1482 initiator_configs = [data[x] for x in data.keys() if "initiator" in x] 1483 1484 for k, v in data.items(): 1485 if "target" in k: 1486 v.update({"results_dir": args.results}) 1487 if data[k]["mode"] == "spdk": 1488 target_obj = SPDKTarget(k, data["general"], v) 1489 elif data[k]["mode"] == "kernel": 1490 target_obj = KernelTarget(k, data["general"], v) 1491 elif "initiator" in k: 1492 if data[k]["mode"] == "spdk": 1493 init_obj = SPDKInitiator(k, data["general"], v) 1494 elif data[k]["mode"] == "kernel": 1495 init_obj = KernelInitiator(k, data["general"], v) 1496 initiators.append(init_obj) 1497 elif "fio" in k: 1498 fio_workloads = itertools.product(data[k]["bs"], 1499 data[k]["qd"], 1500 data[k]["rw"]) 1501 1502 fio_run_time = data[k]["run_time"] 1503 fio_ramp_time = data[k]["ramp_time"] 1504 fio_rw_mix_read = data[k]["rwmixread"] 1505 fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None 1506 fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None 1507 1508 fio_rate_iops = 0 1509 if "rate_iops" in data[k]: 1510 fio_rate_iops = data[k]["rate_iops"] 1511 else: 1512 continue 1513 1514 try: 1515 os.mkdir(args.results) 1516 except FileExistsError: 1517 pass 1518 1519 for i in initiators: 1520 target_obj.initiator_info.append( 1521 {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips} 1522 ) 1523 1524 # TODO: This try block is definietly too large. Need to break this up into separate 1525 # logical blocks to reduce size. 1526 try: 1527 target_obj.tgt_start() 1528 1529 for i in initiators: 1530 i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no) 1531 if i.enable_adq: 1532 i.adq_configure_tc() 1533 1534 # Poor mans threading 1535 # Run FIO tests 1536 for block_size, io_depth, rw in fio_workloads: 1537 threads = [] 1538 configs = [] 1539 for i in initiators: 1540 if i.mode == "kernel": 1541 i.kernel_init_connect() 1542 1543 cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no, 1544 fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops) 1545 configs.append(cfg) 1546 1547 for i, cfg in zip(initiators, configs): 1548 t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num)) 1549 threads.append(t) 1550 if target_obj.enable_sar: 1551 sar_file_prefix = "%s_%s_%s_sar" % (block_size, rw, io_depth) 1552 t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix)) 1553 threads.append(t) 1554 1555 if target_obj.enable_pcm: 1556 pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]] 1557 1558 pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],)) 1559 pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],)) 1560 pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],)) 1561 1562 threads.append(pcm_cpu_t) 1563 threads.append(pcm_mem_t) 1564 threads.append(pcm_pow_t) 1565 1566 if target_obj.enable_bandwidth: 1567 bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)]) 1568 bandwidth_file_name = ".".join([bandwidth_file_name, "csv"]) 1569 t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,)) 1570 threads.append(t) 1571 1572 if target_obj.enable_dpdk_memory: 1573 t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results)) 1574 threads.append(t) 1575 1576 if target_obj.enable_adq: 1577 ethtool_thread = threading.Thread(target=target_obj.ethtool_after_fio_ramp, args=(fio_ramp_time,)) 1578 threads.append(ethtool_thread) 1579 1580 for t in threads: 1581 t.start() 1582 for t in threads: 1583 t.join() 1584 1585 for i in initiators: 1586 if i.mode == "kernel": 1587 i.kernel_init_disconnect() 1588 i.copy_result_files(args.results) 1589 1590 target_obj.restore_governor() 1591 target_obj.restore_tuned() 1592 target_obj.restore_services() 1593 target_obj.restore_sysctl() 1594 if target_obj.enable_adq: 1595 target_obj.reload_driver("ice") 1596 for i in initiators: 1597 i.restore_governor() 1598 i.restore_tuned() 1599 i.restore_services() 1600 i.restore_sysctl() 1601 if i.enable_adq: 1602 i.reload_driver("ice") 1603 target_obj.parse_results(args.results, args.csv_filename) 1604 finally: 1605 for i in initiators: 1606 try: 1607 i.stop() 1608 except Exception as err: 1609 pass 1610 target_obj.stop() 1611