#!/usr/bin/env python3
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (C) 2018 Intel Corporation
# All rights reserved.
#

import os
import re
import sys
import argparse
import json
import logging
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid

import paramiko
import pandas as pd
from common import *
from subprocess import CalledProcessError

sys.path.append(os.path.dirname(__file__) + '/../../../python')

import spdk.rpc as rpc  # noqa
import spdk.rpc.client as rpc_client  # noqa


class Server:
    def __init__(self, name, general_config, server_config):
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]

        self.log = logging.getLogger(self.name)

        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        self.local_nic_info = []
        self._nics_json_obj = {}
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        self.enable_adq = False
        self.adq_priority = None
        if "adq_enable" in server_config and server_config["adq_enable"]:
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1

        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]

        if not re.match(r"^[A-Za-z0-9]*$", name):
            self.log.error("Please use a name which contains only letters or numbers")
            sys.exit(1)

    @staticmethod
    def get_uncommented_lines(lines):
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                if ip in addr["local"]:
                    return nic["ifname"]

    def set_local_nic_info_helper(self):
        pass

    def set_local_nic_info(self, pci_info):
        def extract_network_elements(json_obj):
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def get_nic_numa_node(self, nic_name):
        return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name]))

    def get_numa_cpu_map(self):
        numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"]))
        numa_cpu_json_map = {}

        for cpu in numa_cpu_json_obj["cpus"]:
            cpu_num = int(cpu["cpu"])
            numa_node = int(cpu["node"])
            numa_cpu_json_map.setdefault(numa_node, [])
            numa_cpu_json_map[numa_node].append(cpu_num)

        return numa_cpu_json_map

    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        return ""
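    # Illustrative example (assumed 2-socket box): `lscpu -b -e=NODE,CPU -J`
    # returns entries like {"cpus": [{"node": 0, "cpu": 0}, {"node": 1, "cpu": 1}, ...]},
    # which get_numa_cpu_map() above folds into {0: [0, 2, ...], 1: [1, 3, ...]}
    # so later steps can pin workers to the NUMA node of a given NIC.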
    def configure_system(self):
        self.load_drivers()
        self.configure_services()
        self.configure_sysctl()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity()

    def load_drivers(self):
        self.log.info("Loading drivers")
        self.exec_cmd(["sudo", "modprobe", "-a",
                       "nvme-%s" % self.transport,
                       "nvmet-%s" % self.transport])
        if self.mode == "kernel" and hasattr(self, "null_block") and self.null_block:
            self.exec_cmd(["sudo", "modprobe", "null_blk",
                           "nr_devices=%s" % self.null_block])

    def configure_adq(self):
        if self.mode == "kernel":
            self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        self.log.info("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log.info("%s loaded!" % module)
            except CalledProcessError as e:
                self.log.error("ERROR: failed to load module %s" % module)
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        self.log.info("Configuring ADQ Traffic classes and filters...")

        if self.mode == "kernel":
            self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        port = "4420"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log.info(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log.info(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                             "protocol", "ip", "ingress", "prio", "1", "flower",
                             "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                             "skip_sw", "hw_tc", "1"]
            self.log.info(" ".join(tc_filter_cmd))
            self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log.info("Show tc configuration for %s NIC..." % nic_name)
            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log.info("%s" % tc_disk_out)
            self.log.info("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log.info(xps_cmd)
            self.exec_cmd(xps_cmd)
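    # Illustrative layout: with num_cores == 8 the mqprio map above gives TC0
    # (default traffic) hardware queues 0-1 and TC1 (NVMe-oF traffic on port
    # 4420) the 8 queues starting at offset 2; the flower filter then steers
    # matching TCP packets into TC1 entirely in NIC hardware (skip_sw).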
    def reload_driver(self, driver):
        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver])
        except CalledProcessError as e:
            self.log.error("ERROR: failed to reload %s module!" % driver)
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_nic(self):
        self.log.info("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                # As a temporary workaround for ADQ, channel packet inspection optimization is
                # turned on during connection establishment, then turned off before fio ramp_up
                # expires in ethtool_after_fio_ramp().
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-pkt-inspect-optimize", "on"])
            except CalledProcessError as e:
                self.log.error("ERROR: failed to configure NIC port using ethtool!")
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
                self.log.info("Please update your NIC driver and firmware versions and try again.")
            self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        self.log.info("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log.info("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log.info("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])
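    # Illustrative parse: `systemctl show --no-page firewalld` prints plain
    # "Key=Value" lines such as "ActiveState=active". Prefixing a "[firewalld]"
    # header turns that output into INI text configparser accepts, so the
    # current state can be read back as svc_config["firewalld"]["ActiveState"].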
    def configure_sysctl(self):
        self.log.info("Tuning sysctl settings...")

        busy_read = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1

        sysctl_opts = {
            "net.core.busy_poll": 0,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
        }

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())
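    # Example round-trip (illustrative values): for "net.core.somaxconn" the
    # pre-test value (say "128") is captured with `sysctl -n` into
    # sysctl_restore_dict, 4096 is written for the test run, and
    # restore_sysctl() later writes the saved "128" back.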
    def configure_tuned(self):
        if not self.tuned_profile:
            self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        self.log.info("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def configure_irq_affinity(self):
        self.log.info("Setting NIC irq affinity for NICs...")

        irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh")
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            irq_cmd = ["sudo", irq_script_path, nic]
            self.log.info(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        self.log.info("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        self.log.info("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        self.log.info("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log.info("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        self.log.info("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log.info("Reverted CPU governor to %s." % self.governor_restore)
    def restore_settings(self):
        self.restore_governor()
        self.restore_tuned()
        self.restore_services()
        self.restore_sysctl()
        if self.enable_adq:
            self.reload_driver("ice")


class Target(Server):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Defaults
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []
        self.initiator_info = []
        self.nvme_allowlist = []
        self.nvme_blocklist = []

        # Target-side measurement options
        self.enable_pm = True
        self.enable_sar = True
        self.enable_pcm = True
        self.enable_bw = True
        self.enable_dpdk_memory = True

        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "scheduler_settings" in target_config:
            self.scheduler_name = target_config["scheduler_settings"]
        if "zcopy_settings" in target_config:
            self.enable_zcopy = target_config["zcopy_settings"]
        if "results_dir" in target_config:
            self.results_dir = target_config["results_dir"]
        if "blocklist" in target_config:
            self.nvme_blocklist = target_config["blocklist"]
        if "allowlist" in target_config:
            self.nvme_allowlist = target_config["allowlist"]
        # Blocklist takes precedence, remove common elements from allowlist
        self.nvme_allowlist = list(set(self.nvme_allowlist) - set(self.nvme_blocklist))
        if "enable_pm" in target_config:
            self.enable_pm = target_config["enable_pm"]
        if "enable_sar" in target_config:
            self.enable_sar = target_config["enable_sar"]
        if "enable_pcm" in target_config:
            self.enable_pcm = target_config["enable_pcm"]
        if "enable_bandwidth" in target_config:
            self.enable_bw = target_config["enable_bandwidth"]
        if "enable_dpdk_memory" in target_config:
            self.enable_dpdk_memory = target_config["enable_dpdk_memory"]

        self.log.info("Items now on allowlist: %s" % self.nvme_allowlist)
        self.log.info("Items now on blocklist: %s" % self.nvme_blocklist)

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log.info("Changing directory to %s" % change_dir)

        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")

        if change_dir:
            os.chdir(old_cwd)
            self.log.info("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        self.log.info("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, _directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log.info("Done zipping")

    @staticmethod
    def _chunks(input_list, chunks_no):
        div, rem = divmod(len(input_list), chunks_no)
        for i in range(chunks_no):
            si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem)
            yield input_list[si:si + (div + 1 if i < rem else div)]

    def spread_bdevs(self, req_disks):
        # Spread available block devices indexes:
        # - evenly across available initiator systems
        # - evenly across available NIC interfaces for
        #   each initiator
        # Not NUMA aware.
        ip_bdev_map = []
        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))

        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
            self.initiator_info[i]["bdev_range"] = init_chunk
            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
                for c in nic_chunk:
                    ip_bdev_map.append((ip, c))
        return ip_bdev_map
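    # Worked example (illustrative): list(Target._chunks([0, 1, 2, 3, 4], 2))
    # == [[0, 1, 2], [3, 4]] - the first `rem` chunks get one extra element.
    # With two initiators, each reachable over two target NIC IPs,
    # spread_bdevs(8) therefore assigns bdev indexes 0-3 to the first initiator
    # and 4-7 to the second, two bdevs per NIC IP, and returns the flat
    # [(ip, bdev_idx), ...] mapping used when creating subsystems.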
    def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time):
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))

        self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Starting SAR measurements")

        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % 1, "%s" % run_time])
        with open(sar_output_file, "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log.info("Summary CPU utilization from SAR:")
                        self.log.info(line)
                    elif "all" in line:
                        self.log.info(line)
                    else:
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum

        with open(sar_cpu_util_file, "w") as f:
            f.write("%0.2f" % sar_cpu_usage)

    def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time):
        time.sleep(ramp_time)
        self.log.info("Starting power measurements")
        self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir,
                       "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix,
                       "-x", "-c", "%s" % run_time, "-t", "%s" % 1, "-r"])

    def ethtool_after_fio_ramp(self, fio_ramp_time):
        time.sleep(fio_ramp_time // 2)
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                           "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization

    def measure_pcm_memory(self, results_dir, pcm_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        cmd = ["pcm-memory", "1", "-csv=%s/%s" % (results_dir, pcm_file_name)]
        pcm_memory = subprocess.Popen(cmd)
        time.sleep(run_time)
        pcm_memory.terminate()

    def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        cmd = ["pcm", "1", "-i=%s" % run_time,
               "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)
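    # Note (illustrative): pcm writes a two-row CSV header; the level-1 column
    # filter in measure_pcm() keeps only the socket-to-socket UPI link columns
    # (UPI0-UPI2), so the skt_-prefixed file holds just inter-socket traffic.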
    def measure_pcm_power(self, results_dir, pcm_power_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        out = self.exec_cmd(["pcm-power", "1", "-i=%s" % run_time])
        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
            fh.write(out)
        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
        #       Improve this so that an additional file containing measurement averages is generated too.

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name, ramp_time, run_time):
        self.log.info("Waiting %d seconds for ramp-up to finish before measuring bandwidth stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("INFO: starting network bandwidth measure")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", "%s" % run_time])
        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
        #       Improve this so that an additional file containing measurement averages is generated too.
        # TODO: Monitor only those interfaces which are currently used to run the workload.

    def measure_dpdk_memory(self, results_dir, dump_file_name, ramp_time):
        self.log.info("INFO: waiting to generate DPDK memory usage")
        time.sleep(ramp_time)
        self.log.info("INFO: generating DPDK memory usage")
        tmp_dump_file = rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)["filename"]
        os.rename(tmp_dump_file, "%s/%s" % (results_dir, dump_file_name))

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(os.uname().release)
        self.log.info("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log.info("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log.info("====zcopy settings:====")
        self.log.info("zcopy enabled: %s" % (self.enable_zcopy))
        self.log.info("====Scheduler settings:====")
        self.log.info("SPDK scheduler: %s" % (self.scheduler_name))
class Initiator(Server):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Required fields
        self.ip = initiator_config["ip"]
        self.target_nic_ips = initiator_config["target_nic_ips"]

        # Defaults
        self.cpus_allowed = None
        self.cpus_allowed_policy = "shared"
        self.spdk_dir = "/tmp/spdk"
        self.fio_bin = "/usr/src/fio/fio"
        self.nvmecli_bin = "nvme"
        self.cpu_frequency = None
        self.subsystem_info_list = []

        if "spdk_dir" in initiator_config:
            self.spdk_dir = initiator_config["spdk_dir"]
        if "fio_bin" in initiator_config:
            self.fio_bin = initiator_config["fio_bin"]
        if "nvmecli_bin" in initiator_config:
            self.nvmecli_bin = initiator_config["nvmecli_bin"]
        if "cpus_allowed" in initiator_config:
            self.cpus_allowed = initiator_config["cpus_allowed"]
        if "cpus_allowed_policy" in initiator_config:
            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
        if "cpu_frequency" in initiator_config:
            self.cpu_frequency = initiator_config["cpu_frequency"]
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.copy_spdk("/tmp/spdk.zip")
        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        self.restore_settings()
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it again to prevent
        # expansion when sending to the remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout using the get_pty option if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        self.log.info("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log.info("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log.info("Sources unpacked")

    def copy_result_files(self, dest_dir):
        self.log.info("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log.info("Done copying results")

    def match_subsystems(self, target_subsystems):
        subsystems = [subsystem for subsystem in target_subsystems if subsystem[2] in self.target_nic_ips]
        subsystems.sort(key=lambda x: x[1])
        self.log.info("Found matching subsystems on target side:")
        for s in subsystems:
            self.log.info(s)
        self.subsystem_info_list = subsystems
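    # Example (illustrative): given target-side tuples
    # [("4420", "nqn.2018-09.io.spdk:cnode0", "10.0.0.1"),
    #  ("4421", "nqn.2018-09.io.spdk:cnode1", "10.0.0.2")]
    # and target_nic_ips == ["10.0.0.1"], match_subsystems() keeps only the
    # cnode0 entry, sorted by NQN, as this initiator's subsystem_info_list.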
    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def get_route_nic_numa(self, remote_nvme_ip):
        local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip]))
        local_nvme_nic = local_nvme_nic[0]["dev"]
        return self.get_nic_numa_node(local_nvme_nic)

    @staticmethod
    def gen_fio_offset_section(offset_inc, num_jobs):
        offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc
        return "\n".join(["size=%s%%" % offset_inc,
                          "offset=0%",
                          "offset_increment=%s%%" % offset_inc])
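    # Example (illustrative): gen_fio_offset_section(0, 4) gives each of the
    # 4 jobs its own quarter of the device:
    #   size=25%
    #   offset=0%
    #   offset_increment=25%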
    def gen_fio_numa_section(self, fio_filenames_list):
        numa_stats = {}
        for nvme in fio_filenames_list:
            nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme))
            numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1

        # Use the most common NUMA node for this chunk to allocate memory and CPUs
        section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0]

        return "\n".join(["numa_cpu_nodes=%s" % section_local_numa,
                          "numa_mem_policy=prefer:%s" % section_local_numa])

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no,
                       num_jobs=None, ramp_time=0, run_time=10, rate_iops=0,
                       offset=False, offset_inc=0):
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""
        if "spdk" in self.mode:
            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
            spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
        else:
            ioengine = self.ioengine
            spdk_conf = ""
            out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'",
                                 "|", "awk", "'{print $1}'"])
            subsystems = [x for x in out.split("\n") if "nvme" in x]

        if self.cpus_allowed is not None:
            self.log.info("Limiting FIO workload execution to specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    # Ranges in cpus_allowed are inclusive on both ends
                    cpus_num += len(range(a, b)) + 1
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            self.num_cores = len(subsystems)
            threads = range(0, len(subsystems))

        if "spdk" in self.mode:
            filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs,
                                                          offset, offset_inc)
        else:
            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs,
                                                          offset, offset_inc)

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, init_connect() function
        if hasattr(self, "ioengine") and "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log.info("Created FIO Config:")
        self.log.info(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit(1)
        else:
            self.log.warning("WARNING: you have disabled intel_pstate and are using default cpu governance.")

    def run_fio(self, fio_config_file, run_num=1):
        job_name, _ = os.path.splitext(fio_config_file)
        self.log.info("Starting FIO run for job: %s" % job_name)
        self.log.info("Using FIO: %s" % self.fio_bin)

        output_filename = job_name + "_run_" + str(run_num) + "_" + self.name + ".json"
        try:
            output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename, "--eta=never"], True)
            self.log.info(output)
            self.log.info("FIO run finished. Results in: %s" % output_filename)
        except subprocess.CalledProcessError as e:
            self.log.error("ERROR: Fio process failed!")
            self.log.error(e.stdout)
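    # Illustrative outcome (assumed arguments): gen_fio_config("randrw", 70,
    # "4k", 32, ...) writes nvmf_perf/4k_32_randrw_m_70.fio with the [global]
    # template above plus one [filenameN] section per thread; run_fio() then
    # executes that file and stores JSON results next to it.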
    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(self.exec_cmd(["uname", "-r"]))
        self.log.info("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log.info("====sysctl conf:====")
        sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"])
        self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))


class KernelTarget(Target):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = "nvmetcli"

        if "nvmet_bin" in target_config:
            self.nvmet_bin = target_config["nvmet_bin"]

    def stop(self):
        self.nvmet_command(self.nvmet_bin, "clear")
        self.restore_settings()

    def get_nvme_device_bdf(self, nvme_dev_path):
        nvme_name = os.path.basename(nvme_dev_path)
        return self.exec_cmd(["cat", "/sys/block/%s/device/address" % nvme_name]).strip()

    def get_nvme_devices(self):
        dev_list = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]).split("\n")
        nvme_list = []
        for dev in dev_list:
            if "nvme" not in dev:
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                nvme_list.append(dev)
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_allowlist:
                nvme_list.append(dev)
        return nvme_list

    def nvmet_command(self, nvmet_bin, command):
        return self.exec_cmd([nvmet_bin, *(command.split(" "))])

    def kernel_tgt_gen_subsystem_conf(self, nvme_list):

        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        for ip, bdev_num in self.spread_bdevs(len(nvme_list)):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = nvme_list[bdev_num]

            nvmet_cfg["subsystems"].append({
                "allowed_hosts": [],
                "attr": {
                    "allow_any_host": "1",
                    "serial": serial,
                    "version": "1.3"
                },
                "namespaces": [
                    {
                        "device": {
                            "path": bdev_name,
                            "uuid": "%s" % uuid.uuid4()
                        },
                        "enable": 1,
                        "nsid": port
                    }
                ],
                "nqn": nqn
            })

            nvmet_cfg["ports"].append({
                "addr": {
                    "adrfam": "ipv4",
                    "traddr": ip,
                    "trsvcid": port,
                    "trtype": self.transport
                },
                "portid": bdev_num,
                "referrals": [],
                "subsystems": [nqn]
            })

            self.subsystem_info_list.append((port, nqn, ip))
        self.subsys_no = len(self.subsystem_info_list)

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))
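    # Illustrative result (assumed one initiator, one IP, two drives):
    # kernel.conf ends up with subsystem nqn.2018-09.io.spdk:cnode0 (serial
    # SPDK000) exported on trsvcid 4420 and cnode1 on 4421, mirroring the
    # (port, nqn, ip) tuples appended to subsystem_info_list.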
"restore kernel.conf") 971 972 if self.enable_adq: 973 self.adq_configure_tc() 974 975 self.log.info("Done configuring kernel NVMeOF Target") 976 977 978class SPDKTarget(Target): 979 def __init__(self, name, general_config, target_config): 980 super().__init__(name, general_config, target_config) 981 982 # Required fields 983 self.core_mask = target_config["core_mask"] 984 self.num_cores = self.get_num_cores(self.core_mask) 985 986 # Defaults 987 self.dif_insert_strip = False 988 self.null_block_dif_type = 0 989 self.num_shared_buffers = 4096 990 self.max_queue_depth = 128 991 self.bpf_proc = None 992 self.bpf_scripts = [] 993 self.enable_dsa = False 994 self.scheduler_core_limit = None 995 996 if "num_shared_buffers" in target_config: 997 self.num_shared_buffers = target_config["num_shared_buffers"] 998 if "max_queue_depth" in target_config: 999 self.max_queue_depth = target_config["max_queue_depth"] 1000 if "null_block_dif_type" in target_config: 1001 self.null_block_dif_type = target_config["null_block_dif_type"] 1002 if "dif_insert_strip" in target_config: 1003 self.dif_insert_strip = target_config["dif_insert_strip"] 1004 if "bpf_scripts" in target_config: 1005 self.bpf_scripts = target_config["bpf_scripts"] 1006 if "dsa_settings" in target_config: 1007 self.enable_dsa = target_config["dsa_settings"] 1008 if "scheduler_core_limit" in target_config: 1009 self.scheduler_core_limit = target_config["scheduler_core_limit"] 1010 1011 self.log.info("====DSA settings:====") 1012 self.log.info("DSA enabled: %s" % (self.enable_dsa)) 1013 1014 def get_nvme_devices_count(self): 1015 return len(self.get_nvme_devices()) 1016 1017 def get_nvme_devices(self): 1018 bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")])) 1019 bdev_bdfs = [] 1020 for bdev in bdev_subsys_json_obj["config"]: 1021 bdev_traddr = bdev["params"]["traddr"] 1022 if bdev_traddr in self.nvme_blocklist: 1023 continue 1024 if len(self.nvme_allowlist) == 0: 1025 bdev_bdfs.append(bdev_traddr) 1026 if bdev_traddr in self.nvme_allowlist: 1027 bdev_bdfs.append(bdev_traddr) 1028 return bdev_bdfs 1029 1030 @staticmethod 1031 def get_num_cores(core_mask): 1032 if "0x" in core_mask: 1033 return bin(int(core_mask, 16)).count("1") 1034 else: 1035 num_cores = 0 1036 core_mask = core_mask.replace("[", "") 1037 core_mask = core_mask.replace("]", "") 1038 for i in core_mask.split(","): 1039 if "-" in i: 1040 x, y = i.split("-") 1041 num_cores += len(range(int(x), int(y))) + 1 1042 else: 1043 num_cores += 1 1044 return num_cores 1045 1046 def spdk_tgt_configure(self): 1047 self.log.info("Configuring SPDK NVMeOF target via RPC") 1048 1049 if self.enable_adq: 1050 self.adq_configure_tc() 1051 1052 # Create transport layer 1053 rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, 1054 num_shared_buffers=self.num_shared_buffers, 1055 max_queue_depth=self.max_queue_depth, 1056 dif_insert_or_strip=self.dif_insert_strip, 1057 sock_priority=self.adq_priority) 1058 self.log.info("SPDK NVMeOF transport layer:") 1059 rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client)) 1060 1061 if self.null_block: 1062 self.spdk_tgt_add_nullblock(self.null_block) 1063 self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block) 1064 else: 1065 self.spdk_tgt_add_nvme_conf() 1066 self.spdk_tgt_add_subsystem_conf(self.nic_ips) 1067 1068 self.log.info("Done configuring SPDK NVMeOF Target") 1069 1070 def spdk_tgt_add_nullblock(self, null_block_count): 1071 md_size = 0 1072 block_size = 4096 1073 if 
    def spdk_tgt_configure(self):
        self.log.info("Configuring SPDK NVMeOF target via RPC")

        if self.enable_adq:
            self.adq_configure_tc()

        # Create transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
                                       num_shared_buffers=self.num_shared_buffers,
                                       max_queue_depth=self.max_queue_depth,
                                       dif_insert_or_strip=self.dif_insert_strip,
                                       sock_priority=self.adq_priority)
        self.log.info("SPDK NVMeOF transport layer:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

        self.log.info("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self, null_block_count):
        md_size = 0
        block_size = 4096
        if self.null_block_dif_type != 0:
            md_size = 128

        self.log.info("Adding null block bdevices to config via RPC")
        for i in range(null_block_count):
            self.log.info("Setting bdev protection to: %s" % self.null_block_dif_type)
            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
                                      dif_type=self.null_block_dif_type, md_size=md_size)
        self.log.info("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        self.log.info("Adding NVMe bdevs to config via RPC")

        bdfs = self.get_nvme_devices()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log.error("ERROR: Requested number of disks is more than the %s available" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log.info("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        self.log.info("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = self.get_nvme_devices_count()

        for ip, bdev_num in self.spread_bdevs(req_num_disks):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = "Nvme%sn1" % bdev_num

            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                           allow_any_host=True, max_namespaces=8)
            rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
            for nqn_name in [nqn, "discovery"]:
                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
                                                     nqn=nqn_name,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid=port,
                                                     adrfam="ipv4")
            self.subsystem_info_list.append((port, nqn, ip))
        self.subsys_no = len(self.subsystem_info_list)

        self.log.info("SPDK NVMeOF subsystem configuration:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))
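    # Note (illustrative): every subsystem above is announced on two listeners,
    # its own NQN plus the well-known discovery NQN, so an initiator-side
    # `nvme discover -t <transport> -a <ip> -s <port>` would list the created
    # cnode entries before connecting.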
    def bpf_start(self):
        self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts)
        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
        results_path = os.path.join(self.results_dir, "bpf_traces.txt")

        with open(self.pid, "r") as fh:
            nvmf_pid = str(fh.readline())

        cmd = [bpf_script, nvmf_pid, *bpf_traces]
        self.log.info(cmd)
        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})

    def tgt_start(self):
        if self.null_block:
            self.subsys_no = 1
        else:
            self.subsys_no = self.get_nvme_devices_count()
        self.log.info("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        self.log.info("SPDK NVMeOF Target PID=%s" % proc.pid)
        self.log.info("Waiting for spdk to initialize...")
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")

        rpc.sock.sock_set_default_impl(self.client, impl_name="posix")

        if self.enable_zcopy:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
                                           enable_zerocopy_send_server=True)
            self.log.info("Target socket options:")
            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))

        if self.enable_adq:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
                                           nvme_adminq_poll_period_us=100000, retry_count=4)

        if self.enable_dsa:
            rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None)
            self.log.info("Target DSA accel module enabled")

        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit)
        rpc.framework_start_init(self.client)

        if self.bpf_scripts:
            self.bpf_start()

        self.spdk_tgt_configure()

    def stop(self):
        if self.bpf_proc:
            self.log.info("Stopping BPF Trace script")
            self.bpf_proc.terminate()
            self.bpf_proc.wait()

        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait(timeout=30)
            except Exception as e:
                self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.")
                self.log.info(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
                # Try to clean up RPC socket files if they were not removed
                # because of using 'kill'
                try:
                    os.remove("/var/tmp/spdk.sock")
                    os.remove("/var/tmp/spdk.sock.lock")
                except FileNotFoundError:
                    pass
        self.restore_settings()


class KernelInitiator(Initiator):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Defaults
        self.extra_params = ""
        self.ioengine = "libaio"

        if "extra_params" in initiator_config:
            self.extra_params = initiator_config["extra_params"]

        if "kernel_engine" in initiator_config:
            self.ioengine = initiator_config["kernel_engine"]
            if "io_uring" in self.ioengine:
                self.extra_params = "--nr-poll-queues=8"

    def get_connected_nvme_list(self):
        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
        return nvme_list
    def init_connect(self):
        self.log.info("Below connection attempts may result in error messages, this is expected!")
        for subsystem in self.subsystem_info_list:
            self.log.info("Trying to connect %s %s %s" % subsystem)
            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
            time.sleep(2)

        if "io_uring" in self.ioengine:
            self.log.info("Setting block layer settings for io_uring.")

            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
            #       apparently it's not possible for connected subsystems.
            #       Results in "error: Invalid argument"
            block_sysfs_settings = {
                "iostats": "0",
                "rq_affinity": "0",
                "nomerges": "2"
            }

            for disk in self.get_connected_nvme_list():
                sysfs = os.path.join("/sys/block", disk, "queue")
                for k, v in block_sysfs_settings.items():
                    sysfs_opt_path = os.path.join(sysfs, k)
                    try:
                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
                    except CalledProcessError as e:
                        self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
                    finally:
                        _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)])
                        self.log.info("%s=%s" % (sysfs_opt_path, _))

    def init_disconnect(self):
        for subsystem in self.subsystem_info_list:
            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
            time.sleep(1)

    def get_nvme_subsystem_numa(self, dev_name):
        # Remove the last two characters to get the controller name instead of the subsystem name
        nvme_ctrl = os.path.basename(dev_name)[:-2]
        remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
                                   self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl]))
        return self.get_route_nic_numa(remote_nvme_ip.group(0))

    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
        # Generate connected nvme devices names and sort them by used NIC numa node
        # to allow better grouping when splitting into fio sections.
        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
        nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list]
        nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))]

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = self.gen_fio_numa_section(r)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section
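    # Worked example (illustrative): 6 connected namespaces split across 4
    # threads produce sections sized [2, 2, 1, 1]; with io_depth=32 and
    # num_jobs=1 the two-device sections get iodepth=64 and the single-device
    # sections iodepth=32.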
"-C", self.spdk_dir, "clean", "-ffdx"]) 1337 self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)]) 1338 self.exec_cmd(["make", "-C", self.spdk_dir, "clean"]) 1339 self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"]) 1340 1341 self.log.info("SPDK built") 1342 self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir]) 1343 1344 def init_connect(self): 1345 # Not a real "connect" like when doing "nvme connect" because SPDK's fio 1346 # bdev plugin initiates connection just before starting IO traffic. 1347 # This is just to have a "init_connect" equivalent of the same function 1348 # from KernelInitiator class. 1349 # Just prepare bdev.conf JSON file for later use and consider it 1350 # "making a connection". 1351 bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list) 1352 self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir]) 1353 1354 def init_disconnect(self): 1355 # SPDK Initiator does not need to explicity disconnect as this gets done 1356 # after fio bdev plugin finishes IO. 1357 pass 1358 1359 def gen_spdk_bdev_conf(self, remote_subsystem_list): 1360 bdev_cfg_section = { 1361 "subsystems": [ 1362 { 1363 "subsystem": "bdev", 1364 "config": [] 1365 } 1366 ] 1367 } 1368 1369 for i, subsys in enumerate(remote_subsystem_list): 1370 sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys) 1371 nvme_ctrl = { 1372 "method": "bdev_nvme_attach_controller", 1373 "params": { 1374 "name": "Nvme{}".format(i), 1375 "trtype": self.transport, 1376 "traddr": sub_addr, 1377 "trsvcid": sub_port, 1378 "subnqn": sub_nqn, 1379 "adrfam": "IPv4" 1380 } 1381 } 1382 1383 if self.enable_adq: 1384 nvme_ctrl["params"].update({"priority": "1"}) 1385 1386 if self.enable_data_digest: 1387 nvme_ctrl["params"].update({"ddgst": self.enable_data_digest}) 1388 1389 bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl) 1390 1391 return json.dumps(bdev_cfg_section, indent=2) 1392 1393 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0): 1394 filename_section = "" 1395 if len(threads) >= len(subsystems): 1396 threads = range(0, len(subsystems)) 1397 1398 # Generate expected NVMe Bdev names and sort them by used NIC numa node 1399 # to allow better grouping when splitting into fio sections. 
    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
        filename_section = ""
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        # Generate expected NVMe Bdev names and sort them by used NIC numa node
        # to allow better grouping when splitting into fio sections.
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames]
        filenames = [x for _, x in sorted(zip(filename_numas, filenames))]

        nvme_per_split = int(len(subsystems) / len(threads))
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = self.gen_fio_numa_section(r)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section

    def get_nvme_subsystem_numa(self, bdev_name):
        bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir]))
        bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"]

        # Remove the last two characters to get the controller name instead of the subsystem name
        nvme_ctrl = bdev_name[:-2]
        remote_nvme_ip = list(filter(lambda x: x["params"]["name"] == "%s" % nvme_ctrl, bdev_conf_json_obj))[0]["params"]["traddr"]
        return self.get_route_nic_numa(remote_nvme_ip)
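# Illustrative config.json skeleton (assumed values; only keys read below and
# by the classes above):
# {
#     "general": {"username": "root", "password": "...", "transport": "tcp"},
#     "target": {"mode": "spdk", "nic_ips": ["10.0.0.1"], "core_mask": "[0-3]"},
#     "initiator1": {"mode": "kernel", "ip": "10.0.0.2",
#                    "nic_ips": ["10.0.0.2"], "target_nic_ips": ["10.0.0.1"]},
#     "fio": {"bs": ["4k"], "qd": [32], "rw": ["randrw"], "rwmixread": 70,
#             "ramp_time": 10, "run_time": 30, "run_num": 1}
# }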
if __name__ == "__main__":
    script_full_dir = os.path.dirname(os.path.realpath(__file__))
    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
                        help='Configuration file.')
    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
                        help='Results directory.')
    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
                        help='CSV results filename.')
    parser.add_argument('-f', '--force', default=False, action='store_true',
                        dest='force', help="""Force script to continue and try to use all
                        available NVMe devices during test.
                        WARNING: Might result in data loss on used NVMe drives""")

    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO,
                        format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s')

    logging.info("Using config file: %s" % args.config)
    with open(args.config, "r") as config:
        data = json.load(config)

    initiators = []
    fio_cases = []

    general_config = data["general"]
    target_config = data["target"]
    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]

    if "null_block_devices" not in data["target"] and \
            (args.force is False and
                "allowlist" not in data["target"] and
                "blocklist" not in data["target"]):
        # TODO: Also check if allowlist or blocklist are not empty.
        logging.warning("""WARNING: This script requires allowlist and blocklist to be defined.
        You can choose to use all available NVMe drives on your system, which may potentially
        lead to data loss. If you wish to proceed with all attached NVMes, use "-f" option.""")
        exit(1)

    for k, v in data.items():
        if "target" in k:
            v.update({"results_dir": args.results})
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(k, data["general"], v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(k, data["general"], v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            # Default to a single run if run_num is not specified
            fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else 1
            fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None

            fio_rate_iops = 0
            if "rate_iops" in data[k]:
                fio_rate_iops = data[k]["rate_iops"]

            fio_offset = False
            if "offset" in data[k]:
                fio_offset = data[k]["offset"]
            fio_offset_inc = 0
            if "offset_inc" in data[k]:
                fio_offset_inc = data[k]["offset_inc"]
        else:
            continue

    try:
        os.mkdir(args.results)
    except FileExistsError:
        pass

    for i in initiators:
        target_obj.initiator_info.append(
            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
        )

    # TODO: This try block is definitely too large. Need to break this up into separate
    #       logical blocks to reduce size.
    try:
        target_obj.tgt_start()

        for i in initiators:
            i.match_subsystems(target_obj.subsystem_info_list)
            if i.enable_adq:
                i.adq_configure_tc()

        # Poor man's threading
        # Run FIO tests
        for block_size, io_depth, rw in fio_workloads:
            configs = []
            for i in initiators:
                i.init_connect()
                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops,
                                       fio_offset, fio_offset_inc)
                configs.append(cfg)

            for run_no in range(1, fio_run_num + 1):
                threads = []
                power_daemon = None
                measurements_prefix = "%s_%s_%s_m_%s_run_%s" % (block_size, io_depth, rw, fio_rw_mix_read, run_no)

                for i, cfg in zip(initiators, configs):
                    t = threading.Thread(target=i.run_fio, args=(cfg, run_no))
                    threads.append(t)

                if target_obj.enable_sar:
                    sar_file_prefix = measurements_prefix + "_sar"
                    t = threading.Thread(target=target_obj.measure_sar,
                                         args=(args.results, sar_file_prefix, fio_ramp_time, fio_run_time))
                    threads.append(t)

                if target_obj.enable_pcm:
                    pcm_fnames = ["%s_%s.csv" % (measurements_prefix, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]

                    pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm,
                                                 args=(args.results, pcm_fnames[0], fio_ramp_time, fio_run_time))
                    pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory,
                                                 args=(args.results, pcm_fnames[1], fio_ramp_time, fio_run_time))
                    pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power,
                                                 args=(args.results, pcm_fnames[2], fio_ramp_time, fio_run_time))

                    threads.append(pcm_cpu_t)
                    threads.append(pcm_mem_t)
                    threads.append(pcm_pow_t)

                if target_obj.enable_bw:
                    bandwidth_file_name = measurements_prefix + "_bandwidth.csv"
                    t = threading.Thread(target=target_obj.measure_network_bandwidth,
                                         args=(args.results, bandwidth_file_name, fio_ramp_time, fio_run_time))
                    threads.append(t)

                if target_obj.enable_dpdk_memory:
                    dpdk_mem_file_name = measurements_prefix + "_dpdk_mem.txt"
                    t = threading.Thread(target=target_obj.measure_dpdk_memory,
                                         args=(args.results, dpdk_mem_file_name, fio_ramp_time))
                    threads.append(t)

                if target_obj.enable_pm:
                    power_daemon = threading.Thread(target=target_obj.measure_power,
                                                    args=(args.results, measurements_prefix, script_full_dir,
                                                          fio_ramp_time, fio_run_time))
                    threads.append(power_daemon)

                if target_obj.enable_adq:
                    ethtool_thread = threading.Thread(target=target_obj.ethtool_after_fio_ramp, args=(fio_ramp_time,))
                    threads.append(ethtool_thread)

                for t in threads:
                    t.start()
                for t in threads:
                    t.join()

            for i in initiators:
                i.init_disconnect()
                i.copy_result_files(args.results)
        try:
            parse_results(args.results, args.csv_filename)
        except Exception:
            logging.error("There was an error with parsing the results")
    finally:
        for i in initiators:
            try:
                i.stop()
            except Exception as err:
                logging.error("Failed to stop initiator %s: %s" % (i.name, err))
        target_obj.stop()