1#!/usr/bin/env python3 2 3from json.decoder import JSONDecodeError 4import os 5import re 6import sys 7import argparse 8import json 9import zipfile 10import threading 11import subprocess 12import itertools 13import configparser 14import time 15import uuid 16from collections import OrderedDict 17 18import paramiko 19import pandas as pd 20from common import * 21 22sys.path.append(os.path.dirname(__file__) + '/../../../python') 23 24import spdk.rpc as rpc # noqa 25import spdk.rpc.client as rpc_client # noqa 26 27 28class Server: 29 def __init__(self, name, general_config, server_config): 30 self.name = name 31 self.username = general_config["username"] 32 self.password = general_config["password"] 33 self.transport = general_config["transport"].lower() 34 self.nic_ips = server_config["nic_ips"] 35 self.mode = server_config["mode"] 36 37 self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts" 38 if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]: 39 self.irq_scripts_dir = server_config["irq_scripts_dir"] 40 41 self.local_nic_info = [] 42 self._nics_json_obj = {} 43 self.svc_restore_dict = {} 44 self.sysctl_restore_dict = {} 45 self.tuned_restore_dict = {} 46 self.governor_restore = "" 47 self.tuned_profile = "" 48 49 self.enable_adq = False 50 self.adq_priority = None 51 if "adq_enable" in server_config and server_config["adq_enable"]: 52 self.enable_adq = server_config["adq_enable"] 53 self.adq_priority = 1 54 55 if "tuned_profile" in server_config: 56 self.tuned_profile = server_config["tuned_profile"] 57 58 if not re.match("^[A-Za-z0-9]*$", name): 59 self.log_print("Please use a name which contains only letters or numbers") 60 sys.exit(1) 61 62 def log_print(self, msg): 63 print("[%s] %s" % (self.name, msg), flush=True) 64 65 @staticmethod 66 def get_uncommented_lines(lines): 67 return [line for line in lines if line and not line.startswith('#')] 68 69 def get_nic_name_by_ip(self, ip): 70 if not self._nics_json_obj: 71 nics_json_obj = 
self.exec_cmd(["ip", "-j", "address", "show"]) 72 self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj))) 73 for nic in self._nics_json_obj: 74 for addr in nic["addr_info"]: 75 if ip in addr["local"]: 76 return nic["ifname"] 77 78 def set_local_nic_info_helper(self): 79 pass 80 81 def set_local_nic_info(self, pci_info): 82 def extract_network_elements(json_obj): 83 nic_list = [] 84 if isinstance(json_obj, list): 85 for x in json_obj: 86 nic_list.extend(extract_network_elements(x)) 87 elif isinstance(json_obj, dict): 88 if "children" in json_obj: 89 nic_list.extend(extract_network_elements(json_obj["children"])) 90 if "class" in json_obj.keys() and "network" in json_obj["class"]: 91 nic_list.append(json_obj) 92 return nic_list 93 94 self.local_nic_info = extract_network_elements(pci_info) 95 96 # pylint: disable=R0201 97 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 98 return "" 99 100 def configure_system(self): 101 self.configure_services() 102 self.configure_sysctl() 103 self.configure_tuned() 104 self.configure_cpu_governor() 105 self.configure_irq_affinity() 106 107 def configure_adq(self): 108 if self.mode == "kernel": 109 self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 110 return 111 self.adq_load_modules() 112 self.adq_configure_nic() 113 114 def adq_load_modules(self): 115 self.log_print("Modprobing ADQ-related Linux modules...") 116 adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"] 117 for module in adq_module_deps: 118 try: 119 self.exec_cmd(["sudo", "modprobe", module]) 120 self.log_print("%s loaded!" 
% module) 121 except CalledProcessError as e: 122 self.log_print("ERROR: failed to load module %s" % module) 123 self.log_print("%s resulted in error: %s" % (e.cmd, e.output)) 124 125 def adq_configure_tc(self): 126 self.log_print("Configuring ADQ Traffic classes and filters...") 127 128 if self.mode == "kernel": 129 self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 130 return 131 132 num_queues_tc0 = 2 # 2 is minimum number of queues for TC0 133 num_queues_tc1 = self.num_cores 134 port_param = "dst_port" if isinstance(self, Target) else "src_port" 135 port = "4420" 136 xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs") 137 138 for nic_ip in self.nic_ips: 139 nic_name = self.get_nic_name_by_ip(nic_ip) 140 tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, 141 "root", "mqprio", "num_tc", "2", "map", "0", "1", 142 "queues", "%s@0" % num_queues_tc0, 143 "%s@%s" % (num_queues_tc1, num_queues_tc0), 144 "hw", "1", "mode", "channel"] 145 self.log_print(" ".join(tc_qdisc_map_cmd)) 146 self.exec_cmd(tc_qdisc_map_cmd) 147 148 time.sleep(5) 149 tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"] 150 self.log_print(" ".join(tc_qdisc_ingress_cmd)) 151 self.exec_cmd(tc_qdisc_ingress_cmd) 152 153 tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name, 154 "protocol", "ip", "ingress", "prio", "1", "flower", 155 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port, 156 "skip_sw", "hw_tc", "1"] 157 self.log_print(" ".join(tc_filter_cmd)) 158 self.exec_cmd(tc_filter_cmd) 159 160 # show tc configuration 161 self.log_print("Show tc configuration for %s NIC..." 
% nic_name) 162 tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name]) 163 tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"]) 164 self.log_print("%s" % tc_disk_out) 165 self.log_print("%s" % tc_filter_out) 166 167 # Ethtool coalesce settings must be applied after configuring traffic classes 168 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"]) 169 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"]) 170 171 self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name) 172 xps_cmd = ["sudo", xps_script_path, nic_name] 173 self.log_print(xps_cmd) 174 self.exec_cmd(xps_cmd) 175 176 def reload_driver(self, driver): 177 try: 178 self.exec_cmd(["sudo", "rmmod", driver]) 179 self.exec_cmd(["sudo", "modprobe", driver]) 180 except CalledProcessError as e: 181 self.log_print("ERROR: failed to reload %s module!" % driver) 182 self.log_print("%s resulted in error: %s" % (e.cmd, e.output)) 183 184 def adq_configure_nic(self): 185 self.log_print("Configuring NIC port settings for ADQ testing...") 186 187 # Reload the driver first, to make sure any previous settings are re-set. 188 self.reload_driver("ice") 189 190 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 191 for nic in nic_names: 192 self.log_print(nic) 193 try: 194 self.exec_cmd(["sudo", "ethtool", "-K", nic, 195 "hw-tc-offload", "on"]) # Enable hardware TC offload 196 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 197 "channel-inline-flow-director", "on"]) # Enable Intel Flow Director 198 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"]) # Disable LLDP 199 # As temporary workaround for ADQ, channel packet inspection optimization is turned on during connection establishment. 200 # Then turned off before fio ramp_up expires in ethtool_after_fio_ramp(). 
201 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 202 "channel-pkt-inspect-optimize", "on"]) 203 except CalledProcessError as e: 204 self.log_print("ERROR: failed to configure NIC port using ethtool!") 205 self.log_print("%s resulted in error: %s" % (e.cmd, e.output)) 206 self.log_print("Please update your NIC driver and firmware versions and try again.") 207 self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic])) 208 self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic])) 209 210 def configure_services(self): 211 self.log_print("Configuring active services...") 212 svc_config = configparser.ConfigParser(strict=False) 213 214 # Below list is valid only for RHEL / Fedora systems and might not 215 # contain valid names for other distributions. 216 svc_target_state = { 217 "firewalld": "inactive", 218 "irqbalance": "inactive", 219 "lldpad.service": "inactive", 220 "lldpad.socket": "inactive" 221 } 222 223 for service in svc_target_state: 224 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 225 out = "\n".join(["[%s]" % service, out]) 226 svc_config.read_string(out) 227 228 if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]: 229 continue 230 231 service_state = svc_config[service]["ActiveState"] 232 self.log_print("Current state of %s service is %s" % (service, service_state)) 233 self.svc_restore_dict.update({service: service_state}) 234 if service_state != "inactive": 235 self.log_print("Disabling %s. It will be restored after the test has finished." 
% service) 236 self.exec_cmd(["sudo", "systemctl", "stop", service]) 237 238 def configure_sysctl(self): 239 self.log_print("Tuning sysctl settings...") 240 241 busy_read = 0 242 if self.enable_adq and self.mode == "spdk": 243 busy_read = 1 244 245 sysctl_opts = { 246 "net.core.busy_poll": 0, 247 "net.core.busy_read": busy_read, 248 "net.core.somaxconn": 4096, 249 "net.core.netdev_max_backlog": 8192, 250 "net.ipv4.tcp_max_syn_backlog": 16384, 251 "net.core.rmem_max": 268435456, 252 "net.core.wmem_max": 268435456, 253 "net.ipv4.tcp_mem": "268435456 268435456 268435456", 254 "net.ipv4.tcp_rmem": "8192 1048576 33554432", 255 "net.ipv4.tcp_wmem": "8192 1048576 33554432", 256 "net.ipv4.route.flush": 1, 257 "vm.overcommit_memory": 1, 258 } 259 260 for opt, value in sysctl_opts.items(): 261 self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()}) 262 self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 263 264 def configure_tuned(self): 265 if not self.tuned_profile: 266 self.log_print("WARNING: Tuned profile not set in configuration file. Skipping configuration.") 267 return 268 269 self.log_print("Configuring tuned-adm profile to %s." 
% self.tuned_profile) 270 service = "tuned" 271 tuned_config = configparser.ConfigParser(strict=False) 272 273 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 274 out = "\n".join(["[%s]" % service, out]) 275 tuned_config.read_string(out) 276 tuned_state = tuned_config[service]["ActiveState"] 277 self.svc_restore_dict.update({service: tuned_state}) 278 279 if tuned_state != "inactive": 280 profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip() 281 profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip() 282 283 self.tuned_restore_dict = { 284 "profile": profile, 285 "mode": profile_mode 286 } 287 288 self.exec_cmd(["sudo", "systemctl", "start", service]) 289 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile]) 290 self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"])) 291 292 def configure_cpu_governor(self): 293 self.log_print("Setting CPU governor to performance...") 294 295 # This assumes that there is the same CPU scaling governor on each CPU 296 self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip() 297 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"]) 298 299 def configure_irq_affinity(self): 300 self.log_print("Setting NIC irq affinity for NICs...") 301 302 irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh") 303 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 304 for nic in nic_names: 305 irq_cmd = ["sudo", irq_script_path, nic] 306 self.log_print(irq_cmd) 307 self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir) 308 309 def restore_services(self): 310 self.log_print("Restoring services...") 311 for service, state in self.svc_restore_dict.items(): 312 cmd = "stop" if state == "inactive" else "start" 313 self.exec_cmd(["sudo", "systemctl", cmd, service]) 314 315 def restore_sysctl(self): 316 self.log_print("Restoring sysctl 
settings...") 317 for opt, value in self.sysctl_restore_dict.items(): 318 self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 319 320 def restore_tuned(self): 321 self.log_print("Restoring tuned-adm settings...") 322 323 if not self.tuned_restore_dict: 324 return 325 326 if self.tuned_restore_dict["mode"] == "auto": 327 self.exec_cmd(["sudo", "tuned-adm", "auto_profile"]) 328 self.log_print("Reverted tuned-adm to auto_profile.") 329 else: 330 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]]) 331 self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"]) 332 333 def restore_governor(self): 334 self.log_print("Restoring CPU governor setting...") 335 if self.governor_restore: 336 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore]) 337 self.log_print("Reverted CPU governor to %s." % self.governor_restore) 338 339 340class Target(Server): 341 def __init__(self, name, general_config, target_config): 342 super().__init__(name, general_config, target_config) 343 344 # Defaults 345 self.enable_sar = False 346 self.sar_delay = 0 347 self.sar_interval = 0 348 self.sar_count = 0 349 self.enable_pcm = False 350 self.pcm_dir = "" 351 self.pcm_delay = 0 352 self.pcm_interval = 0 353 self.pcm_count = 0 354 self.enable_bandwidth = 0 355 self.bandwidth_count = 0 356 self.enable_dpdk_memory = False 357 self.dpdk_wait_time = 0 358 self.enable_zcopy = False 359 self.scheduler_name = "static" 360 self.null_block = 0 361 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 362 self.subsystem_info_list = [] 363 self.initiator_info = [] 364 365 if "null_block_devices" in target_config: 366 self.null_block = target_config["null_block_devices"] 367 if "sar_settings" in target_config: 368 self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"] 369 if "pcm_settings" in target_config: 370 
self.enable_pcm = True 371 self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"] 372 if "enable_bandwidth" in target_config: 373 self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"] 374 if "enable_dpdk_memory" in target_config: 375 self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"] 376 if "scheduler_settings" in target_config: 377 self.scheduler_name = target_config["scheduler_settings"] 378 if "zcopy_settings" in target_config: 379 self.enable_zcopy = target_config["zcopy_settings"] 380 if "results_dir" in target_config: 381 self.results_dir = target_config["results_dir"] 382 383 self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) 384 self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../")) 385 self.set_local_nic_info(self.set_local_nic_info_helper()) 386 387 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 388 self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip") 389 390 self.configure_system() 391 if self.enable_adq: 392 self.configure_adq() 393 self.sys_config() 394 395 def set_local_nic_info_helper(self): 396 return json.loads(self.exec_cmd(["lshw", "-json"])) 397 398 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 399 stderr_opt = None 400 if stderr_redirect: 401 stderr_opt = subprocess.STDOUT 402 if change_dir: 403 old_cwd = os.getcwd() 404 os.chdir(change_dir) 405 self.log_print("Changing directory to %s" % change_dir) 406 407 out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8") 408 409 if change_dir: 410 os.chdir(old_cwd) 411 self.log_print("Changing directory to %s" % old_cwd) 412 return out 413 414 def zip_spdk_sources(self, spdk_dir, dest_file): 415 self.log_print("Zipping SPDK source directory") 416 fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) 417 for root, _directories, files in os.walk(spdk_dir, followlinks=True): 418 for file in 
files: 419 fh.write(os.path.relpath(os.path.join(root, file))) 420 fh.close() 421 self.log_print("Done zipping") 422 423 @staticmethod 424 def _chunks(input_list, chunks_no): 425 div, rem = divmod(len(input_list), chunks_no) 426 for i in range(chunks_no): 427 si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem) 428 yield input_list[si:si + (div + 1 if i < rem else div)] 429 430 def spread_bdevs(self, req_disks): 431 # Spread available block devices indexes: 432 # - evenly across available initiator systems 433 # - evenly across available NIC interfaces for 434 # each initiator 435 # Not NUMA aware. 436 ip_bdev_map = [] 437 initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info)) 438 439 for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)): 440 self.initiator_info[i]["bdev_range"] = init_chunk 441 init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"]))) 442 for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list): 443 for c in nic_chunk: 444 ip_bdev_map.append((ip, c)) 445 return ip_bdev_map 446 447 @staticmethod 448 def read_json_stats(file): 449 with open(file, "r") as json_data: 450 data = json.load(json_data) 451 job_pos = 0 # job_post = 0 because using aggregated results 452 453 # Check if latency is in nano or microseconds to choose correct dict key 454 def get_lat_unit(key_prefix, dict_section): 455 # key prefix - lat, clat or slat. 
456 # dict section - portion of json containing latency bucket in question 457 # Return dict key to access the bucket and unit as string 458 for k, _ in dict_section.items(): 459 if k.startswith(key_prefix): 460 return k, k.split("_")[1] 461 462 def get_clat_percentiles(clat_dict_leaf): 463 if "percentile" in clat_dict_leaf: 464 p99_lat = float(clat_dict_leaf["percentile"]["99.000000"]) 465 p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"]) 466 p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"]) 467 p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"]) 468 469 return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat] 470 else: 471 # Latest fio versions do not provide "percentile" results if no 472 # measurements were done, so just return zeroes 473 return [0, 0, 0, 0] 474 475 read_iops = float(data["jobs"][job_pos]["read"]["iops"]) 476 read_bw = float(data["jobs"][job_pos]["read"]["bw"]) 477 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"]) 478 read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"]) 479 read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"]) 480 read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"]) 481 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"]) 482 read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles( 483 data["jobs"][job_pos]["read"][clat_key]) 484 485 if "ns" in lat_unit: 486 read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]] 487 if "ns" in clat_unit: 488 read_p99_lat = read_p99_lat / 1000 489 read_p99_9_lat = read_p99_9_lat / 1000 490 read_p99_99_lat = read_p99_99_lat / 1000 491 read_p99_999_lat = read_p99_999_lat / 1000 492 493 write_iops = float(data["jobs"][job_pos]["write"]["iops"]) 494 write_bw = float(data["jobs"][job_pos]["write"]["bw"]) 495 lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"]) 496 write_avg_lat = 
float(data["jobs"][job_pos]["write"][lat_key]["mean"]) 497 write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"]) 498 write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"]) 499 clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"]) 500 write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles( 501 data["jobs"][job_pos]["write"][clat_key]) 502 503 if "ns" in lat_unit: 504 write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]] 505 if "ns" in clat_unit: 506 write_p99_lat = write_p99_lat / 1000 507 write_p99_9_lat = write_p99_9_lat / 1000 508 write_p99_99_lat = write_p99_99_lat / 1000 509 write_p99_999_lat = write_p99_999_lat / 1000 510 511 return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat, 512 read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat, 513 write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat, 514 write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat] 515 516 def parse_results(self, results_dir, csv_file): 517 files = os.listdir(results_dir) 518 fio_files = filter(lambda x: ".fio" in x, files) 519 json_files = [x for x in files if ".json" in x] 520 521 headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us", 522 "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us", 523 "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us", 524 "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"] 525 526 aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us", 527 "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"] 528 529 header_line = ",".join(["Name", *headers]) 530 aggr_header_line = ",".join(["Name", *aggr_headers]) 531 532 # Create empty results file 533 with open(os.path.join(results_dir, csv_file), "w") as fh: 534 
fh.write(aggr_header_line + "\n") 535 rows = set() 536 537 for fio_config in fio_files: 538 self.log_print("Getting FIO stats for %s" % fio_config) 539 job_name, _ = os.path.splitext(fio_config) 540 541 # Look in the filename for rwmixread value. Function arguments do 542 # not have that information. 543 # TODO: Improve this function by directly using workload params instead 544 # of regexing through filenames. 545 if "read" in job_name: 546 rw_mixread = 1 547 elif "write" in job_name: 548 rw_mixread = 0 549 else: 550 rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100 551 552 # If "_CPU" exists in name - ignore it 553 # Initiators for the same job could have different num_cores parameter 554 job_name = re.sub(r"_\d+CPU", "", job_name) 555 job_result_files = [x for x in json_files if x.startswith(job_name)] 556 self.log_print("Matching result files for current fio config:") 557 for j in job_result_files: 558 self.log_print("\t %s" % j) 559 560 # There may have been more than 1 initiator used in test, need to check that 561 # Result files are created so that string after last "_" separator is server name 562 inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files]) 563 inits_avg_results = [] 564 for i in inits_names: 565 self.log_print("\tGetting stats for initiator %s" % i) 566 # There may have been more than 1 test run for this job, calculate average results for initiator 567 i_results = [x for x in job_result_files if i in x] 568 i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv")) 569 570 separate_stats = [] 571 for r in i_results: 572 try: 573 stats = self.read_json_stats(os.path.join(results_dir, r)) 574 separate_stats.append(stats) 575 self.log_print(stats) 576 except JSONDecodeError: 577 self.log_print("ERROR: Failed to parse %s results! Results might be incomplete!" 
% r) 578 579 init_results = [sum(x) for x in zip(*separate_stats)] 580 init_results = [x / len(separate_stats) for x in init_results] 581 inits_avg_results.append(init_results) 582 583 self.log_print("\tAverage results for initiator %s" % i) 584 self.log_print(init_results) 585 with open(os.path.join(results_dir, i_results_filename), "w") as fh: 586 fh.write(header_line + "\n") 587 fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n") 588 589 # Sum results of all initiators running this FIO job. 590 # Latency results are an average of latencies from accros all initiators. 591 inits_avg_results = [sum(x) for x in zip(*inits_avg_results)] 592 inits_avg_results = OrderedDict(zip(headers, inits_avg_results)) 593 for key in inits_avg_results: 594 if "lat" in key: 595 inits_avg_results[key] /= len(inits_names) 596 597 # Aggregate separate read/write values into common labels 598 # Take rw_mixread into consideration for mixed read/write workloads. 599 aggregate_results = OrderedDict() 600 for h in aggr_headers: 601 read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key] 602 if "lat" in h: 603 _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat 604 else: 605 _ = read_stat + write_stat 606 aggregate_results[h] = "{0:.3f}".format(_) 607 608 rows.add(",".join([job_name, *aggregate_results.values()])) 609 610 # Save results to file 611 for row in rows: 612 with open(os.path.join(results_dir, csv_file), "a") as fh: 613 fh.write(row + "\n") 614 self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file)) 615 616 def measure_sar(self, results_dir, sar_file_prefix): 617 cpu_number = os.cpu_count() 618 sar_idle_sum = 0 619 sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt") 620 sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"])) 621 622 self.log_print("Waiting %d seconds for ramp-up to finish before 
measuring SAR stats" % self.sar_delay) 623 time.sleep(self.sar_delay) 624 self.log_print("Starting SAR measurements") 625 626 out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count]) 627 with open(os.path.join(results_dir, sar_output_file), "w") as fh: 628 for line in out.split("\n"): 629 if "Average" in line: 630 if "CPU" in line: 631 self.log_print("Summary CPU utilization from SAR:") 632 self.log_print(line) 633 elif "all" in line: 634 self.log_print(line) 635 else: 636 sar_idle_sum += float(line.split()[7]) 637 fh.write(out) 638 sar_cpu_usage = cpu_number * 100 - sar_idle_sum 639 640 with open(os.path.join(results_dir, sar_cpu_util_file), "w") as f: 641 f.write("%0.2f" % sar_cpu_usage) 642 643 def ethtool_after_fio_ramp(self, fio_ramp_time): 644 time.sleep(fio_ramp_time//2) 645 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 646 for nic in nic_names: 647 self.log_print(nic) 648 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 649 "channel-pkt-inspect-optimize", "off"]) # Disable channel packet inspection optimization 650 651 def measure_pcm_memory(self, results_dir, pcm_file_name): 652 time.sleep(self.pcm_delay) 653 cmd = ["%s/pcm-memory.x" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)] 654 pcm_memory = subprocess.Popen(cmd) 655 time.sleep(self.pcm_count) 656 pcm_memory.terminate() 657 658 def measure_pcm(self, results_dir, pcm_file_name): 659 time.sleep(self.pcm_delay) 660 cmd = ["%s/pcm.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, "-csv=%s/%s" % (results_dir, pcm_file_name)] 661 subprocess.run(cmd) 662 df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1]) 663 df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x)) 664 skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})] 665 skt_pcm_file_name = "_".join(["skt", pcm_file_name]) 666 skt.to_csv(os.path.join(results_dir, 
skt_pcm_file_name), index=False) 667 668 def measure_pcm_power(self, results_dir, pcm_power_file_name): 669 time.sleep(self.pcm_delay) 670 out = self.exec_cmd(["%s/pcm-power.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count]) 671 with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh: 672 fh.write(out) 673 674 def measure_network_bandwidth(self, results_dir, bandwidth_file_name): 675 self.log_print("INFO: starting network bandwidth measure") 676 self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name), 677 "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)]) 678 679 def measure_dpdk_memory(self, results_dir): 680 self.log_print("INFO: waiting to generate DPDK memory usage") 681 time.sleep(self.dpdk_wait_time) 682 self.log_print("INFO: generating DPDK memory usage") 683 rpc.env.env_dpdk_get_mem_stats 684 os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir)) 685 686 def sys_config(self): 687 self.log_print("====Kernel release:====") 688 self.log_print(os.uname().release) 689 self.log_print("====Kernel command line:====") 690 with open('/proc/cmdline') as f: 691 cmdline = f.readlines() 692 self.log_print('\n'.join(self.get_uncommented_lines(cmdline))) 693 self.log_print("====sysctl conf:====") 694 with open('/etc/sysctl.conf') as f: 695 sysctl = f.readlines() 696 self.log_print('\n'.join(self.get_uncommented_lines(sysctl))) 697 self.log_print("====Cpu power info:====") 698 self.log_print(self.exec_cmd(["cpupower", "frequency-info"])) 699 self.log_print("====zcopy settings:====") 700 self.log_print("zcopy enabled: %s" % (self.enable_zcopy)) 701 self.log_print("====Scheduler settings:====") 702 self.log_print("SPDK scheduler: %s" % (self.scheduler_name)) 703 704 705class Initiator(Server): 706 def __init__(self, name, general_config, initiator_config): 707 super().__init__(name, general_config, initiator_config) 708 709 # Required fields 710 self.ip = initiator_config["ip"] 711 
self.target_nic_ips = initiator_config["target_nic_ips"] 712 713 # Defaults 714 self.cpus_allowed = None 715 self.cpus_allowed_policy = "shared" 716 self.spdk_dir = "/tmp/spdk" 717 self.fio_bin = "/usr/src/fio/fio" 718 self.nvmecli_bin = "nvme" 719 self.cpu_frequency = None 720 self.subsystem_info_list = [] 721 722 if "spdk_dir" in initiator_config: 723 self.spdk_dir = initiator_config["spdk_dir"] 724 if "fio_bin" in initiator_config: 725 self.fio_bin = initiator_config["fio_bin"] 726 if "nvmecli_bin" in initiator_config: 727 self.nvmecli_bin = initiator_config["nvmecli_bin"] 728 if "cpus_allowed" in initiator_config: 729 self.cpus_allowed = initiator_config["cpus_allowed"] 730 if "cpus_allowed_policy" in initiator_config: 731 self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"] 732 if "cpu_frequency" in initiator_config: 733 self.cpu_frequency = initiator_config["cpu_frequency"] 734 if os.getenv('SPDK_WORKSPACE'): 735 self.spdk_dir = os.getenv('SPDK_WORKSPACE') 736 737 self.ssh_connection = paramiko.SSHClient() 738 self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 739 self.ssh_connection.connect(self.ip, username=self.username, password=self.password) 740 self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir]) 741 self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir]) 742 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 743 744 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 745 self.copy_spdk("/tmp/spdk.zip") 746 self.set_local_nic_info(self.set_local_nic_info_helper()) 747 self.set_cpu_frequency() 748 self.configure_system() 749 if self.enable_adq: 750 self.configure_adq() 751 self.sys_config() 752 753 def set_local_nic_info_helper(self): 754 return json.loads(self.exec_cmd(["lshw", "-json"])) 755 756 def stop(self): 757 self.ssh_connection.close() 758 759 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 760 if 
change_dir: 761 cmd = ["cd", change_dir, ";", *cmd] 762 763 # In case one of the command elements contains whitespace and is not 764 # already quoted, # (e.g. when calling sysctl) quote it again to prevent expansion 765 # when sending to remote system. 766 for i, c in enumerate(cmd): 767 if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")): 768 cmd[i] = '"%s"' % c 769 cmd = " ".join(cmd) 770 771 # Redirect stderr to stdout thanks using get_pty option if needed 772 _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect) 773 out = stdout.read().decode(encoding="utf-8") 774 775 # Check the return code 776 rc = stdout.channel.recv_exit_status() 777 if rc: 778 raise CalledProcessError(int(rc), cmd, out) 779 780 return out 781 782 def put_file(self, local, remote_dest): 783 ftp = self.ssh_connection.open_sftp() 784 ftp.put(local, remote_dest) 785 ftp.close() 786 787 def get_file(self, remote, local_dest): 788 ftp = self.ssh_connection.open_sftp() 789 ftp.get(remote, local_dest) 790 ftp.close() 791 792 def copy_spdk(self, local_spdk_zip): 793 self.log_print("Copying SPDK sources to initiator %s" % self.name) 794 self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip") 795 self.log_print("Copied sources zip from target") 796 self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir]) 797 self.log_print("Sources unpacked") 798 799 def copy_result_files(self, dest_dir): 800 self.log_print("Copying results") 801 802 if not os.path.exists(dest_dir): 803 os.mkdir(dest_dir) 804 805 # Get list of result files from initiator and copy them back to target 806 file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n") 807 808 for file in file_list: 809 self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file), 810 os.path.join(dest_dir, file)) 811 self.log_print("Done copying results") 812 813 def discover_subsystems(self, address_list, subsys_no): 814 num_nvmes = range(0, subsys_no) 815 
def discover_subsystems(self, address_list, subsys_no):
    """Discover NVMe-oF subsystems exposed by the target.

    Probes ports 4420 .. 4420+subsys_no-1 on every address in address_list
    with "nvme discover" and stores the matching (trsvcid, nqn, traddr)
    tuples in self.subsystem_info_list.
    Note: the loop variable used to shadow the subsys_no parameter; it has
    been renamed to port_offset.
    """
    nvme_discover_output = ""
    for ip, port_offset in itertools.product(address_list, range(0, subsys_no)):
        self.log_print("Trying to discover: %s:%s" % (ip, 4420 + port_offset))
        nvme_discover_cmd = ["sudo",
                             "%s" % self.nvmecli_bin,
                             "discover", "-t", "%s" % self.transport,
                             "-s", "%s" % (4420 + port_offset),
                             "-a", "%s" % ip]

        try:
            stdout = self.exec_cmd(nvme_discover_cmd)
            if stdout:
                nvme_discover_output = nvme_discover_output + stdout
        except CalledProcessError:
            # Do nothing. In case of discovering remote subsystems of kernel target
            # we expect "nvme discover" to fail a bunch of times because we basically
            # scan ports.
            pass

    subsystems = self.parse_discovered_subsystems(nvme_discover_output, address_list)
    self.log_print("Found matching subsystems on target side:")
    for s in subsystems:
        self.log_print(s)
    self.subsystem_info_list = subsystems

