1#!/usr/bin/env python3 2# SPDX-License-Identifier: BSD-3-Clause 3# Copyright (C) 2018 Intel Corporation 4# All rights reserved. 5# 6 7import os 8import re 9import sys 10import argparse 11import json 12import logging 13import zipfile 14import threading 15import subprocess 16import itertools 17import configparser 18import time 19import uuid 20 21import paramiko 22import pandas as pd 23from common import * 24 25sys.path.append(os.path.dirname(__file__) + '/../../../python') 26 27import spdk.rpc as rpc # noqa 28import spdk.rpc.client as rpc_client # noqa 29 30 31class Server: 32 def __init__(self, name, general_config, server_config): 33 self.name = name 34 self.username = general_config["username"] 35 self.password = general_config["password"] 36 self.transport = general_config["transport"].lower() 37 self.nic_ips = server_config["nic_ips"] 38 self.mode = server_config["mode"] 39 40 self.log = logging.getLogger(self.name) 41 42 self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts" 43 if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]: 44 self.irq_scripts_dir = server_config["irq_scripts_dir"] 45 46 self.local_nic_info = [] 47 self._nics_json_obj = {} 48 self.svc_restore_dict = {} 49 self.sysctl_restore_dict = {} 50 self.tuned_restore_dict = {} 51 self.governor_restore = "" 52 self.tuned_profile = "" 53 54 self.enable_adq = False 55 self.adq_priority = None 56 if "adq_enable" in server_config and server_config["adq_enable"]: 57 self.enable_adq = server_config["adq_enable"] 58 self.adq_priority = 1 59 60 if "tuned_profile" in server_config: 61 self.tuned_profile = server_config["tuned_profile"] 62 63 if not re.match("^[A-Za-z0-9]*$", name): 64 self.log.info("Please use a name which contains only letters or numbers") 65 sys.exit(1) 66 67 @staticmethod 68 def get_uncommented_lines(lines): 69 return [line for line in lines if line and not line.startswith('#')] 70 71 def get_nic_name_by_ip(self, ip): 72 if not self._nics_json_obj: 73 nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"]) 74 self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj))) 75 for nic in self._nics_json_obj: 76 for addr in nic["addr_info"]: 77 if ip in addr["local"]: 78 return nic["ifname"] 79 80 def set_local_nic_info_helper(self): 81 pass 82 83 def set_local_nic_info(self, pci_info): 84 def extract_network_elements(json_obj): 85 nic_list = [] 86 if isinstance(json_obj, list): 87 for x in json_obj: 88 nic_list.extend(extract_network_elements(x)) 89 elif isinstance(json_obj, dict): 90 if "children" in json_obj: 91 nic_list.extend(extract_network_elements(json_obj["children"])) 92 if "class" in json_obj.keys() and "network" in json_obj["class"]: 93 nic_list.append(json_obj) 94 return nic_list 95 96 self.local_nic_info = extract_network_elements(pci_info) 97 98 def get_nic_numa_node(self, nic_name): 99 return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name])) 100 101 def get_numa_cpu_map(self): 102 numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"])) 103 numa_cpu_json_map = {} 104 105 for cpu in numa_cpu_json_obj["cpus"]: 106 cpu_num = int(cpu["cpu"]) 107 numa_node = int(cpu["node"]) 108 numa_cpu_json_map.setdefault(numa_node, []) 109 numa_cpu_json_map[numa_node].append(cpu_num) 110 111 return numa_cpu_json_map 112 113 # pylint: disable=R0201 114 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 115 return "" 116 117 def configure_system(self): 118 self.load_drivers() 119 self.configure_services() 120 self.configure_sysctl() 121 self.configure_tuned() 122 self.configure_cpu_governor() 123 self.configure_irq_affinity() 124 125 def load_drivers(self): 126 self.log.info("Loading drivers") 127 self.exec_cmd(["sudo", "modprobe", "-a", 128 "nvme-%s" % self.transport, 129 "nvmet-%s" % self.transport]) 130 if self.mode == "kernel" and hasattr(self, "null_block") and self.null_block: 131 self.exec_cmd(["sudo", "modprobe", "null_blk", 132 "nr_devices=%s" % self.null_block]) 133 134 def configure_adq(self): 135 if self.mode == "kernel": 136 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 137 return 138 self.adq_load_modules() 139 self.adq_configure_nic() 140 141 def adq_load_modules(self): 142 self.log.info("Modprobing ADQ-related Linux modules...") 143 adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"] 144 for module in adq_module_deps: 145 try: 146 self.exec_cmd(["sudo", "modprobe", module]) 147 self.log.info("%s loaded!" % module) 148 except CalledProcessError as e: 149 self.log.error("ERROR: failed to load module %s" % module) 150 self.log.error("%s resulted in error: %s" % (e.cmd, e.output)) 151 152 def adq_configure_tc(self): 153 self.log.info("Configuring ADQ Traffic classes and filters...") 154 155 if self.mode == "kernel": 156 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 157 return 158 159 num_queues_tc0 = 2 # 2 is minimum number of queues for TC0 160 num_queues_tc1 = self.num_cores 161 port_param = "dst_port" if isinstance(self, Target) else "src_port" 162 port = "4420" 163 xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs") 164 165 for nic_ip in self.nic_ips: 166 nic_name = self.get_nic_name_by_ip(nic_ip) 167 tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, 168 "root", "mqprio", "num_tc", "2", "map", "0", "1", 169 "queues", "%s@0" % num_queues_tc0, 170 "%s@%s" % (num_queues_tc1, num_queues_tc0), 171 "hw", "1", "mode", "channel"] 172 self.log.info(" ".join(tc_qdisc_map_cmd)) 173 self.exec_cmd(tc_qdisc_map_cmd) 174 175 time.sleep(5) 176 tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"] 177 self.log.info(" ".join(tc_qdisc_ingress_cmd)) 178 self.exec_cmd(tc_qdisc_ingress_cmd) 179 180 tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name, 181 "protocol", "ip", "ingress", "prio", "1", "flower", 182 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port, 183 "skip_sw", "hw_tc", "1"] 184 self.log.info(" ".join(tc_filter_cmd)) 185 self.exec_cmd(tc_filter_cmd) 186 187 # show tc configuration 188 self.log.info("Show tc configuration for %s NIC..." % nic_name) 189 tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name]) 190 tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"]) 191 self.log.info("%s" % tc_disk_out) 192 self.log.info("%s" % tc_filter_out) 193 194 # Ethtool coalesce settings must be applied after configuring traffic classes 195 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"]) 196 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"]) 197 198 self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name) 199 xps_cmd = ["sudo", xps_script_path, nic_name] 200 self.log.info(xps_cmd) 201 self.exec_cmd(xps_cmd) 202 203 def reload_driver(self, driver): 204 try: 205 self.exec_cmd(["sudo", "rmmod", driver]) 206 self.exec_cmd(["sudo", "modprobe", driver]) 207 except CalledProcessError as e: 208 self.log.error("ERROR: failed to reload %s module!" % driver) 209 self.log.error("%s resulted in error: %s" % (e.cmd, e.output)) 210 211 def adq_configure_nic(self): 212 self.log.info("Configuring NIC port settings for ADQ testing...") 213 214 # Reload the driver first, to make sure any previous settings are re-set. 215 self.reload_driver("ice") 216 217 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 218 for nic in nic_names: 219 self.log.info(nic) 220 try: 221 self.exec_cmd(["sudo", "ethtool", "-K", nic, 222 "hw-tc-offload", "on"]) # Enable hardware TC offload 223 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 224 "channel-inline-flow-director", "on"]) # Enable Intel Flow Director 225 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"]) # Disable LLDP 226 # As temporary workaround for ADQ, channel packet inspection optimization is turned on during connection establishment. 227 # Then turned off before fio ramp_up expires in ethtool_after_fio_ramp(). 228 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 229 "channel-pkt-inspect-optimize", "on"]) 230 except CalledProcessError as e: 231 self.log.error("ERROR: failed to configure NIC port using ethtool!") 232 self.log.error("%s resulted in error: %s" % (e.cmd, e.output)) 233 self.log.info("Please update your NIC driver and firmware versions and try again.") 234 self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic])) 235 self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic])) 236 237 def configure_services(self): 238 self.log.info("Configuring active services...") 239 svc_config = configparser.ConfigParser(strict=False) 240 241 # Below list is valid only for RHEL / Fedora systems and might not 242 # contain valid names for other distributions. 243 svc_target_state = { 244 "firewalld": "inactive", 245 "irqbalance": "inactive", 246 "lldpad.service": "inactive", 247 "lldpad.socket": "inactive" 248 } 249 250 for service in svc_target_state: 251 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 252 out = "\n".join(["[%s]" % service, out]) 253 svc_config.read_string(out) 254 255 if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]: 256 continue 257 258 service_state = svc_config[service]["ActiveState"] 259 self.log.info("Current state of %s service is %s" % (service, service_state)) 260 self.svc_restore_dict.update({service: service_state}) 261 if service_state != "inactive": 262 self.log.info("Disabling %s. It will be restored after the test has finished." % service) 263 self.exec_cmd(["sudo", "systemctl", "stop", service]) 264 265 def configure_sysctl(self): 266 self.log.info("Tuning sysctl settings...") 267 268 busy_read = 0 269 if self.enable_adq and self.mode == "spdk": 270 busy_read = 1 271 272 sysctl_opts = { 273 "net.core.busy_poll": 0, 274 "net.core.busy_read": busy_read, 275 "net.core.somaxconn": 4096, 276 "net.core.netdev_max_backlog": 8192, 277 "net.ipv4.tcp_max_syn_backlog": 16384, 278 "net.core.rmem_max": 268435456, 279 "net.core.wmem_max": 268435456, 280 "net.ipv4.tcp_mem": "268435456 268435456 268435456", 281 "net.ipv4.tcp_rmem": "8192 1048576 33554432", 282 "net.ipv4.tcp_wmem": "8192 1048576 33554432", 283 "net.ipv4.route.flush": 1, 284 "vm.overcommit_memory": 1, 285 } 286 287 for opt, value in sysctl_opts.items(): 288 self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()}) 289 self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 290 291 def configure_tuned(self): 292 if not self.tuned_profile: 293 self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.") 294 return 295 296 self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile) 297 service = "tuned" 298 tuned_config = configparser.ConfigParser(strict=False) 299 300 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 301 out = "\n".join(["[%s]" % service, out]) 302 tuned_config.read_string(out) 303 tuned_state = tuned_config[service]["ActiveState"] 304 self.svc_restore_dict.update({service: tuned_state}) 305 306 if tuned_state != "inactive": 307 profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip() 308 profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip() 309 310 self.tuned_restore_dict = { 311 "profile": profile, 312 "mode": profile_mode 313 } 314 315 self.exec_cmd(["sudo", "systemctl", "start", service]) 316 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile]) 317 self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"])) 318 319 def configure_cpu_governor(self): 320 self.log.info("Setting CPU governor to performance...") 321 322 # This assumes that there is the same CPU scaling governor on each CPU 323 self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip() 324 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"]) 325 326 def configure_irq_affinity(self): 327 self.log.info("Setting NIC irq affinity for NICs...") 328 329 irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh") 330 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 331 for nic in nic_names: 332 irq_cmd = ["sudo", irq_script_path, nic] 333 self.log.info(irq_cmd) 334 self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir) 335 336 def restore_services(self): 337 self.log.info("Restoring services...") 338 for service, state in self.svc_restore_dict.items(): 339 cmd = "stop" if state == "inactive" else "start" 340 self.exec_cmd(["sudo", "systemctl", cmd, service]) 341 342 def restore_sysctl(self): 343 self.log.info("Restoring sysctl settings...") 344 for opt, value in self.sysctl_restore_dict.items(): 345 self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 346 347 def restore_tuned(self): 348 self.log.info("Restoring tuned-adm settings...") 349 350 if not self.tuned_restore_dict: 351 return 352 353 if self.tuned_restore_dict["mode"] == "auto": 354 self.exec_cmd(["sudo", "tuned-adm", "auto_profile"]) 355 self.log.info("Reverted tuned-adm to auto_profile.") 356 else: 357 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]]) 358 self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"]) 359 360 def restore_governor(self): 361 self.log.info("Restoring CPU governor setting...") 362 if self.governor_restore: 363 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore]) 364 self.log.info("Reverted CPU governor to %s." % self.governor_restore) 365 366 367class Target(Server): 368 def __init__(self, name, general_config, target_config): 369 super().__init__(name, general_config, target_config) 370 371 # Defaults 372 self.enable_sar = False 373 self.sar_delay = 0 374 self.sar_interval = 0 375 self.sar_count = 0 376 self.enable_pcm = False 377 self.pcm_dir = "" 378 self.pcm_delay = 0 379 self.pcm_interval = 0 380 self.pcm_count = 0 381 self.enable_bandwidth = 0 382 self.bandwidth_count = 0 383 self.enable_dpdk_memory = False 384 self.dpdk_wait_time = 0 385 self.enable_zcopy = False 386 self.scheduler_name = "static" 387 self.null_block = 0 388 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 389 self.subsystem_info_list = [] 390 self.initiator_info = [] 391 self.enable_pm = False 392 self.pm_delay = 0 393 self.pm_interval = 0 394 self.pm_count = 1 395 396 if "null_block_devices" in target_config: 397 self.null_block = target_config["null_block_devices"] 398 if "sar_settings" in target_config: 399 self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"] 400 if "pcm_settings" in target_config: 401 self.enable_pcm = True 402 self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"] 403 if "enable_bandwidth" in target_config: 404 self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"] 405 if "enable_dpdk_memory" in target_config: 406 self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"] 407 if "scheduler_settings" in target_config: 408 self.scheduler_name = target_config["scheduler_settings"] 409 if "zcopy_settings" in target_config: 410 self.enable_zcopy = target_config["zcopy_settings"] 411 if "results_dir" in target_config: 412 self.results_dir = target_config["results_dir"] 413 if "pm_settings" in target_config: 414 self.enable_pm, self.pm_delay, self.pm_interval, self.pm_count = target_config["pm_settings"] 415 # Normalize pm_count - <= 0 means to loop indefinitely so let's avoid that to not block forever 416 self.pm_count = self.pm_count if self.pm_count > 0 else 1 417 418 self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) 419 self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../")) 420 self.set_local_nic_info(self.set_local_nic_info_helper()) 421 422 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 423 self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip") 424 425 self.configure_system() 426 if self.enable_adq: 427 self.configure_adq() 428 self.sys_config() 429 430 def set_local_nic_info_helper(self): 431 return json.loads(self.exec_cmd(["lshw", "-json"])) 432 433 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 434 stderr_opt = None 435 if stderr_redirect: 436 stderr_opt = subprocess.STDOUT 437 if change_dir: 438 old_cwd = os.getcwd() 439 os.chdir(change_dir) 440 self.log.info("Changing directory to %s" % change_dir) 441 442 out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8") 443 444 if change_dir: 445 os.chdir(old_cwd) 446 self.log.info("Changing directory to %s" % old_cwd) 447 return out 448 449 def zip_spdk_sources(self, spdk_dir, dest_file): 450 self.log.info("Zipping SPDK source directory") 451 fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) 452 for root, _directories, files in os.walk(spdk_dir, followlinks=True): 453 for file in files: 454 fh.write(os.path.relpath(os.path.join(root, file))) 455 fh.close() 456 self.log.info("Done zipping") 457 458 @staticmethod 459 def _chunks(input_list, chunks_no): 460 div, rem = divmod(len(input_list), chunks_no) 461 for i in range(chunks_no): 462 si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem) 463 yield input_list[si:si + (div + 1 if i < rem else div)] 464 465 def spread_bdevs(self, req_disks): 466 # Spread available block devices indexes: 467 # - evenly across available initiator systems 468 # - evenly across available NIC interfaces for 469 # each initiator 470 # Not NUMA aware. 471 ip_bdev_map = [] 472 initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info)) 473 474 for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)): 475 self.initiator_info[i]["bdev_range"] = init_chunk 476 init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"]))) 477 for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list): 478 for c in nic_chunk: 479 ip_bdev_map.append((ip, c)) 480 return ip_bdev_map 481 482 def measure_sar(self, results_dir, sar_file_prefix): 483 cpu_number = os.cpu_count() 484 sar_idle_sum = 0 485 sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt") 486 sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"])) 487 488 self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % self.sar_delay) 489 time.sleep(self.sar_delay) 490 self.log.info("Starting SAR measurements") 491 492 out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count]) 493 with open(os.path.join(results_dir, sar_output_file), "w") as fh: 494 for line in out.split("\n"): 495 if "Average" in line: 496 if "CPU" in line: 497 self.log.info("Summary CPU utilization from SAR:") 498 self.log.info(line) 499 elif "all" in line: 500 self.log.info(line) 501 else: 502 sar_idle_sum += float(line.split()[7]) 503 fh.write(out) 504 sar_cpu_usage = cpu_number * 100 - sar_idle_sum 505 506 with open(os.path.join(results_dir, sar_cpu_util_file), "w") as f: 507 f.write("%0.2f" % sar_cpu_usage) 508 509 def measure_power(self, results_dir, prefix, script_full_dir): 510 time.sleep(self.pm_delay) 511 self.log_print("Starting power measurements") 512 self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir, 513 "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix, 514 "-x", "-c", "%s" % self.pm_count, "-t", "%s" % self.pm_interval, "-r"]) 515 516 def ethtool_after_fio_ramp(self, fio_ramp_time): 517 time.sleep(fio_ramp_time//2) 518 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 519 for nic in nic_names: 520 self.log.info(nic) 521 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 522 "channel-pkt-inspect-optimize", "off"]) # Disable channel packet inspection optimization 523 524 def measure_pcm_memory(self, results_dir, pcm_file_name): 525 time.sleep(self.pcm_delay) 526 cmd = ["%s/build/bin/pcm-memory" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)] 527 pcm_memory = subprocess.Popen(cmd) 528 time.sleep(self.pcm_count) 529 pcm_memory.terminate() 530 531 def measure_pcm(self, results_dir, pcm_file_name): 532 time.sleep(self.pcm_delay) 533 cmd = ["%s/build/bin/pcm" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, 534 "-csv=%s/%s" % (results_dir, pcm_file_name)] 535 subprocess.run(cmd) 536 df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1]) 537 df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x)) 538 skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})] 539 skt_pcm_file_name = "_".join(["skt", pcm_file_name]) 540 skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False) 541 542 def measure_pcm_power(self, results_dir, pcm_power_file_name): 543 time.sleep(self.pcm_delay) 544 out = self.exec_cmd(["%s/build/bin/pcm-power" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count]) 545 with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh: 546 fh.write(out) 547 548 def measure_network_bandwidth(self, results_dir, bandwidth_file_name): 549 self.log.info("INFO: starting network bandwidth measure") 550 self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name), 551 "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)]) 552 553 def measure_dpdk_memory(self, results_dir): 554 self.log.info("INFO: waiting to generate DPDK memory usage") 555 time.sleep(self.dpdk_wait_time) 556 self.log.info("INFO: generating DPDK memory usage") 557 rpc.env.env_dpdk_get_mem_stats 558 os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir)) 559 560 def sys_config(self): 561 self.log.info("====Kernel release:====") 562 self.log.info(os.uname().release) 563 self.log.info("====Kernel command line:====") 564 with open('/proc/cmdline') as f: 565 cmdline = f.readlines() 566 self.log.info('\n'.join(self.get_uncommented_lines(cmdline))) 567 self.log.info("====sysctl conf:====") 568 with open('/etc/sysctl.conf') as f: 569 sysctl = f.readlines() 570 self.log.info('\n'.join(self.get_uncommented_lines(sysctl))) 571 self.log.info("====Cpu power info:====") 572 self.log.info(self.exec_cmd(["cpupower", "frequency-info"])) 573 self.log.info("====zcopy settings:====") 574 self.log.info("zcopy enabled: %s" % (self.enable_zcopy)) 575 self.log.info("====Scheduler settings:====") 576 self.log.info("SPDK scheduler: %s" % (self.scheduler_name)) 577 578 579class Initiator(Server): 580 def __init__(self, name, general_config, initiator_config): 581 super().__init__(name, general_config, initiator_config) 582 583 # Required fields 584 self.ip = initiator_config["ip"] 585 self.target_nic_ips = initiator_config["target_nic_ips"] 586 587 # Defaults 588 self.cpus_allowed = None 589 self.cpus_allowed_policy = "shared" 590 self.spdk_dir = "/tmp/spdk" 591 self.fio_bin = "/usr/src/fio/fio" 592 self.nvmecli_bin = "nvme" 593 self.cpu_frequency = None 594 self.subsystem_info_list = [] 595 596 if "spdk_dir" in initiator_config: 597 self.spdk_dir = initiator_config["spdk_dir"] 598 if "fio_bin" in initiator_config: 599 self.fio_bin = initiator_config["fio_bin"] 600 if "nvmecli_bin" in initiator_config: 601 self.nvmecli_bin = initiator_config["nvmecli_bin"] 602 if "cpus_allowed" in initiator_config: 603 self.cpus_allowed = initiator_config["cpus_allowed"] 604 if "cpus_allowed_policy" in initiator_config: 605 self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"] 606 if "cpu_frequency" in initiator_config: 607 self.cpu_frequency = initiator_config["cpu_frequency"] 608 if os.getenv('SPDK_WORKSPACE'): 609 self.spdk_dir = os.getenv('SPDK_WORKSPACE') 610 611 self.ssh_connection = paramiko.SSHClient() 612 self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 613 self.ssh_connection.connect(self.ip, username=self.username, password=self.password) 614 self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir]) 615 self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir]) 616 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 617 618 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 619 self.copy_spdk("/tmp/spdk.zip") 620 self.set_local_nic_info(self.set_local_nic_info_helper()) 621 self.set_cpu_frequency() 622 self.configure_system() 623 if self.enable_adq: 624 self.configure_adq() 625 self.sys_config() 626 627 def set_local_nic_info_helper(self): 628 return json.loads(self.exec_cmd(["lshw", "-json"])) 629 630 def stop(self): 631 self.ssh_connection.close() 632 633 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 634 if change_dir: 635 cmd = ["cd", change_dir, ";", *cmd] 636 637 # In case one of the command elements contains whitespace and is not 638 # already quoted, # (e.g. when calling sysctl) quote it again to prevent expansion 639 # when sending to remote system. 640 for i, c in enumerate(cmd): 641 if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")): 642 cmd[i] = '"%s"' % c 643 cmd = " ".join(cmd) 644 645 # Redirect stderr to stdout thanks using get_pty option if needed 646 _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect) 647 out = stdout.read().decode(encoding="utf-8") 648 649 # Check the return code 650 rc = stdout.channel.recv_exit_status() 651 if rc: 652 raise CalledProcessError(int(rc), cmd, out) 653 654 return out 655 656 def put_file(self, local, remote_dest): 657 ftp = self.ssh_connection.open_sftp() 658 ftp.put(local, remote_dest) 659 ftp.close() 660 661 def get_file(self, remote, local_dest): 662 ftp = self.ssh_connection.open_sftp() 663 ftp.get(remote, local_dest) 664 ftp.close() 665 666 def copy_spdk(self, local_spdk_zip): 667 self.log.info("Copying SPDK sources to initiator %s" % self.name) 668 self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip") 669 self.log.info("Copied sources zip from target") 670 self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir]) 671 self.log.info("Sources unpacked") 672 673 def copy_result_files(self, dest_dir): 674 self.log.info("Copying results") 675 676 if not os.path.exists(dest_dir): 677 os.mkdir(dest_dir) 678 679 # Get list of result files from initiator and copy them back to target 680 file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n") 681 682 for file in file_list: 683 self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file), 684 os.path.join(dest_dir, file)) 685 self.log.info("Done copying results") 686 687 def match_subsystems(self, target_subsytems): 688 subsystems = [subsystem for subsystem in target_subsytems if subsystem[2] in self.target_nic_ips] 689 subsystems.sort(key=lambda x: x[1]) 690 self.log.info("Found matching subsystems on target side:") 691 for s in subsystems: 692 self.log.info(s) 693 self.subsystem_info_list = subsystems 694 695 def gen_fio_filename_conf(self, *args, **kwargs): 696 # Logic implemented in SPDKInitiator and KernelInitiator classes 697 pass 698 699 def get_route_nic_numa(self, remote_nvme_ip): 700 local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip])) 701 local_nvme_nic = local_nvme_nic[0]["dev"] 702 return self.get_nic_numa_node(local_nvme_nic) 703 704 @staticmethod 705 def gen_fio_offset_section(offset_inc, num_jobs): 706 offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc 707 return "\n".join(["size=%s%%" % offset_inc, 708 "offset=0%", 709 "offset_increment=%s%%" % offset_inc]) 710 711 def gen_fio_numa_section(self, fio_filenames_list): 712 numa_stats = {} 713 for nvme in fio_filenames_list: 714 nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme)) 715 numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1 716 717 # Use the most common NUMA node for this chunk to allocate memory and CPUs 718 section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0] 719 720 return "\n".join(["numa_cpu_nodes=%s" % section_local_numa, 721 "numa_mem_policy=prefer:%s" % section_local_numa]) 722 723 def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, 724 num_jobs=None, ramp_time=0, run_time=10, rate_iops=0, 725 offset=False, offset_inc=0): 726 fio_conf_template = """ 727[global] 728ioengine={ioengine} 729{spdk_conf} 730thread=1 731group_reporting=1 732direct=1 733percentile_list=50:90:99:99.5:99.9:99.99:99.999 734 735norandommap=1 736rw={rw} 737rwmixread={rwmixread} 738bs={block_size} 739time_based=1 740ramp_time={ramp_time} 741runtime={run_time} 742rate_iops={rate_iops} 743""" 744 if "spdk" in self.mode: 745 ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir 746 spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir 747 else: 748 ioengine = self.ioengine 749 spdk_conf = "" 750 out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'", 751 "|", "awk", "'{print $1}'"]) 752 subsystems = [x for x in out.split("\n") if "nvme" in x] 753 754 if self.cpus_allowed is not None: 755 self.log.info("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed) 756 cpus_num = 0 757 cpus = self.cpus_allowed.split(",") 758 for cpu in cpus: 759 if "-" in cpu: 760 a, b = cpu.split("-") 761 a = int(a) 762 b = int(b) 763 cpus_num += len(range(a, b)) 764 else: 765 cpus_num += 1 766 self.num_cores = cpus_num 767 threads = range(0, self.num_cores) 768 elif hasattr(self, 'num_cores'): 769 self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores) 770 threads = range(0, int(self.num_cores)) 771 else: 772 self.num_cores = len(subsystems) 773 threads = range(0, len(subsystems)) 774 775 if "spdk" in self.mode: 776 filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs, 777 offset, offset_inc) 778 else: 779 filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs, 780 offset, offset_inc) 781 782 fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf, 783 rw=rw, rwmixread=rwmixread, block_size=block_size, 784 ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops) 785 786 # TODO: hipri disabled for now, as it causes fio errors: 787 # io_u error on file /dev/nvme2n1: Operation not supported 788 # See comment in KernelInitiator class, init_connect() function 789 if hasattr(self, "ioengine") and "io_uring" in self.ioengine: 790 fio_config = fio_config + """ 791fixedbufs=1 792registerfiles=1 793#hipri=1 794""" 795 if num_jobs: 796 fio_config = fio_config + "numjobs=%s \n" % num_jobs 797 if self.cpus_allowed is not None: 798 fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed 799 fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy 800 fio_config = fio_config + filename_section 801 802 fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread) 803 if hasattr(self, "num_cores"): 804 fio_config_filename += "_%sCPU" % self.num_cores 805 fio_config_filename += ".fio" 806 807 self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir]) 808 self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)]) 809 self.log.info("Created FIO Config:") 810 self.log.info(fio_config) 811 812 return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename) 813 814 def set_cpu_frequency(self): 815 if self.cpu_frequency is not None: 816 try: 817 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True) 818 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True) 819 self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"])) 820 except Exception: 821 self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!") 822 sys.exit() 823 else: 824 self.log.warning("WARNING: you have disabled intel_pstate and using default cpu governance.") 825 826 def run_fio(self, fio_config_file, run_num=None): 827 job_name, _ = os.path.splitext(fio_config_file) 828 self.log.info("Starting FIO run for job: %s" % job_name) 829 self.log.info("Using FIO: %s" % self.fio_bin) 830 831 if run_num: 832 for i in range(1, run_num + 1): 833 output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json" 834 try: 835 output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json", 836 "--output=%s" % output_filename, "--eta=never"], True) 837 self.log.info(output) 838 except subprocess.CalledProcessError as e: 839 self.log.error("ERROR: Fio process failed!") 840 self.log.info(e.stdout) 841 else: 842 output_filename = job_name + "_" + self.name + ".json" 843 output = self.exec_cmd(["sudo", self.fio_bin, 844 fio_config_file, "--output-format=json", 845 "--output" % output_filename], True) 846 self.log.info(output) 847 self.log.info("FIO run finished. Results in: %s" % output_filename) 848 849 def sys_config(self): 850 self.log.info("====Kernel release:====") 851 self.log.info(self.exec_cmd(["uname", "-r"])) 852 self.log.info("====Kernel command line:====") 853 cmdline = self.exec_cmd(["cat", "/proc/cmdline"]) 854 self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines()))) 855 self.log.info("====sysctl conf:====") 856 sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"]) 857 self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines()))) 858 self.log.info("====Cpu power info:====") 859 self.log.info(self.exec_cmd(["cpupower", "frequency-info"])) 860 861 862class KernelTarget(Target): 863 def __init__(self, name, general_config, target_config): 864 super().__init__(name, general_config, target_config) 865 # Defaults 866 self.nvmet_bin = "nvmetcli" 867 868 if "nvmet_bin" in target_config: 869 self.nvmet_bin = target_config["nvmet_bin"] 870 871 def stop(self): 872 self.nvmet_command(self.nvmet_bin, "clear") 873 874 def get_nvme_devices(self): 875 output = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]) 876 output = [x for x in output.split("\n") if "nvme" in x] 877 return output 878 879 def nvmet_command(self, nvmet_bin, command): 880 return self.exec_cmd([nvmet_bin, *(command.split(" "))]) 881 882 def kernel_tgt_gen_subsystem_conf(self, nvme_list): 883 884 nvmet_cfg = { 885 "ports": [], 886 "hosts": [], 887 "subsystems": [], 888 } 889 890 for ip, bdev_num in self.spread_bdevs(len(nvme_list)): 891 port = str(4420 + bdev_num) 892 nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num 893 serial = "SPDK00%s" % bdev_num 894 bdev_name = nvme_list[bdev_num] 895 896 nvmet_cfg["subsystems"].append({ 897 "allowed_hosts": [], 898 "attr": { 899 "allow_any_host": "1", 900 "serial": serial, 901 "version": "1.3" 902 }, 903 "namespaces": [ 904 { 905 "device": { 906 "path": bdev_name, 907 "uuid": "%s" % uuid.uuid4() 908 }, 909 "enable": 1, 910 "nsid": port 911 } 912 ], 913 "nqn": nqn 914 }) 915 916 nvmet_cfg["ports"].append({ 917 "addr": { 918 "adrfam": "ipv4", 919 "traddr": ip, 920 "trsvcid": port, 921 "trtype": self.transport 922 }, 923 "portid": bdev_num, 924 "referrals": [], 925 "subsystems": [nqn] 926 }) 927 928 self.subsystem_info_list.append((port, nqn, ip)) 929 self.subsys_no = len(self.subsystem_info_list) 930 931 with open("kernel.conf", "w") as fh: 932 fh.write(json.dumps(nvmet_cfg, indent=2)) 933 934 def tgt_start(self): 935 self.log.info("Configuring kernel NVMeOF Target") 936 937 if self.null_block: 938 self.log.info("Configuring with null block device.") 939 nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)] 940 else: 941 self.log.info("Configuring with NVMe drives.") 942 nvme_list = self.get_nvme_devices() 943 944 self.kernel_tgt_gen_subsystem_conf(nvme_list) 945 self.subsys_no = len(nvme_list) 946 947 self.nvmet_command(self.nvmet_bin, "clear") 948 self.nvmet_command(self.nvmet_bin, "restore kernel.conf") 949 950 if self.enable_adq: 951 self.adq_configure_tc() 952 953 self.log.info("Done configuring kernel NVMeOF Target") 954 955 956class SPDKTarget(Target): 957 def __init__(self, name, general_config, target_config): 958 super().__init__(name, general_config, target_config) 959 960 # Required fields 961 self.core_mask = target_config["core_mask"] 962 self.num_cores = self.get_num_cores(self.core_mask) 963 964 # Defaults 965 self.dif_insert_strip = False 966 self.null_block_dif_type = 0 967 self.num_shared_buffers = 4096 968 self.max_queue_depth = 128 969 self.bpf_proc = None 970 self.bpf_scripts = [] 971 self.enable_dsa = False 972 self.scheduler_core_limit = None 973 974 if "num_shared_buffers" in target_config: 975 self.num_shared_buffers = target_config["num_shared_buffers"] 976 if "max_queue_depth" in target_config: 977 self.max_queue_depth = target_config["max_queue_depth"] 978 if "null_block_dif_type" in target_config: 979 self.null_block_dif_type = target_config["null_block_dif_type"] 980 if "dif_insert_strip" in target_config: 981 self.dif_insert_strip = target_config["dif_insert_strip"] 982 if "bpf_scripts" in target_config: 983 self.bpf_scripts = target_config["bpf_scripts"] 984 if "dsa_settings" in target_config: 985 self.enable_dsa = target_config["dsa_settings"] 986 if "scheduler_core_limit" in target_config: 987 self.scheduler_core_limit = target_config["scheduler_core_limit"] 988 989 self.log.info("====DSA settings:====") 990 self.log.info("DSA enabled: %s" % (self.enable_dsa)) 991 992 def get_nvme_devices_count(self): 993 return len(self.get_nvme_devices()) 994 995 def get_nvme_devices(self): 996 bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")])) 997 bdev_bdfs = [bdev["params"]["traddr"] for bdev in bdev_subsys_json_obj["config"]] 998 return bdev_bdfs 999 1000 @staticmethod 1001 def get_num_cores(core_mask): 1002 if "0x" in core_mask: 1003 return bin(int(core_mask, 16)).count("1") 1004 else: 1005 num_cores = 0 1006 core_mask = core_mask.replace("[", "") 1007 core_mask = core_mask.replace("]", "") 1008 for i in core_mask.split(","): 1009 if "-" in i: 1010 x, y = i.split("-") 1011 num_cores += len(range(int(x), int(y))) + 1 1012 else: 1013 num_cores += 1 1014 return num_cores 1015 1016 def spdk_tgt_configure(self): 1017 self.log.info("Configuring SPDK NVMeOF target via RPC") 1018 1019 if self.enable_adq: 1020 self.adq_configure_tc() 1021 1022 # Create transport layer 1023 rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, 1024 num_shared_buffers=self.num_shared_buffers, 1025 max_queue_depth=self.max_queue_depth, 1026 dif_insert_or_strip=self.dif_insert_strip, 1027 sock_priority=self.adq_priority) 1028 self.log.info("SPDK NVMeOF transport layer:") 1029 rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client)) 1030 1031 if self.null_block: 1032 self.spdk_tgt_add_nullblock(self.null_block) 1033 self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block) 1034 else: 1035 self.spdk_tgt_add_nvme_conf() 1036 self.spdk_tgt_add_subsystem_conf(self.nic_ips) 1037 1038 self.log.info("Done configuring SPDK NVMeOF Target") 1039 1040 def spdk_tgt_add_nullblock(self, null_block_count): 1041 md_size = 0 1042 block_size = 4096 1043 if self.null_block_dif_type != 0: 1044 md_size = 128 1045 1046 self.log.info("Adding null block bdevices to config via RPC") 1047 for i in range(null_block_count): 1048 self.log.info("Setting bdev protection to :%s" % self.null_block_dif_type) 1049 rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i), 1050 dif_type=self.null_block_dif_type, md_size=md_size) 1051 self.log.info("SPDK Bdevs configuration:") 1052 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1053 1054 def spdk_tgt_add_nvme_conf(self, req_num_disks=None): 1055 self.log.info("Adding NVMe bdevs to config via RPC") 1056 1057 bdfs = self.get_nvme_devices() 1058 bdfs = [b.replace(":", ".") for b in bdfs] 1059 1060 if req_num_disks: 1061 if req_num_disks > len(bdfs): 1062 self.log.error("ERROR: Requested number of disks is more than available %s" % len(bdfs)) 1063 sys.exit(1) 1064 else: 1065 bdfs = bdfs[0:req_num_disks] 1066 1067 for i, bdf in enumerate(bdfs): 1068 rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 1069 1070 self.log.info("SPDK Bdevs configuration:") 1071 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1072 1073 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 1074 self.log.info("Adding subsystems to config") 1075 if not req_num_disks: 1076 req_num_disks = self.get_nvme_devices_count() 1077 1078 for ip, bdev_num in self.spread_bdevs(req_num_disks): 1079 port = str(4420 + bdev_num) 1080 nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num 1081 serial = "SPDK00%s" % bdev_num 1082 bdev_name = "Nvme%sn1" % bdev_num 1083 1084 rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial, 1085 allow_any_host=True, max_namespaces=8) 1086 rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name) 1087 for nqn_name in [nqn, "discovery"]: 1088 rpc.nvmf.nvmf_subsystem_add_listener(self.client, 1089 nqn=nqn_name, 1090 trtype=self.transport, 1091 traddr=ip, 1092 trsvcid=port, 1093 adrfam="ipv4") 1094 self.subsystem_info_list.append((port, nqn, ip)) 1095 self.subsys_no = len(self.subsystem_info_list) 1096 1097 self.log.info("SPDK NVMeOF subsystem configuration:") 1098 rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client)) 1099 1100 def bpf_start(self): 1101 self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts) 1102 bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh") 1103 bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts] 1104 results_path = os.path.join(self.results_dir, "bpf_traces.txt") 1105 1106 with open(self.pid, "r") as fh: 1107 nvmf_pid = str(fh.readline()) 1108 1109 cmd = [bpf_script, nvmf_pid, *bpf_traces] 1110 self.log.info(cmd) 1111 self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path}) 1112 1113 def tgt_start(self): 1114 if self.null_block: 1115 self.subsys_no = 1 1116 else: 1117 self.subsys_no = self.get_nvme_devices_count() 1118 self.log.info("Starting SPDK NVMeOF Target process") 1119 nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt") 1120 proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask]) 1121 self.pid = os.path.join(self.spdk_dir, "nvmf.pid") 1122 1123 with open(self.pid, "w") as fh: 1124 fh.write(str(proc.pid)) 1125 self.nvmf_proc = proc 1126 self.log.info("SPDK NVMeOF Target PID=%s" % self.pid) 1127 self.log.info("Waiting for spdk to initialize...") 1128 while True: 1129 if os.path.exists("/var/tmp/spdk.sock"): 1130 break 1131 time.sleep(1) 1132 self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock") 1133 1134 rpc.sock.sock_set_default_impl(self.client, impl_name="posix") 1135 1136 if self.enable_zcopy: 1137 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", 1138 enable_zerocopy_send_server=True) 1139 self.log.info("Target socket options:") 1140 rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix")) 1141 1142 if self.enable_adq: 1143 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1) 1144 rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None, 1145 nvme_adminq_poll_period_us=100000, retry_count=4) 1146 1147 if self.enable_dsa: 1148 rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None) 1149 self.log.info("Target DSA accel module enabled") 1150 1151 rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit) 1152 rpc.framework_start_init(self.client) 1153 1154 if self.bpf_scripts: 1155 self.bpf_start() 1156 1157 self.spdk_tgt_configure() 1158 1159 def stop(self): 1160 if self.bpf_proc: 1161 self.log.info("Stopping BPF Trace script") 1162 self.bpf_proc.terminate() 1163 self.bpf_proc.wait() 1164 1165 if hasattr(self, "nvmf_proc"): 1166 try: 1167 self.nvmf_proc.terminate() 1168 self.nvmf_proc.wait(timeout=30) 1169 except Exception as e: 1170 self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.") 1171 self.log.info(e) 1172 self.nvmf_proc.kill() 1173 self.nvmf_proc.communicate() 1174 # Try to clean up RPC socket files if they were not removed 1175 # because of using 'kill' 1176 try: 1177 os.remove("/var/tmp/spdk.sock") 1178 os.remove("/var/tmp/spdk.sock.lock") 1179 except FileNotFoundError: 1180 pass 1181 1182 1183class KernelInitiator(Initiator): 1184 def __init__(self, name, general_config, initiator_config): 1185 super().__init__(name, general_config, initiator_config) 1186 1187 # Defaults 1188 self.extra_params = "" 1189 self.ioengine = "libaio" 1190 1191 if "extra_params" in initiator_config: 1192 self.extra_params = initiator_config["extra_params"] 1193 1194 if "kernel_engine" in initiator_config: 1195 self.ioengine = initiator_config["kernel_engine"] 1196 if "io_uring" in self.ioengine: 1197 self.extra_params = "--nr-poll-queues=8" 1198 1199 def get_connected_nvme_list(self): 1200 json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"])) 1201 nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"] 1202 if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]] 1203 return nvme_list 1204 1205 def init_connect(self): 1206 self.log.info("Below connection attempts may result in error messages, this is expected!") 1207 for subsystem in self.subsystem_info_list: 1208 self.log.info("Trying to connect %s %s %s" % subsystem) 1209 self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport, 1210 "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params]) 1211 time.sleep(2) 1212 1213 if "io_uring" in self.ioengine: 1214 self.log.info("Setting block layer settings for io_uring.") 1215 1216 # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because 1217 # apparently it's not possible for connected subsystems. 1218 # Results in "error: Invalid argument" 1219 block_sysfs_settings = { 1220 "iostats": "0", 1221 "rq_affinity": "0", 1222 "nomerges": "2" 1223 } 1224 1225 for disk in self.get_connected_nvme_list(): 1226 sysfs = os.path.join("/sys/block", disk, "queue") 1227 for k, v in block_sysfs_settings.items(): 1228 sysfs_opt_path = os.path.join(sysfs, k) 1229 try: 1230 self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True) 1231 except subprocess.CalledProcessError as e: 1232 self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v)) 1233 finally: 1234 _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)]) 1235 self.log.info("%s=%s" % (sysfs_opt_path, _)) 1236 1237 def init_disconnect(self): 1238 for subsystem in self.subsystem_info_list: 1239 self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]]) 1240 time.sleep(1) 1241 1242 def get_nvme_subsystem_numa(self, dev_name): 1243 # Remove two last characters to get controller name instead of subsystem name 1244 nvme_ctrl = os.path.basename(dev_name)[:-2] 1245 remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', 1246 self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl])) 1247 return self.get_route_nic_numa(remote_nvme_ip.group(0)) 1248 1249 def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1, offset=False, offset_inc=0): 1250 # Generate connected nvme devices names and sort them by used NIC numa node 1251 # to allow better grouping when splitting into fio sections. 1252 nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()] 1253 nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list] 1254 nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))] 1255 1256 filename_section = "" 1257 nvme_per_split = int(len(nvme_list) / len(threads)) 1258 remainder = len(nvme_list) % len(threads) 1259 iterator = iter(nvme_list) 1260 result = [] 1261 for i in range(len(threads)): 1262 result.append([]) 1263 for _ in range(nvme_per_split): 1264 result[i].append(next(iterator)) 1265 if remainder: 1266 result[i].append(next(iterator)) 1267 remainder -= 1 1268 for i, r in enumerate(result): 1269 header = "[filename%s]" % i 1270 disks = "\n".join(["filename=%s" % x for x in r]) 1271 job_section_qd = round((io_depth * len(r)) / num_jobs) 1272 if job_section_qd == 0: 1273 job_section_qd = 1 1274 iodepth = "iodepth=%s" % job_section_qd 1275 1276 offset_section = "" 1277 if offset: 1278 offset_section = self.gen_fio_offset_section(offset_inc, num_jobs) 1279 1280 numa_opts = self.gen_fio_numa_section(r) 1281 1282 filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""]) 1283 1284 return filename_section 1285 1286 1287class SPDKInitiator(Initiator): 1288 def __init__(self, name, general_config, initiator_config): 1289 super().__init__(name, general_config, initiator_config) 1290 1291 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 1292 self.install_spdk() 1293 1294 # Required fields 1295 self.num_cores = initiator_config["num_cores"] 1296 1297 # Optional fields 1298 self.enable_data_digest = False 1299 if "enable_data_digest" in initiator_config: 1300 self.enable_data_digest = initiator_config["enable_data_digest"] 1301 1302 def install_spdk(self): 1303 self.log.info("Using fio binary %s" % self.fio_bin) 1304 self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"]) 1305 self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"]) 1306 self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)]) 1307 self.exec_cmd(["make", "-C", self.spdk_dir, "clean"]) 1308 self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"]) 1309 1310 self.log.info("SPDK built") 1311 self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir]) 1312 1313 def init_connect(self): 1314 # Not a real "connect" like when doing "nvme connect" because SPDK's fio 1315 # bdev plugin initiates connection just before starting IO traffic. 1316 # This is just to have a "init_connect" equivalent of the same function 1317 # from KernelInitiator class. 1318 # Just prepare bdev.conf JSON file for later use and consider it 1319 # "making a connection". 1320 bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list) 1321 self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir]) 1322 1323 def init_disconnect(self): 1324 # SPDK Initiator does not need to explicity disconnect as this gets done 1325 # after fio bdev plugin finishes IO. 1326 pass 1327 1328 def gen_spdk_bdev_conf(self, remote_subsystem_list): 1329 bdev_cfg_section = { 1330 "subsystems": [ 1331 { 1332 "subsystem": "bdev", 1333 "config": [] 1334 } 1335 ] 1336 } 1337 1338 for i, subsys in enumerate(remote_subsystem_list): 1339 sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys) 1340 nvme_ctrl = { 1341 "method": "bdev_nvme_attach_controller", 1342 "params": { 1343 "name": "Nvme{}".format(i), 1344 "trtype": self.transport, 1345 "traddr": sub_addr, 1346 "trsvcid": sub_port, 1347 "subnqn": sub_nqn, 1348 "adrfam": "IPv4" 1349 } 1350 } 1351 1352 if self.enable_adq: 1353 nvme_ctrl["params"].update({"priority": "1"}) 1354 1355 if self.enable_data_digest: 1356 nvme_ctrl["params"].update({"ddgst": self.enable_data_digest}) 1357 1358 bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl) 1359 1360 return json.dumps(bdev_cfg_section, indent=2) 1361 1362 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0): 1363 filename_section = "" 1364 if len(threads) >= len(subsystems): 1365 threads = range(0, len(subsystems)) 1366 1367 # Generate expected NVMe Bdev names and sort them by used NIC numa node 1368 # to allow better grouping when splitting into fio sections. 1369 filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))] 1370 filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames] 1371 filenames = [x for _, x in sorted(zip(filename_numas, filenames))] 1372 1373 nvme_per_split = int(len(subsystems) / len(threads)) 1374 remainder = len(subsystems) % len(threads) 1375 iterator = iter(filenames) 1376 result = [] 1377 for i in range(len(threads)): 1378 result.append([]) 1379 for _ in range(nvme_per_split): 1380 result[i].append(next(iterator)) 1381 if remainder: 1382 result[i].append(next(iterator)) 1383 remainder -= 1 1384 for i, r in enumerate(result): 1385 header = "[filename%s]" % i 1386 disks = "\n".join(["filename=%s" % x for x in r]) 1387 job_section_qd = round((io_depth * len(r)) / num_jobs) 1388 if job_section_qd == 0: 1389 job_section_qd = 1 1390 iodepth = "iodepth=%s" % job_section_qd 1391 1392 offset_section = "" 1393 if offset: 1394 offset_section = self.gen_fio_offset_section(offset_inc, num_jobs) 1395 1396 numa_opts = self.gen_fio_numa_section(r) 1397 1398 filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""]) 1399 1400 return filename_section 1401 1402 def get_nvme_subsystem_numa(self, bdev_name): 1403 bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir])) 1404 bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"] 1405 1406 # Remove two last characters to get controller name instead of subsystem name 1407 nvme_ctrl = bdev_name[:-2] 1408 remote_nvme_ip = list(filter(lambda x: x["params"]["name"] == "%s" % nvme_ctrl, bdev_conf_json_obj))[0]["params"]["traddr"] 1409 return self.get_route_nic_numa(remote_nvme_ip) 1410 1411 1412if __name__ == "__main__": 1413 script_full_dir = os.path.dirname(os.path.realpath(__file__)) 1414 default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json")) 1415 1416 parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) 1417 parser.add_argument('-c', '--config', type=str, default=default_config_file_path, 1418 help='Configuration file.') 1419 parser.add_argument('-r', '--results', type=str, default='/tmp/results', 1420 help='Results directory.') 1421 parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv', 1422 help='CSV results filename.') 1423 1424 args = parser.parse_args() 1425 1426 logging.basicConfig(level=logging.INFO, 1427 format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s') 1428 1429 logging.info("Using config file: %s" % args.config) 1430 with open(args.config, "r") as config: 1431 data = json.load(config) 1432 1433 initiators = [] 1434 fio_cases = [] 1435 1436 general_config = data["general"] 1437 target_config = data["target"] 1438 initiator_configs = [data[x] for x in data.keys() if "initiator" in x] 1439 1440 for k, v in data.items(): 1441 if "target" in k: 1442 v.update({"results_dir": args.results}) 1443 if data[k]["mode"] == "spdk": 1444 target_obj = SPDKTarget(k, data["general"], v) 1445 elif data[k]["mode"] == "kernel": 1446 target_obj = KernelTarget(k, data["general"], v) 1447 elif "initiator" in k: 1448 if data[k]["mode"] == "spdk": 1449 init_obj = SPDKInitiator(k, data["general"], v) 1450 elif data[k]["mode"] == "kernel": 1451 init_obj = KernelInitiator(k, data["general"], v) 1452 initiators.append(init_obj) 1453 elif "fio" in k: 1454 fio_workloads = itertools.product(data[k]["bs"], 1455 data[k]["qd"], 1456 data[k]["rw"]) 1457 1458 fio_run_time = data[k]["run_time"] 1459 fio_ramp_time = data[k]["ramp_time"] 1460 fio_rw_mix_read = data[k]["rwmixread"] 1461 fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None 1462 fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None 1463 1464 fio_rate_iops = 0 1465 if "rate_iops" in data[k]: 1466 fio_rate_iops = data[k]["rate_iops"] 1467 1468 fio_offset = False 1469 if "offset" in data[k]: 1470 fio_offset = data[k]["offset"] 1471 fio_offset_inc = 0 1472 if "offset_inc" in data[k]: 1473 fio_offset_inc = data[k]["offset_inc"] 1474 else: 1475 continue 1476 1477 try: 1478 os.mkdir(args.results) 1479 except FileExistsError: 1480 pass 1481 1482 for i in initiators: 1483 target_obj.initiator_info.append( 1484 {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips} 1485 ) 1486 1487 # TODO: This try block is definietly too large. Need to break this up into separate 1488 # logical blocks to reduce size. 1489 try: 1490 target_obj.tgt_start() 1491 1492 for i in initiators: 1493 i.match_subsystems(target_obj.subsystem_info_list) 1494 if i.enable_adq: 1495 i.adq_configure_tc() 1496 1497 # Poor mans threading 1498 # Run FIO tests 1499 for block_size, io_depth, rw in fio_workloads: 1500 threads = [] 1501 configs = [] 1502 power_daemon = None 1503 for i in initiators: 1504 i.init_connect() 1505 cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no, 1506 fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops, 1507 fio_offset, fio_offset_inc) 1508 configs.append(cfg) 1509 1510 for i, cfg in zip(initiators, configs): 1511 t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num)) 1512 threads.append(t) 1513 if target_obj.enable_sar: 1514 sar_file_prefix = "%s_%s_%s_sar" % (block_size, rw, io_depth) 1515 t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix)) 1516 threads.append(t) 1517 1518 if target_obj.enable_pcm: 1519 pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]] 1520 1521 pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],)) 1522 pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],)) 1523 pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],)) 1524 1525 threads.append(pcm_cpu_t) 1526 threads.append(pcm_mem_t) 1527 threads.append(pcm_pow_t) 1528 1529 if target_obj.enable_bandwidth: 1530 bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)]) 1531 bandwidth_file_name = ".".join([bandwidth_file_name, "csv"]) 1532 t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,)) 1533 threads.append(t) 1534 1535 if target_obj.enable_dpdk_memory: 1536 t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results)) 1537 threads.append(t) 1538 1539 if target_obj.enable_adq: 1540 ethtool_thread = threading.Thread(target=target_obj.ethtool_after_fio_ramp, args=(fio_ramp_time,)) 1541 threads.append(ethtool_thread) 1542 1543 if target_obj.enable_pm: 1544 power_daemon = threading.Thread(target=target_obj.measure_power, 1545 args=(args.results, "%s_%s_%s" % (block_size, rw, io_depth), script_full_dir)) 1546 threads.append(power_daemon) 1547 1548 for t in threads: 1549 t.start() 1550 for t in threads: 1551 t.join() 1552 1553 for i in initiators: 1554 i.init_disconnect() 1555 i.copy_result_files(args.results) 1556 1557 target_obj.restore_governor() 1558 target_obj.restore_tuned() 1559 target_obj.restore_services() 1560 target_obj.restore_sysctl() 1561 if target_obj.enable_adq: 1562 target_obj.reload_driver("ice") 1563 for i in initiators: 1564 i.restore_governor() 1565 i.restore_tuned() 1566 i.restore_services() 1567 i.restore_sysctl() 1568 if i.enable_adq: 1569 i.reload_driver("ice") 1570 parse_results(args.results, args.csv_filename) 1571 finally: 1572 for i in initiators: 1573 try: 1574 i.stop() 1575 except Exception as err: 1576 pass 1577 target_obj.stop() 1578