#!/usr/bin/env python3
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (C) 2018 Intel Corporation
# All rights reserved.
#

import os
import re
import sys
import argparse
import json
import logging
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid

import paramiko
import pandas as pd
from common import *
from subprocess import check_output, CalledProcessError

sys.path.append(os.path.dirname(__file__) + '/../../../python')

import spdk.rpc as rpc  # noqa
import spdk.rpc.client as rpc_client  # noqa


class Server:
    def __init__(self, name, general_config, server_config):
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]

        self.log = logging.getLogger(self.name)

        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        self.local_nic_info = []
        self._nics_json_obj = {}
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        self.enable_adq = False
        self.adq_priority = None
        if "adq_enable" in server_config and server_config["adq_enable"]:
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1

        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]

        if not re.match("^[A-Za-z0-9]*$", name):
            self.log.info("Please use a name which contains only letters or numbers")
            sys.exit(1)

    @staticmethod
    def get_uncommented_lines(lines):
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                if ip in addr["local"]:
                    return nic["ifname"]

    def set_local_nic_info_helper(self):
        pass

    def set_local_nic_info(self, pci_info):
        def extract_network_elements(json_obj):
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def get_nic_numa_node(self, nic_name):
        return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name]))

    def get_numa_cpu_map(self):
        numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"]))
        numa_cpu_json_map = {}

        for cpu in numa_cpu_json_obj["cpus"]:
            cpu_num = int(cpu["cpu"])
            numa_node = int(cpu["node"])
            numa_cpu_json_map.setdefault(numa_node, [])
            numa_cpu_json_map[numa_node].append(cpu_num)

        return numa_cpu_json_map

    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        # Stub; overridden by subclasses. Target runs commands in a local
        # subprocess, Initiator runs them on the remote host over SSH.
        return ""
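
    # configure_system() applies the host tuning steps in order. Each
    # configure_* helper records the previous state (svc_restore_dict,
    # sysctl_restore_dict, tuned_restore_dict, governor_restore) so that the
    # matching restore_* method can undo the change after the test run.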
    def configure_system(self):
        self.load_drivers()
        self.configure_services()
        self.configure_sysctl()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity()

    def load_drivers(self):
        self.log.info("Loading drivers")
        self.exec_cmd(["sudo", "modprobe", "-a",
                       "nvme-%s" % self.transport,
                       "nvmet-%s" % self.transport])
        if self.mode == "kernel" and hasattr(self, "null_block") and self.null_block:
            self.exec_cmd(["sudo", "modprobe", "null_blk",
                           "nr_devices=%s" % self.null_block])

    def configure_adq(self):
        if self.mode == "kernel":
            self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        self.log.info("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log.info("%s loaded!" % module)
            except CalledProcessError as e:
                self.log.error("ERROR: failed to load module %s" % module)
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        self.log.info("Configuring ADQ Traffic classes and filters...")

        if self.mode == "kernel":
            self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        port = "4420"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log.info(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log.info(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                             "protocol", "ip", "ingress", "prio", "1", "flower",
                             "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                             "skip_sw", "hw_tc", "1"]
            self.log.info(" ".join(tc_filter_cmd))
            self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log.info("Show tc configuration for %s NIC..." % nic_name)
            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log.info("%s" % tc_disk_out)
            self.log.info("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])
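
            # Illustrative expansion of the above for a hypothetical 2-core
            # target with NIC "ens1f0":
            #   tc qdisc add dev ens1f0 root mqprio num_tc 2 map 0 1 \
            #       queues 2@0 2@2 hw 1 mode channel
            #   tc filter add dev ens1f0 protocol ip ingress prio 1 flower \
            #       dst_ip <nic_ip>/32 ip_proto tcp dst_port 4420 skip_sw hw_tc 1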
            self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log.info(xps_cmd)
            self.exec_cmd(xps_cmd)

    def reload_driver(self, driver):
        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver])
        except CalledProcessError as e:
            self.log.error("ERROR: failed to reload %s module!" % driver)
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_nic(self):
        self.log.info("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                # As a temporary workaround for ADQ, channel packet inspection optimization is turned on during connection establishment.
                # It is then turned off before fio ramp_up expires in ethtool_after_fio_ramp().
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-pkt-inspect-optimize", "on"])
            except CalledProcessError as e:
                self.log.error("ERROR: failed to configure NIC port using ethtool!")
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
                self.log.info("Please update your NIC driver and firmware versions and try again.")
            self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        self.log.info("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            # "systemctl show" prints flat KEY=value pairs; prepend a section
            # header so configparser can parse the output.
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log.info("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log.info("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        self.log.info("Tuning sysctl settings...")

        busy_read = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1

        sysctl_opts = {
            "net.core.busy_poll": 0,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
        }

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())
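
    # Each sysctl's previous value is saved in self.sysctl_restore_dict above;
    # restore_sysctl() below writes those values back after the test.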

    def configure_tuned(self):
        if not self.tuned_profile:
            self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        self.log.info("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def configure_irq_affinity(self):
        self.log.info("Setting NIC irq affinity for NICs...")

        irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh")
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            irq_cmd = ["sudo", irq_script_path, nic]
            self.log.info(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        self.log.info("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        self.log.info("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        self.log.info("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log.info("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        self.log.info("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log.info("Reverted CPU governor to %s." % self.governor_restore)


class Target(Server):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Defaults
        self.enable_sar = False
        self.sar_delay = 0
        self.sar_interval = 0
        self.sar_count = 0
        self.enable_pcm = False
        self.pcm_dir = ""
        self.pcm_delay = 0
        self.pcm_interval = 0
        self.pcm_count = 0
        self.enable_bandwidth = 0
        self.bandwidth_count = 0
        self.enable_dpdk_memory = False
        self.dpdk_wait_time = 0
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []
        self.initiator_info = []
        self.enable_pm = False
        self.pm_delay = 0
        self.pm_interval = 0
        self.pm_count = 1

        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "sar_settings" in target_config:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"]
        if "pcm_settings" in target_config:
            self.enable_pcm = True
            self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"]
        if "enable_bandwidth" in target_config:
            self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"]
        if "enable_dpdk_memory" in target_config:
            self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"]
        if "scheduler_settings" in target_config:
            self.scheduler_name = target_config["scheduler_settings"]
        if "zcopy_settings" in target_config:
            self.enable_zcopy = target_config["zcopy_settings"]
        if "results_dir" in target_config:
            self.results_dir = target_config["results_dir"]
        if "pm_settings" in target_config:
            self.enable_pm, self.pm_delay, self.pm_interval, self.pm_count = target_config["pm_settings"]
            # Normalize pm_count - <= 0 means to loop indefinitely, so avoid that to not block forever
            self.pm_count = self.pm_count if self.pm_count > 0 else 1

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log.info("Changing directory to %s" % change_dir)

        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")

        if change_dir:
            os.chdir(old_cwd)
            self.log.info("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        self.log.info("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, _directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log.info("Done zipping")
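
    # _chunks() splits input_list into chunks_no contiguous slices whose
    # lengths differ by at most one, e.g. list(Target._chunks(list(range(7)), 3))
    # yields [0, 1, 2], [3, 4], [5, 6].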

    @staticmethod
    def _chunks(input_list, chunks_no):
        div, rem = divmod(len(input_list), chunks_no)
        for i in range(chunks_no):
            si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem)
            yield input_list[si:si + (div + 1 if i < rem else div)]

    def spread_bdevs(self, req_disks):
        # Spread available block devices indexes:
        # - evenly across available initiator systems
        # - evenly across available NIC interfaces for
        #   each initiator
        # Not NUMA aware.
        ip_bdev_map = []
        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))

        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
            self.initiator_info[i]["bdev_range"] = init_chunk
            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
                for c in nic_chunk:
                    ip_bdev_map.append((ip, c))
        return ip_bdev_map

    def measure_sar(self, results_dir, sar_file_prefix):
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))

        self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % self.sar_delay)
        time.sleep(self.sar_delay)
        self.log.info("Starting SAR measurements")

        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count])
        # sar_output_file and sar_cpu_util_file already contain results_dir,
        # so open them directly instead of joining the path a second time.
        with open(sar_output_file, "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log.info("Summary CPU utilization from SAR:")
                        self.log.info(line)
                    elif "all" in line:
                        self.log.info(line)
                    else:
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum

        with open(sar_cpu_util_file, "w") as f:
            f.write("%0.2f" % sar_cpu_usage)

    def measure_power(self, results_dir, prefix, script_full_dir):
        time.sleep(self.pm_delay)
        self.log.info("Starting power measurements")
        self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir,
                       "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix,
                       "-x", "-c", "%s" % self.pm_count, "-t", "%s" % self.pm_interval, "-r"])

    def ethtool_after_fio_ramp(self, fio_ramp_time):
        time.sleep(fio_ramp_time // 2)
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                           "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        time.sleep(self.pcm_delay)
        cmd = ["%s/build/bin/pcm-memory" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)]
        pcm_memory = subprocess.Popen(cmd)
        time.sleep(self.pcm_count)
        pcm_memory.terminate()

    def measure_pcm(self, results_dir, pcm_file_name):
        time.sleep(self.pcm_delay)
        cmd = ["%s/build/bin/pcm" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count,
               "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
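
        # pandas reads the two-row CSV header as a MultiIndex and labels empty
        # header cells "Unnamed: ..."; the rename below strips those labels,
        # then only the UPI (socket interconnect) columns are saved to a
        # separate "skt_"-prefixed CSV.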
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_pcm_power(self, results_dir, pcm_power_file_name):
        time.sleep(self.pcm_delay)
        out = self.exec_cmd(["%s/build/bin/pcm-power" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count])
        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
            fh.write(out)

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name):
        self.log.info("INFO: starting network bandwidth measure")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)])

    def measure_dpdk_memory(self, results_dir):
        self.log.info("INFO: waiting to generate DPDK memory usage")
        time.sleep(self.dpdk_wait_time)
        self.log.info("INFO: generating DPDK memory usage")
        # env_dpdk_get_mem_stats makes the target dump memory stats to
        # /tmp/spdk_mem_dump.txt, which is then moved to the results directory.
        # Requires the SPDK target's RPC client set up in SPDKTarget.tgt_start().
        rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)
        os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir))

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(os.uname().release)
        self.log.info("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
        self.log.info('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log.info("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
        self.log.info('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log.info("====zcopy settings:====")
        self.log.info("zcopy enabled: %s" % (self.enable_zcopy))
        self.log.info("====Scheduler settings:====")
        self.log.info("SPDK scheduler: %s" % (self.scheduler_name))


class Initiator(Server):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Required fields
        self.ip = initiator_config["ip"]
        self.target_nic_ips = initiator_config["target_nic_ips"]

        # Defaults
        self.cpus_allowed = None
        self.cpus_allowed_policy = "shared"
        self.spdk_dir = "/tmp/spdk"
        self.fio_bin = "/usr/src/fio/fio"
        self.nvmecli_bin = "nvme"
        self.cpu_frequency = None
        self.subsystem_info_list = []

        if "spdk_dir" in initiator_config:
            self.spdk_dir = initiator_config["spdk_dir"]
        if "fio_bin" in initiator_config:
            self.fio_bin = initiator_config["fio_bin"]
        if "nvmecli_bin" in initiator_config:
            self.nvmecli_bin = initiator_config["nvmecli_bin"]
        if "cpus_allowed" in initiator_config:
            self.cpus_allowed = initiator_config["cpus_allowed"]
        if "cpus_allowed_policy" in initiator_config:
            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
        if "cpu_frequency" in initiator_config:
            self.cpu_frequency = initiator_config["cpu_frequency"]
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
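
        # From this point on, every exec_cmd() call on an Initiator runs on the
        # remote host over the paramiko SSH connection established above.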
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.copy_spdk("/tmp/spdk.zip")
        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it again to prevent
        # expansion when sending to the remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout using the get_pty option if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        self.log.info("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log.info("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log.info("Sources unpacked")

    def copy_result_files(self, dest_dir):
        self.log.info("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log.info("Done copying results")

    def match_subsystems(self, target_subsystems):
        subsystems = [subsystem for subsystem in target_subsystems if subsystem[2] in self.target_nic_ips]
        subsystems.sort(key=lambda x: x[1])
        self.log.info("Found matching subsystems on target side:")
        for s in subsystems:
            self.log.info(s)
        self.subsystem_info_list = subsystems

    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def get_route_nic_numa(self, remote_nvme_ip):
        local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip]))
        local_nvme_nic = local_nvme_nic[0]["dev"]
        return self.get_nic_numa_node(local_nvme_nic)

    @staticmethod
    def gen_fio_offset_section(offset_inc, num_jobs):
        offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc
        return "\n".join(["size=%s%%" % offset_inc,
                          "offset=0%",
                          "offset_increment=%s%%" % offset_inc])
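
    # gen_fio_offset_section() example: with num_jobs=4 and offset_inc=0 the
    # section becomes
    #   size=25%
    #   offset=0%
    #   offset_increment=25%
    # so consecutive fio jobs cover disjoint quarters of each device.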

    def gen_fio_numa_section(self, fio_filenames_list):
        numa_stats = {}
        for nvme in fio_filenames_list:
            nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme))
            numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1

        # Use the most common NUMA node for this chunk to allocate memory and CPUs
        section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0]

        return "\n".join(["numa_cpu_nodes=%s" % section_local_numa,
                          "numa_mem_policy=prefer:%s" % section_local_numa])

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no,
                       num_jobs=None, ramp_time=0, run_time=10, rate_iops=0,
                       offset=False, offset_inc=0):
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""
        if "spdk" in self.mode:
            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
            spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
        else:
            ioengine = self.ioengine
            spdk_conf = ""
            out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'",
                                 "|", "awk", "'{print $1}'"])
            subsystems = [x for x in out.split("\n") if "nvme" in x]

        if self.cpus_allowed is not None:
            self.log.info("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    # Ranges like "0-3" are inclusive on both ends, so count b as well
                    cpus_num += len(range(a, b + 1))
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            self.num_cores = len(subsystems)
            threads = range(0, len(subsystems))

        if "spdk" in self.mode:
            filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs,
                                                          offset, offset_inc)
        else:
            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs,
                                                          offset, offset_inc)

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, init_connect() function
        if hasattr(self, "ioengine") and "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section
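
        # The file name encodes the workload parameters, e.g.
        # "4k_128_randrw_m_70_4CPU.fio" for bs=4k, iodepth=128 and randrw
        # with rwmixread=70 on 4 cores.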
        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log.info("Created FIO Config:")
        self.log.info(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit()
        else:
            self.log.warning("WARNING: you have disabled intel_pstate and are using the default cpu governance.")

    def run_fio(self, fio_config_file, run_num=None):
        job_name, _ = os.path.splitext(fio_config_file)
        self.log.info("Starting FIO run for job: %s" % job_name)
        self.log.info("Using FIO: %s" % self.fio_bin)

        if run_num:
            for i in range(1, run_num + 1):
                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
                try:
                    output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
                                            "--output=%s" % output_filename, "--eta=never"], True)
                    self.log.info(output)
                except CalledProcessError as e:
                    self.log.error("ERROR: Fio process failed!")
                    self.log.info(e.stdout)
        else:
            output_filename = job_name + "_" + self.name + ".json"
            output = self.exec_cmd(["sudo", self.fio_bin,
                                    fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename], True)
            self.log.info(output)
        self.log.info("FIO run finished. Results in: %s" % output_filename)

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(self.exec_cmd(["uname", "-r"]))
        self.log.info("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log.info("====sysctl conf:====")
        sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"])
        self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))


class KernelTarget(Target):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = "nvmetcli"

        if "nvmet_bin" in target_config:
            self.nvmet_bin = target_config["nvmet_bin"]

    def stop(self):
        self.nvmet_command(self.nvmet_bin, "clear")

    def get_nvme_devices(self):
        output = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"])
        output = [x for x in output.split("\n") if "nvme" in x]
        return output

    def nvmet_command(self, nvmet_bin, command):
        return self.exec_cmd([nvmet_bin, *(command.split(" "))])

    def kernel_tgt_gen_subsystem_conf(self, nvme_list):

        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        for ip, bdev_num in self.spread_bdevs(len(nvme_list)):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = nvme_list[bdev_num]

            nvmet_cfg["subsystems"].append({
                "allowed_hosts": [],
                "attr": {
                    "allow_any_host": "1",
                    "serial": serial,
                    "version": "1.3"
                },
                "namespaces": [
                    {
"device": { 907 "path": bdev_name, 908 "uuid": "%s" % uuid.uuid4() 909 }, 910 "enable": 1, 911 "nsid": port 912 } 913 ], 914 "nqn": nqn 915 }) 916 917 nvmet_cfg["ports"].append({ 918 "addr": { 919 "adrfam": "ipv4", 920 "traddr": ip, 921 "trsvcid": port, 922 "trtype": self.transport 923 }, 924 "portid": bdev_num, 925 "referrals": [], 926 "subsystems": [nqn] 927 }) 928 929 self.subsystem_info_list.append((port, nqn, ip)) 930 self.subsys_no = len(self.subsystem_info_list) 931 932 with open("kernel.conf", "w") as fh: 933 fh.write(json.dumps(nvmet_cfg, indent=2)) 934 935 def tgt_start(self): 936 self.log.info("Configuring kernel NVMeOF Target") 937 938 if self.null_block: 939 self.log.info("Configuring with null block device.") 940 nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)] 941 else: 942 self.log.info("Configuring with NVMe drives.") 943 nvme_list = self.get_nvme_devices() 944 945 self.kernel_tgt_gen_subsystem_conf(nvme_list) 946 self.subsys_no = len(nvme_list) 947 948 self.nvmet_command(self.nvmet_bin, "clear") 949 self.nvmet_command(self.nvmet_bin, "restore kernel.conf") 950 951 if self.enable_adq: 952 self.adq_configure_tc() 953 954 self.log.info("Done configuring kernel NVMeOF Target") 955 956 957class SPDKTarget(Target): 958 def __init__(self, name, general_config, target_config): 959 super().__init__(name, general_config, target_config) 960 961 # Required fields 962 self.core_mask = target_config["core_mask"] 963 self.num_cores = self.get_num_cores(self.core_mask) 964 965 # Defaults 966 self.dif_insert_strip = False 967 self.null_block_dif_type = 0 968 self.num_shared_buffers = 4096 969 self.max_queue_depth = 128 970 self.bpf_proc = None 971 self.bpf_scripts = [] 972 self.enable_dsa = False 973 self.scheduler_core_limit = None 974 975 if "num_shared_buffers" in target_config: 976 self.num_shared_buffers = target_config["num_shared_buffers"] 977 if "max_queue_depth" in target_config: 978 self.max_queue_depth = target_config["max_queue_depth"] 979 if "null_block_dif_type" in target_config: 980 self.null_block_dif_type = target_config["null_block_dif_type"] 981 if "dif_insert_strip" in target_config: 982 self.dif_insert_strip = target_config["dif_insert_strip"] 983 if "bpf_scripts" in target_config: 984 self.bpf_scripts = target_config["bpf_scripts"] 985 if "dsa_settings" in target_config: 986 self.enable_dsa = target_config["dsa_settings"] 987 if "scheduler_core_limit" in target_config: 988 self.scheduler_core_limit = target_config["scheduler_core_limit"] 989 990 self.log.info("====DSA settings:====") 991 self.log.info("DSA enabled: %s" % (self.enable_dsa)) 992 993 def get_nvme_devices_count(self): 994 return len(self.get_nvme_devices()) 995 996 def get_nvme_devices(self): 997 bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")])) 998 bdev_bdfs = [bdev["params"]["traddr"] for bdev in bdev_subsys_json_obj["config"]] 999 return bdev_bdfs 1000 1001 @staticmethod 1002 def get_num_cores(core_mask): 1003 if "0x" in core_mask: 1004 return bin(int(core_mask, 16)).count("1") 1005 else: 1006 num_cores = 0 1007 core_mask = core_mask.replace("[", "") 1008 core_mask = core_mask.replace("]", "") 1009 for i in core_mask.split(","): 1010 if "-" in i: 1011 x, y = i.split("-") 1012 num_cores += len(range(int(x), int(y))) + 1 1013 else: 1014 num_cores += 1 1015 return num_cores 1016 1017 def spdk_tgt_configure(self): 1018 self.log.info("Configuring SPDK NVMeOF target via RPC") 1019 1020 if self.enable_adq: 1021 self.adq_configure_tc() 

    def spdk_tgt_configure(self):
        self.log.info("Configuring SPDK NVMeOF target via RPC")

        if self.enable_adq:
            self.adq_configure_tc()

        # Create transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
                                       num_shared_buffers=self.num_shared_buffers,
                                       max_queue_depth=self.max_queue_depth,
                                       dif_insert_or_strip=self.dif_insert_strip,
                                       sock_priority=self.adq_priority)
        self.log.info("SPDK NVMeOF transport layer:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

        self.log.info("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self, null_block_count):
        md_size = 0
        block_size = 4096
        if self.null_block_dif_type != 0:
            md_size = 128

        self.log.info("Adding null block bdevices to config via RPC")
        for i in range(null_block_count):
            self.log.info("Setting bdev protection to :%s" % self.null_block_dif_type)
            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
                                      dif_type=self.null_block_dif_type, md_size=md_size)
        self.log.info("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        self.log.info("Adding NVMe bdevs to config via RPC")

        bdfs = self.get_nvme_devices()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log.error("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log.info("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        self.log.info("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = self.get_nvme_devices_count()

        for ip, bdev_num in self.spread_bdevs(req_num_disks):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = "Nvme%sn1" % bdev_num

            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                           allow_any_host=True, max_namespaces=8)
            rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
            for nqn_name in [nqn, "discovery"]:
                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
                                                     nqn=nqn_name,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid=port,
                                                     adrfam="ipv4")
            self.subsystem_info_list.append((port, nqn, ip))
        self.subsys_no = len(self.subsystem_info_list)

        self.log.info("SPDK NVMeOF subsystem configuration:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def bpf_start(self):
        self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts)
        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
        results_path = os.path.join(self.results_dir, "bpf_traces.txt")

        with open(self.pid, "r") as fh:
            nvmf_pid = str(fh.readline())

        cmd = [bpf_script, nvmf_pid, *bpf_traces]
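        # scripts/bpftrace.sh is expected to attach the listed trace scripts
        # to the running nvmf_tgt PID; BPF_OUTFILE redirects its output into
        # the results directory.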
        self.log.info(cmd)
        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})

    def tgt_start(self):
        if self.null_block:
            self.subsys_no = 1
        else:
            self.subsys_no = self.get_nvme_devices_count()
        self.log.info("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        self.log.info("SPDK NVMeOF Target PID=%s" % self.pid)
        self.log.info("Waiting for spdk to initialize...")
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")

        rpc.sock.sock_set_default_impl(self.client, impl_name="posix")

        if self.enable_zcopy:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
                                           enable_zerocopy_send_server=True)
            self.log.info("Target socket options:")
            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))

        if self.enable_adq:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
                                           nvme_adminq_poll_period_us=100000, retry_count=4)

        if self.enable_dsa:
            rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None)
            self.log.info("Target DSA accel module enabled")

        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit)
        rpc.framework_start_init(self.client)

        if self.bpf_scripts:
            self.bpf_start()

        self.spdk_tgt_configure()

    def stop(self):
        if self.bpf_proc:
            self.log.info("Stopping BPF Trace script")
            self.bpf_proc.terminate()
            self.bpf_proc.wait()

        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait(timeout=30)
            except Exception as e:
                self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.")
                self.log.info(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
                # Try to clean up RPC socket files if they were not removed
                # because of using 'kill'
                try:
                    os.remove("/var/tmp/spdk.sock")
                    os.remove("/var/tmp/spdk.sock.lock")
                except FileNotFoundError:
                    pass


class KernelInitiator(Initiator):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Defaults
        self.extra_params = ""
        self.ioengine = "libaio"

        if "extra_params" in initiator_config:
            self.extra_params = initiator_config["extra_params"]

        if "kernel_engine" in initiator_config:
            self.ioengine = initiator_config["kernel_engine"]
            if "io_uring" in self.ioengine:
                self.extra_params = "--nr-poll-queues=8"

    def get_connected_nvme_list(self):
        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
        return nvme_list

    def init_connect(self):
        self.log.info("Below connection attempts may result in error messages, this is expected!")
        for subsystem in self.subsystem_info_list:
            self.log.info("Trying to connect %s %s %s" % subsystem)
            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
            time.sleep(2)

        if "io_uring" in self.ioengine:
            self.log.info("Setting block layer settings for io_uring.")

            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
            # apparently it's not possible for connected subsystems.
            # Results in "error: Invalid argument"
            block_sysfs_settings = {
                "iostats": "0",
                "rq_affinity": "0",
                "nomerges": "2"
            }

            for disk in self.get_connected_nvme_list():
                sysfs = os.path.join("/sys/block", disk, "queue")
                for k, v in block_sysfs_settings.items():
                    sysfs_opt_path = os.path.join(sysfs, k)
                    try:
                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
                    except CalledProcessError as e:
                        self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
                    finally:
                        _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)])
                        self.log.info("%s=%s" % (sysfs_opt_path, _))

    def init_disconnect(self):
        for subsystem in self.subsystem_info_list:
            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
            time.sleep(1)

    def get_nvme_subsystem_numa(self, dev_name):
        # Remove the last two characters to get the controller name instead of the subsystem name
        nvme_ctrl = os.path.basename(dev_name)[:-2]
        remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
                                   self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl]))
        return self.get_route_nic_numa(remote_nvme_ip.group(0))

    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
        # Generate connected nvme devices names and sort them by used NIC numa node
        # to allow better grouping when splitting into fio sections.
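        # e.g. 6 connected devices split across 4 fio sections yields groups
        # of sizes [2, 2, 1, 1], with devices already ordered by NUMA node.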
        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
        nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list]
        nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))]

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = self.gen_fio_numa_section(r)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section


class SPDKInitiator(Initiator):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.install_spdk()

        # Required fields
        self.num_cores = initiator_config["num_cores"]

        # Optional fields
        self.enable_data_digest = False
        if "enable_data_digest" in initiator_config:
            self.enable_data_digest = initiator_config["enable_data_digest"]

    def install_spdk(self):
        self.log.info("Using fio binary %s" % self.fio_bin)
        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)])
        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])

        self.log.info("SPDK built")
        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])

    def init_connect(self):
        # Not a real "connect" like when doing "nvme connect", because SPDK's fio
        # bdev plugin initiates the connection just before starting IO traffic.
        # This is just an "init_connect" equivalent of the same function from
        # the KernelInitiator class: prepare the bdev.conf JSON file for later
        # use and consider it "making a connection".
        bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
        self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])

    def init_disconnect(self):
        # The SPDK initiator does not need to explicitly disconnect, as this
        # gets done after the fio bdev plugin finishes IO.
        pass

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        bdev_cfg_section = {
            "subsystems": [
                {
                    "subsystem": "bdev",
                    "config": []
                }
            ]
        }

        for i, subsys in enumerate(remote_subsystem_list):
            sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys)
            nvme_ctrl = {
                "method": "bdev_nvme_attach_controller",
                "params": {
                    "name": "Nvme{}".format(i),
                    "trtype": self.transport,
                    "traddr": sub_addr,
                    "trsvcid": sub_port,
                    "subnqn": sub_nqn,
                    "adrfam": "IPv4"
                }
            }

            if self.enable_adq:
                nvme_ctrl["params"].update({"priority": "1"})

            if self.enable_data_digest:
                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})

            bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)

        return json.dumps(bdev_cfg_section, indent=2)

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
        filename_section = ""
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        # Generate expected NVMe Bdev names and sort them by used NIC numa node
        # to allow better grouping when splitting into fio sections.
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames]
        filenames = [x for _, x in sorted(zip(filename_numas, filenames))]

        nvme_per_split = int(len(subsystems) / len(threads))
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = self.gen_fio_numa_section(r)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section

    def get_nvme_subsystem_numa(self, bdev_name):
        bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir]))
        bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"]

        # Remove the last two characters to get the controller name instead of the subsystem name
        nvme_ctrl = bdev_name[:-2]
        remote_nvme_ip = list(filter(lambda x: x["params"]["name"] == "%s" % nvme_ctrl, bdev_conf_json_obj))[0]["params"]["traddr"]
        return self.get_route_nic_numa(remote_nvme_ip)


if __name__ == "__main__":
    script_full_dir = os.path.dirname(os.path.realpath(__file__))
    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
                        help='Configuration file.')
    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
                        help='Results directory.')
    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
                        help='CSV results filename.')

    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO,
                        format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s')

    logging.info("Using config file: %s" % args.config)
    with open(args.config, "r") as config:
        data = json.load(config)
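
    # Expected config layout (illustrative; key names match the parsing below,
    # values are hypothetical):
    # {
    #     "general": {"username": "...", "password": "...", "transport": "tcp"},
    #     "target": {"mode": "spdk", "nic_ips": ["10.0.0.1"], "core_mask": "0xF"},
    #     "initiator1": {"mode": "kernel", "ip": "...", "nic_ips": ["..."],
    #                    "target_nic_ips": ["10.0.0.1"]},
    #     "fio": {"bs": ["4k"], "qd": [128], "rw": ["randrw"], "rwmixread": 70,
    #             "run_time": 30, "ramp_time": 10, "run_num": 3}
    # }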

    initiators = []
    fio_cases = []

    general_config = data["general"]
    target_config = data["target"]
    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]

    for k, v in data.items():
        if "target" in k:
            v.update({"results_dir": args.results})
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(k, data["general"], v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(k, data["general"], v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None
            fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None

            fio_rate_iops = 0
            if "rate_iops" in data[k]:
                fio_rate_iops = data[k]["rate_iops"]

            fio_offset = False
            if "offset" in data[k]:
                fio_offset = data[k]["offset"]
            fio_offset_inc = 0
            if "offset_inc" in data[k]:
                fio_offset_inc = data[k]["offset_inc"]
        else:
            continue

    try:
        os.mkdir(args.results)
    except FileExistsError:
        pass

    for i in initiators:
        target_obj.initiator_info.append(
            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
        )

    # TODO: This try block is definitely too large. Need to break this up into separate
    # logical blocks to reduce size.
    try:
        target_obj.tgt_start()

        for i in initiators:
            i.match_subsystems(target_obj.subsystem_info_list)
            if i.enable_adq:
                i.adq_configure_tc()

        # Poor man's threading
        # Run FIO tests
        for block_size, io_depth, rw in fio_workloads:
            threads = []
            configs = []
            power_daemon = None
            for i in initiators:
                i.init_connect()
                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops,
                                       fio_offset, fio_offset_inc)
                configs.append(cfg)

            for i, cfg in zip(initiators, configs):
                t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
                threads.append(t)
            if target_obj.enable_sar:
                sar_file_prefix = "%s_%s_%s_sar" % (block_size, rw, io_depth)
                t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix))
                threads.append(t)

            if target_obj.enable_pcm:
                pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]

                pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],))
                pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],))
                pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],))

                threads.append(pcm_cpu_t)
                threads.append(pcm_mem_t)
                threads.append(pcm_pow_t)

            if target_obj.enable_bandwidth:
                bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)])
                bandwidth_file_name = ".".join([bandwidth_file_name, "csv"])
                t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,))
                threads.append(t)

            if target_obj.enable_dpdk_memory:
                # args must be a tuple; a bare string would be unpacked character by character
                t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results,))
                threads.append(t)

            if target_obj.enable_adq:
                ethtool_thread = threading.Thread(target=target_obj.ethtool_after_fio_ramp, args=(fio_ramp_time,))
                threads.append(ethtool_thread)

            if target_obj.enable_pm:
                power_daemon = threading.Thread(target=target_obj.measure_power,
                                                args=(args.results, "%s_%s_%s" % (block_size, rw, io_depth), script_full_dir))
                threads.append(power_daemon)

            for t in threads:
                t.start()
            for t in threads:
                t.join()

            for i in initiators:
                i.init_disconnect()
                i.copy_result_files(args.results)

        target_obj.restore_governor()
        target_obj.restore_tuned()
        target_obj.restore_services()
        target_obj.restore_sysctl()
        if target_obj.enable_adq:
            target_obj.reload_driver("ice")
        for i in initiators:
            i.restore_governor()
            i.restore_tuned()
            i.restore_services()
            i.restore_sysctl()
            if i.enable_adq:
                i.reload_driver("ice")
        parse_results(args.results, args.csv_filename)
    finally:
        for i in initiators:
            try:
                i.stop()
            except Exception:
                pass
        target_obj.stop()