def parse_discovered_subsystems(self, discover_output, address_list):
    """Helper: parse "nvme discover" output text.

    Returns a sorted, de-duplicated list of (trsvcid, subnqn, traddr)
    tuples, keeping only entries whose address is in address_list and
    dropping discovery subsystems.
    """
    subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                            r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                            r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                            discover_output)  # from nvme discovery output
    subsystems = filter(lambda x: x[-1] in address_list, subsystems)
    subsystems = filter(lambda x: "discovery" not in x[1], subsystems)
    subsystems = list(set(subsystems))
    subsystems.sort(key=lambda x: x[1])
    return subsystems

def gen_fio_filename_conf(self, *args, **kwargs):
    # Logic implemented in SPDKInitiator and KernelInitiator classes
    pass
def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no,
                   num_jobs=None, ramp_time=0, run_time=10, rate_iops=0):
    """Generate a FIO job file on the initiator and return its remote path.

    Uses the SPDK bdev fio plugin in "spdk" mode, otherwise the kernel
    ioengine against connected /dev/nvme* devices. Thread count comes from
    cpus_allowed, an already-set num_cores, or the subsystem count.
    """
    fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""
    if "spdk" in self.mode:
        bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
        self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
        ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
        spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
    else:
        ioengine = self.ioengine
        spdk_conf = ""
        out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'",
                             "|", "awk", "'{print $1}'"])
        subsystems = [x for x in out.split("\n") if "nvme" in x]

    if self.cpus_allowed is not None:
        self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
        # BUG FIX: ranges like "0-3" are inclusive and span 4 CPUs; the old
        # inline count used len(range(a, b)) and dropped one CPU per range.
        self.num_cores = self.count_allowed_cpus(self.cpus_allowed)
        threads = range(0, self.num_cores)
    elif hasattr(self, 'num_cores'):
        self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
        threads = range(0, int(self.num_cores))
    else:
        self.num_cores = len(subsystems)
        threads = range(0, len(subsystems))

    if "spdk" in self.mode:
        filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs)
    else:
        filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs)

    fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                          rw=rw, rwmixread=rwmixread, block_size=block_size,
                                          ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

    # TODO: hipri disabled for now, as it causes fio errors:
    # io_u error on file /dev/nvme2n1: Operation not supported
    # See comment in KernelInitiator class, kernel_init_connect() function
    if hasattr(self, "ioengine") and "io_uring" in self.ioengine:
        fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
    if num_jobs:
        fio_config = fio_config + "numjobs=%s \n" % num_jobs
    if self.cpus_allowed is not None:
        fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
        fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
    fio_config = fio_config + filename_section

    fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
    if hasattr(self, "num_cores"):
        fio_config_filename += "_%sCPU" % self.num_cores
    fio_config_filename += ".fio"

    self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
    self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
    self.log_print("Created FIO Config:")
    self.log_print(fio_config)

    return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

def count_allowed_cpus(self, cpus_allowed):
    """Helper: count CPUs in a cpus_allowed spec such as "0-3,8,10-11".

    Ranges are inclusive at both ends, matching SPDKTarget.get_num_cores.
    """
    total = 0
    for cpu in cpus_allowed.split(","):
        if "-" in cpu:
            a, b = cpu.split("-")
            # "a-b" spans b - a + 1 CPUs
            total += len(range(int(a), int(b))) + 1
        else:
            total += 1
    return total

def set_cpu_frequency(self):
    """Pin CPU frequency with cpupower when cpu_frequency is configured.

    Requires the userspace governor (i.e. intel_pstate disabled); exits
    the script on failure.
    """
    if self.cpu_frequency is not None:
        try:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
            self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
        except Exception:
            self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
            # BUG FIX: exit with a failure status; bare sys.exit() exits with 0.
            sys.exit(1)
    else:
        self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.")
def run_fio(self, fio_config_file, run_num=None):
    """Execute FIO on the initiator; JSON results land next to the config.

    With run_num, repeats the job run_num times with distinct output files.
    """
    job_name, _ = os.path.splitext(fio_config_file)
    self.log_print("Starting FIO run for job: %s" % job_name)
    self.log_print("Using FIO: %s" % self.fio_bin)

    if run_num:
        for i in range(1, run_num + 1):
            output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
            try:
                output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
                                        "--output=%s" % output_filename, "--eta=never"], True)
                self.log_print(output)
            except subprocess.CalledProcessError as e:
                self.log_print("ERROR: Fio process failed!")
                self.log_print(e.stdout)
    else:
        output_filename = job_name + "_" + self.name + ".json"
        # BUG FIX: was '"--output" % output_filename' — a format string with
        # no placeholder, which raises TypeError at runtime.
        output = self.exec_cmd(["sudo", self.fio_bin,
                                fio_config_file, "--output-format=json",
                                "--output=%s" % output_filename], True)
        self.log_print(output)
    self.log_print("FIO run finished. Results in: %s" % output_filename)

def sys_config(self):
    """Log kernel, cmdline, sysctl and cpupower info for the test report."""
    self.log_print("====Kernel release:====")
    self.log_print(self.exec_cmd(["uname", "-r"]))
    self.log_print("====Kernel command line:====")
    cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
    self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
    self.log_print("====sysctl conf:====")
    sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"])
    self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
    self.log_print("====Cpu power info:====")
    self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))


class KernelTarget(Target):
    """NVMe-oF target backed by the Linux kernel nvmet driver (nvmetcli)."""

    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = "nvmetcli"

        if "nvmet_bin" in target_config:
            self.nvmet_bin = target_config["nvmet_bin"]

    def stop(self):
        """Tear down the kernel target configuration."""
        nvmet_command(self.nvmet_bin, "clear")

    def kernel_tgt_gen_subsystem_conf(self, nvme_list):
        """Write "kernel.conf" (nvmetcli JSON) exposing one subsystem per bdev.

        Also fills self.subsystem_info_list with [port, nqn, ip] entries.
        """
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        for ip, bdev_num in self.spread_bdevs(len(nvme_list)):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = nvme_list[bdev_num]

            nvmet_cfg["subsystems"].append({
                "allowed_hosts": [],
                "attr": {
                    "allow_any_host": "1",
                    "serial": serial,
                    "version": "1.3"
                },
                "namespaces": [
                    {
                        "device": {
                            "path": bdev_name,
                            "uuid": "%s" % uuid.uuid4()
                        },
                        "enable": 1,
                        # NOTE(review): nsid is set to the port number string,
                        # not a small namespace id — verify against nvmet.
                        "nsid": port
                    }
                ],
                "nqn": nqn
            })

            nvmet_cfg["ports"].append({
                "addr": {
                    "adrfam": "ipv4",
                    "traddr": ip,
                    "trsvcid": port,
                    "trtype": self.transport
                },
                "portid": bdev_num,
                "referrals": [],
                "subsystems": [nqn]
            })

            self.subsystem_info_list.append([port, nqn, ip])
        self.subsys_no = len(self.subsystem_info_list)

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        """Configure and start the kernel NVMe-oF target via nvmetcli."""
        self.log_print("Configuring kernel NVMeOF Target")

        if self.null_block:
            # Consistency fix: use log_print (prefixed) instead of bare print
            self.log_print("Configuring with null block device.")
            nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
        else:
            self.log_print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()

        self.kernel_tgt_gen_subsystem_conf(nvme_list)
        self.subsys_no = len(nvme_list)

        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log_print("Done configuring kernel NVMeOF Target")
class SPDKTarget(Target):
    """NVMe-oF target running SPDK's nvmf_tgt app, configured over JSON-RPC."""

    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Required fields
        self.core_mask = target_config["core_mask"]
        self.num_cores = self.get_num_cores(self.core_mask)

        # Defaults
        self.dif_insert_strip = False
        self.null_block_dif_type = 0
        self.num_shared_buffers = 4096
        self.max_queue_depth = 128
        self.bpf_proc = None
        self.bpf_scripts = []
        self.enable_dsa = False
        self.scheduler_core_limit = None

        if "num_shared_buffers" in target_config:
            self.num_shared_buffers = target_config["num_shared_buffers"]
        if "max_queue_depth" in target_config:
            self.max_queue_depth = target_config["max_queue_depth"]
        if "null_block_dif_type" in target_config:
            self.null_block_dif_type = target_config["null_block_dif_type"]
        if "dif_insert_strip" in target_config:
            self.dif_insert_strip = target_config["dif_insert_strip"]
        if "bpf_scripts" in target_config:
            self.bpf_scripts = target_config["bpf_scripts"]
        if "dsa_settings" in target_config:
            self.enable_dsa = target_config["dsa_settings"]
        if "scheduler_core_limit" in target_config:
            self.scheduler_core_limit = target_config["scheduler_core_limit"]

        self.log_print("====DSA settings:====")
        self.log_print("DSA enabled: %s" % (self.enable_dsa))

    @staticmethod
    def get_num_cores(core_mask):
        """Count CPUs in a core mask: hex ("0xF0") or list ("[0-3,8]") form."""
        if "0x" in core_mask:
            # hex mask -> popcount of set bits
            return bin(int(core_mask, 16)).count("1")
        else:
            num_cores = 0
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")
            for i in core_mask.split(","):
                if "-" in i:
                    x, y = i.split("-")
                    # "x-y" ranges are inclusive at both ends, hence the + 1
                    num_cores += len(range(int(x), int(y))) + 1
                else:
                    num_cores += 1
            return num_cores

    def spdk_tgt_configure(self):
        """Create transport, bdevs and subsystems over RPC (after framework init)."""
        self.log_print("Configuring SPDK NVMeOF target via RPC")

        if self.enable_adq:
            self.adq_configure_tc()

        # Create transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
                                       num_shared_buffers=self.num_shared_buffers,
                                       max_queue_depth=self.max_queue_depth,
                                       dif_insert_or_strip=self.dif_insert_strip,
                                       sock_priority=self.adq_priority)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

        self.log_print("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self, null_block_count):
        """Create null_block_count null bdevs named Nvme<i>n1 via RPC."""
        md_size = 0
        block_size = 4096
        if self.null_block_dif_type != 0:
            # 128-byte metadata area is required to carry DIF protection info
            md_size = 128

        self.log_print("Adding null block bdevices to config via RPC")
        for i in range(null_block_count):
            self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type)
            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
                                      dif_type=self.null_block_dif_type, md_size=md_size)
        self.log_print("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        """Attach local PCIe NVMe drives as bdevs (all, or first req_num_disks)."""
        self.log_print("Adding NVMe bdevs to config via RPC")

        bdfs = get_nvme_devices_bdf()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create one subsystem + listener per bdev, spread across ips.

        Appends [port, nqn, ip] entries to self.subsystem_info_list and
        updates self.subsys_no.
        """
        self.log_print("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        for ip, bdev_num in self.spread_bdevs(req_num_disks):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = "Nvme%sn1" % bdev_num

            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                           allow_any_host=True, max_namespaces=8)
            rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
            # Listener added both for the subsystem itself and for discovery
            for nqn_name in [nqn, "discovery"]:
                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
                                                     nqn=nqn_name,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid=port,
                                                     adrfam="ipv4")
            self.subsystem_info_list.append([port, nqn, ip])
        self.subsys_no = len(self.subsystem_info_list)

        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def bpf_start(self):
        """Start bpftrace scripts attached to the running nvmf_tgt PID."""
        self.log_print("Starting BPF Trace scripts: %s" % self.bpf_scripts)
        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
        results_path = os.path.join(self.results_dir, "bpf_traces.txt")

        with open(self.pid, "r") as fh:
            nvmf_pid = str(fh.readline())

        cmd = [bpf_script, nvmf_pid, *bpf_traces]
        self.log_print(cmd)
        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})

    def tgt_start(self):
        """Start nvmf_tgt, wait for its RPC socket, apply options, configure.

        Order matters: the app starts with --wait-for-rpc, socket/bdev/DSA/
        scheduler options must be set before framework_start_init, and
        spdk_tgt_configure must run after it.
        """
        if self.null_block:
            self.subsys_no = 1
        else:
            self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
        # self.pid is the path of the pidfile, not the PID itself
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        # NOTE(review): this logs the pidfile path, not the process PID
        self.log_print("SPDK NVMeOF Target PID=%s" % self.pid)
        self.log_print("Waiting for spdk to initialize...")
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")

        if self.enable_zcopy:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
                                           enable_zerocopy_send_server=True)
            self.log_print("Target socket options:")
            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))

        if self.enable_adq:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
                                           nvme_adminq_poll_period_us=100000, retry_count=4)

        if self.enable_dsa:
            rpc.dsa.dsa_scan_accel_engine(self.client, config_kernel_mode=None)
            self.log_print("Target DSA accel engine enabled")

        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit)
        rpc.framework_start_init(self.client)

        if self.bpf_scripts:
            self.bpf_start()

        self.spdk_tgt_configure()

    def stop(self):
        """Stop BPF tracing (if running) and terminate the nvmf_tgt process."""
        if self.bpf_proc:
            self.log_print("Stopping BPF Trace script")
            self.bpf_proc.terminate()
            self.bpf_proc.wait()

        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait()
            except Exception as e:
                # Fall back to SIGKILL if graceful termination fails
                self.log_print(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
class KernelInitiator(Initiator):
    """Initiator using the kernel NVMe-oF host stack via nvme-cli connect."""

    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Defaults
        self.extra_params = ""
        self.ioengine = "libaio"

        if "extra_params" in initiator_config:
            self.extra_params = initiator_config["extra_params"]

        if "kernel_engine" in initiator_config:
            self.ioengine = initiator_config["kernel_engine"]
            if "io_uring" in self.ioengine:
                self.extra_params = "--nr-poll-queues=8"

    def get_connected_nvme_list(self):
        """Return device names (nvmeXnY) of connected SPDK/Linux NVMe devices."""
        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
        return nvme_list

    def kernel_init_connect(self):
        """Connect to each discovered subsystem; tune block sysfs for io_uring."""
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for subsystem in self.subsystem_info_list:
            # BUG FIX: "%" formatting needs a tuple; subsystem_info_list may
            # hold lists (targets append [port, nqn, ip]) which would raise
            # TypeError, so coerce explicitly.
            self.log_print("Trying to connect %s %s %s" % tuple(subsystem))
            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
            time.sleep(2)

        if "io_uring" in self.ioengine:
            self.log_print("Setting block layer settings for io_uring.")

            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
            # apparently it's not possible for connected subsystems.
            # Results in "error: Invalid argument"
            block_sysfs_settings = {
                "iostats": "0",
                "rq_affinity": "0",
                "nomerges": "2"
            }

            for disk in self.get_connected_nvme_list():
                sysfs = os.path.join("/sys/block", disk, "queue")
                for k, v in block_sysfs_settings.items():
                    sysfs_opt_path = os.path.join(sysfs, k)
                    try:
                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
                    except subprocess.CalledProcessError as e:
                        self.log_print("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
                    finally:
                        _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)])
                        self.log_print("%s=%s" % (sysfs_opt_path, _))

    def kernel_init_disconnect(self):
        """Disconnect from every subsystem connected by kernel_init_connect."""
        for subsystem in self.subsystem_info_list:
            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
            time.sleep(1)

    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1):
        """Round-robin connected /dev/nvme* devices into per-thread fio sections.

        Per-section iodepth is scaled by the number of disks in the section
        divided by num_jobs (minimum 1).
        """
        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section
class SPDKInitiator(Initiator):
    """Initiator that runs FIO through the SPDK bdev fio plugin."""

    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.install_spdk()

        # Required fields
        self.num_cores = initiator_config["num_cores"]

        # Optional fields
        self.enable_data_digest = False
        if "enable_data_digest" in initiator_config:
            self.enable_data_digest = initiator_config["enable_data_digest"]

    def install_spdk(self):
        """Build SPDK with the fio plugin on the initiator and run setup.sh."""
        self.log_print("Using fio binary %s" % self.fio_bin)
        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)])
        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])

        self.log_print("SPDK built")
        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        """Return SPDK JSON bdev config attaching one controller per subsystem."""
        bdev_cfg_section = {
            "subsystems": [
                {
                    "subsystem": "bdev",
                    "config": []
                }
            ]
        }

        for i, subsys in enumerate(remote_subsystem_list):
            # Idiom: map(str, ...) instead of map(lambda x: str(x), ...)
            sub_port, sub_nqn, sub_addr = map(str, subsys)
            nvme_ctrl = {
                "method": "bdev_nvme_attach_controller",
                "params": {
                    "name": "Nvme{}".format(i),
                    "trtype": self.transport,
                    "traddr": sub_addr,
                    "trsvcid": sub_port,
                    "subnqn": sub_nqn,
                    "adrfam": "IPv4"
                }
            }

            if self.enable_adq:
                nvme_ctrl["params"].update({"priority": "1"})

            if self.enable_data_digest:
                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})

            bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)

        return json.dumps(bdev_cfg_section, indent=2)

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1):
        """Round-robin Nvme<i>n1 bdevs into per-thread fio job sections.

        Caps the thread count at the subsystem count; per-section iodepth is
        scaled by the number of bdevs in the section divided by num_jobs
        (minimum 1).
        """
        filename_section = ""
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        nvme_per_split = int(len(subsystems) / len(threads))
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section
if __name__ == "__main__":
    # Entry point: parse args, build target/initiator objects from the JSON
    # config, run the FIO workload matrix with measurement threads, then
    # restore system settings and collect results.
    script_full_dir = os.path.dirname(os.path.realpath(__file__))
    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
                        help='Configuration file.')
    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
                        help='Results directory.')
    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
                        help='CSV results filename.')

    args = parser.parse_args()

    print("Using config file: %s" % args.config)
    with open(args.config, "r") as config:
        data = json.load(config)

    initiators = []
    fio_cases = []

    general_config = data["general"]
    target_config = data["target"]
    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]

    for k, v in data.items():
        if "target" in k:
            v.update({"results_dir": args.results})
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(k, data["general"], v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(k, data["general"], v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None
            fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None

            fio_rate_iops = 0
            if "rate_iops" in data[k]:
                fio_rate_iops = data[k]["rate_iops"]
        else:
            # Unrecognized config sections are ignored
            continue

    try:
        os.mkdir(args.results)
    except FileExistsError:
        pass

    for i in initiators:
        target_obj.initiator_info.append(
            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
        )

    # TODO: This try block is definitely too large. Need to break this up into separate
    # logical blocks to reduce size.
    try:
        target_obj.tgt_start()

        for i in initiators:
            i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no)
            if i.enable_adq:
                i.adq_configure_tc()

        # Poor mans threading
        # Run FIO tests
        for block_size, io_depth, rw in fio_workloads:
            threads = []
            configs = []
            for i in initiators:
                if i.mode == "kernel":
                    i.kernel_init_connect()

                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops)
                configs.append(cfg)

            for i, cfg in zip(initiators, configs):
                t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
                threads.append(t)
            if target_obj.enable_sar:
                sar_file_prefix = "%s_%s_%s_sar" % (block_size, rw, io_depth)
                t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix))
                threads.append(t)

            if target_obj.enable_pcm:
                pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]

                pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],))
                pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],))
                pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],))

                threads.append(pcm_cpu_t)
                threads.append(pcm_mem_t)
                threads.append(pcm_pow_t)

            if target_obj.enable_bandwidth:
                bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)])
                bandwidth_file_name = ".".join([bandwidth_file_name, "csv"])
                t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,))
                threads.append(t)

            if target_obj.enable_dpdk_memory:
                # BUG FIX: args must be a tuple; "(args.results)" is just a
                # parenthesized string — Thread would splat it char by char.
                t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results,))
                threads.append(t)

            if target_obj.enable_adq:
                ethtool_thread = threading.Thread(target=target_obj.ethtool_after_fio_ramp, args=(fio_ramp_time,))
                threads.append(ethtool_thread)

            for t in threads:
                t.start()
            for t in threads:
                t.join()

            for i in initiators:
                if i.mode == "kernel":
                    i.kernel_init_disconnect()
                i.copy_result_files(args.results)

        target_obj.restore_governor()
        target_obj.restore_tuned()
        target_obj.restore_services()
        target_obj.restore_sysctl()
        if target_obj.enable_adq:
            target_obj.reload_driver("ice")
        for i in initiators:
            i.restore_governor()
            i.restore_tuned()
            i.restore_services()
            i.restore_sysctl()
            if i.enable_adq:
                i.reload_driver("ice")
        target_obj.parse_results(args.results, args.csv_filename)
    finally:
        for i in initiators:
            try:
                i.stop()
            except Exception as err:
                # Best-effort cleanup, but log instead of silently swallowing
                print("Error while stopping initiator %s: %s" % (i.name, err))
        target_obj.stop()