1#!/usr/bin/env python3 2# SPDX-License-Identifier: BSD-3-Clause 3# Copyright (C) 2018 Intel Corporation 4# All rights reserved. 5# 6 7import os 8import re 9import sys 10import argparse 11import json 12import logging 13import zipfile 14import threading 15import subprocess 16import itertools 17import configparser 18import time 19import uuid 20 21import paramiko 22import pandas as pd 23from common import * 24from subprocess import CalledProcessError 25 26sys.path.append(os.path.dirname(__file__) + '/../../../python') 27 28import spdk.rpc as rpc # noqa 29import spdk.rpc.client as rpc_client # noqa 30 31 32class Server: 33 def __init__(self, name, general_config, server_config): 34 self.name = name 35 self.username = general_config["username"] 36 self.password = general_config["password"] 37 self.transport = general_config["transport"].lower() 38 self.nic_ips = server_config["nic_ips"] 39 self.mode = server_config["mode"] 40 41 self.log = logging.getLogger(self.name) 42 43 self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts" 44 if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]: 45 self.irq_scripts_dir = server_config["irq_scripts_dir"] 46 47 self.local_nic_info = [] 48 self._nics_json_obj = {} 49 self.svc_restore_dict = {} 50 self.sysctl_restore_dict = {} 51 self.tuned_restore_dict = {} 52 self.governor_restore = "" 53 self.tuned_profile = "" 54 55 self.enable_adq = False 56 self.adq_priority = None 57 if "adq_enable" in server_config and server_config["adq_enable"]: 58 self.enable_adq = server_config["adq_enable"] 59 self.adq_priority = 1 60 61 if "tuned_profile" in server_config: 62 self.tuned_profile = server_config["tuned_profile"] 63 64 if not re.match(r'^[A-Za-z0-9\-]+$', name): 65 self.log.info("Please use a name which contains only letters, numbers or dashes") 66 sys.exit(1) 67 68 @staticmethod 69 def get_uncommented_lines(lines): 70 return [line for line in lines if line and not line.startswith('#')] 71 72 def get_nic_name_by_ip(self, ip): 73 if not self._nics_json_obj: 74 nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"]) 75 self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj))) 76 for nic in self._nics_json_obj: 77 for addr in nic["addr_info"]: 78 if ip in addr["local"]: 79 return nic["ifname"] 80 81 def set_local_nic_info_helper(self): 82 pass 83 84 def set_local_nic_info(self, pci_info): 85 def extract_network_elements(json_obj): 86 nic_list = [] 87 if isinstance(json_obj, list): 88 for x in json_obj: 89 nic_list.extend(extract_network_elements(x)) 90 elif isinstance(json_obj, dict): 91 if "children" in json_obj: 92 nic_list.extend(extract_network_elements(json_obj["children"])) 93 if "class" in json_obj.keys() and "network" in json_obj["class"]: 94 nic_list.append(json_obj) 95 return nic_list 96 97 self.local_nic_info = extract_network_elements(pci_info) 98 99 def get_nic_numa_node(self, nic_name): 100 return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name])) 101 102 def get_numa_cpu_map(self): 103 numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"])) 104 numa_cpu_json_map = {} 105 106 for cpu in numa_cpu_json_obj["cpus"]: 107 cpu_num = int(cpu["cpu"]) 108 numa_node = int(cpu["node"]) 109 numa_cpu_json_map.setdefault(numa_node, []) 110 numa_cpu_json_map[numa_node].append(cpu_num) 111 112 return numa_cpu_json_map 113 114 # pylint: disable=R0201 115 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 116 return "" 117 118 def configure_system(self): 119 self.load_drivers() 120 self.configure_services() 121 self.configure_sysctl() 122 self.configure_tuned() 123 self.configure_cpu_governor() 124 self.configure_irq_affinity() 125 126 def load_drivers(self): 127 self.log.info("Loading drivers") 128 self.exec_cmd(["sudo", "modprobe", "-a", 129 "nvme-%s" % self.transport, 130 "nvmet-%s" % self.transport]) 131 132 def configure_adq(self): 133 self.adq_load_modules() 134 self.adq_configure_nic() 135 136 def adq_load_modules(self): 137 self.log.info("Modprobing ADQ-related Linux modules...") 138 adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"] 139 for module in adq_module_deps: 140 try: 141 self.exec_cmd(["sudo", "modprobe", module]) 142 self.log.info("%s loaded!" % module) 143 except CalledProcessError as e: 144 self.log.error("ERROR: failed to load module %s" % module) 145 self.log.error("%s resulted in error: %s" % (e.cmd, e.output)) 146 147 def adq_configure_tc(self): 148 self.log.info("Configuring ADQ Traffic classes and filters...") 149 150 num_queues_tc0 = 2 # 2 is minimum number of queues for TC0 151 num_queues_tc1 = self.num_cores 152 port_param = "dst_port" if isinstance(self, Target) else "src_port" 153 port = "4420" 154 xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs") 155 156 for nic_ip in self.nic_ips: 157 nic_name = self.get_nic_name_by_ip(nic_ip) 158 tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, 159 "root", "mqprio", "num_tc", "2", "map", "0", "1", 160 "queues", "%s@0" % num_queues_tc0, 161 "%s@%s" % (num_queues_tc1, num_queues_tc0), 162 "hw", "1", "mode", "channel"] 163 self.log.info(" ".join(tc_qdisc_map_cmd)) 164 self.exec_cmd(tc_qdisc_map_cmd) 165 166 time.sleep(5) 167 tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"] 168 self.log.info(" ".join(tc_qdisc_ingress_cmd)) 169 self.exec_cmd(tc_qdisc_ingress_cmd) 170 171 tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name, 172 "protocol", "ip", "ingress", "prio", "1", "flower", 173 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port, 174 "skip_sw", "hw_tc", "1"] 175 self.log.info(" ".join(tc_filter_cmd)) 176 self.exec_cmd(tc_filter_cmd) 177 178 # show tc configuration 179 self.log.info("Show tc configuration for %s NIC..." % nic_name) 180 tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name]) 181 tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"]) 182 self.log.info("%s" % tc_disk_out) 183 self.log.info("%s" % tc_filter_out) 184 185 # Ethtool coalesce settings must be applied after configuring traffic classes 186 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"]) 187 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"]) 188 189 self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name) 190 xps_cmd = ["sudo", xps_script_path, nic_name] 191 self.log.info(xps_cmd) 192 self.exec_cmd(xps_cmd) 193 194 def reload_driver(self, driver): 195 try: 196 self.exec_cmd(["sudo", "rmmod", driver]) 197 self.exec_cmd(["sudo", "modprobe", driver]) 198 except CalledProcessError as e: 199 self.log.error("ERROR: failed to reload %s module!" % driver) 200 self.log.error("%s resulted in error: %s" % (e.cmd, e.output)) 201 202 def adq_configure_nic(self): 203 self.log.info("Configuring NIC port settings for ADQ testing...") 204 205 # Reload the driver first, to make sure any previous settings are re-set. 206 self.reload_driver("ice") 207 208 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 209 for nic in nic_names: 210 self.log.info(nic) 211 try: 212 self.exec_cmd(["sudo", "ethtool", "-K", nic, 213 "hw-tc-offload", "on"]) # Enable hardware TC offload 214 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 215 "channel-inline-flow-director", "on"]) # Enable Intel Flow Director 216 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"]) # Disable LLDP 217 # As temporary workaround for ADQ, channel packet inspection optimization is turned on during connection establishment. 218 # Then turned off before fio ramp_up expires in ethtool_after_fio_ramp(). 219 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 220 "channel-pkt-inspect-optimize", "on"]) 221 except CalledProcessError as e: 222 self.log.error("ERROR: failed to configure NIC port using ethtool!") 223 self.log.error("%s resulted in error: %s" % (e.cmd, e.output)) 224 self.log.info("Please update your NIC driver and firmware versions and try again.") 225 self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic])) 226 self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic])) 227 228 def configure_services(self): 229 self.log.info("Configuring active services...") 230 svc_config = configparser.ConfigParser(strict=False) 231 232 # Below list is valid only for RHEL / Fedora systems and might not 233 # contain valid names for other distributions. 234 svc_target_state = { 235 "firewalld": "inactive", 236 "irqbalance": "inactive", 237 "lldpad.service": "inactive", 238 "lldpad.socket": "inactive" 239 } 240 241 for service in svc_target_state: 242 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 243 out = "\n".join(["[%s]" % service, out]) 244 svc_config.read_string(out) 245 246 if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]: 247 continue 248 249 service_state = svc_config[service]["ActiveState"] 250 self.log.info("Current state of %s service is %s" % (service, service_state)) 251 self.svc_restore_dict.update({service: service_state}) 252 if service_state != "inactive": 253 self.log.info("Disabling %s. It will be restored after the test has finished." % service) 254 self.exec_cmd(["sudo", "systemctl", "stop", service]) 255 256 def configure_sysctl(self): 257 self.log.info("Tuning sysctl settings...") 258 259 sysctl_opts = { 260 "net.core.busy_poll": 0, 261 "net.core.busy_read": 0, 262 "net.core.somaxconn": 4096, 263 "net.core.netdev_max_backlog": 8192, 264 "net.ipv4.tcp_max_syn_backlog": 16384, 265 "net.core.rmem_max": 268435456, 266 "net.core.wmem_max": 268435456, 267 "net.ipv4.tcp_mem": "268435456 268435456 268435456", 268 "net.ipv4.tcp_rmem": "8192 1048576 33554432", 269 "net.ipv4.tcp_wmem": "8192 1048576 33554432", 270 "net.ipv4.route.flush": 1, 271 "vm.overcommit_memory": 1, 272 } 273 274 if self.enable_adq: 275 sysctl_opts.update(self.adq_set_busy_read(1)) 276 277 for opt, value in sysctl_opts.items(): 278 self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()}) 279 self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 280 281 def configure_tuned(self): 282 if not self.tuned_profile: 283 self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.") 284 return 285 286 self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile) 287 service = "tuned" 288 tuned_config = configparser.ConfigParser(strict=False) 289 290 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 291 out = "\n".join(["[%s]" % service, out]) 292 tuned_config.read_string(out) 293 tuned_state = tuned_config[service]["ActiveState"] 294 self.svc_restore_dict.update({service: tuned_state}) 295 296 if tuned_state != "inactive": 297 profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip() 298 profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip() 299 300 self.tuned_restore_dict = { 301 "profile": profile, 302 "mode": profile_mode 303 } 304 305 self.exec_cmd(["sudo", "systemctl", "start", service]) 306 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile]) 307 self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"])) 308 309 def configure_cpu_governor(self): 310 self.log.info("Setting CPU governor to performance...") 311 312 # This assumes that there is the same CPU scaling governor on each CPU 313 self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip() 314 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"]) 315 316 def configure_irq_affinity(self): 317 self.log.info("Setting NIC irq affinity for NICs...") 318 319 irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh") 320 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 321 for nic in nic_names: 322 irq_cmd = ["sudo", irq_script_path, nic] 323 self.log.info(irq_cmd) 324 self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir) 325 326 def restore_services(self): 327 self.log.info("Restoring services...") 328 for service, state in self.svc_restore_dict.items(): 329 cmd = "stop" if state == "inactive" else "start" 330 self.exec_cmd(["sudo", "systemctl", cmd, service]) 331 332 def restore_sysctl(self): 333 self.log.info("Restoring sysctl settings...") 334 for opt, value in self.sysctl_restore_dict.items(): 335 self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 336 337 def restore_tuned(self): 338 self.log.info("Restoring tuned-adm settings...") 339 340 if not self.tuned_restore_dict: 341 return 342 343 if self.tuned_restore_dict["mode"] == "auto": 344 self.exec_cmd(["sudo", "tuned-adm", "auto_profile"]) 345 self.log.info("Reverted tuned-adm to auto_profile.") 346 else: 347 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]]) 348 self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"]) 349 350 def restore_governor(self): 351 self.log.info("Restoring CPU governor setting...") 352 if self.governor_restore: 353 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore]) 354 self.log.info("Reverted CPU governor to %s." % self.governor_restore) 355 356 def restore_settings(self): 357 self.restore_governor() 358 self.restore_tuned() 359 self.restore_services() 360 self.restore_sysctl() 361 if self.enable_adq: 362 self.reload_driver("ice") 363 364 365class Target(Server): 366 def __init__(self, name, general_config, target_config): 367 super().__init__(name, general_config, target_config) 368 369 # Defaults 370 self.enable_zcopy = False 371 self.scheduler_name = "static" 372 self.null_block = 0 373 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 374 self.subsystem_info_list = [] 375 self.initiator_info = [] 376 self.nvme_allowlist = [] 377 self.nvme_blocklist = [] 378 379 # Target-side measurement options 380 self.enable_pm = True 381 self.enable_sar = True 382 self.enable_pcm = True 383 self.enable_bw = True 384 self.enable_dpdk_memory = True 385 386 if "null_block_devices" in target_config: 387 self.null_block = target_config["null_block_devices"] 388 if "scheduler_settings" in target_config: 389 self.scheduler_name = target_config["scheduler_settings"] 390 if "zcopy_settings" in target_config: 391 self.enable_zcopy = target_config["zcopy_settings"] 392 if "results_dir" in target_config: 393 self.results_dir = target_config["results_dir"] 394 if "blocklist" in target_config: 395 self.nvme_blocklist = target_config["blocklist"] 396 if "allowlist" in target_config: 397 self.nvme_allowlist = target_config["allowlist"] 398 # Blocklist takes precedence, remove common elements from allowlist 399 self.nvme_allowlist = list(set(self.nvme_allowlist) - set(self.nvme_blocklist)) 400 if "enable_pm" in target_config: 401 self.enable_pm = target_config["enable_pm"] 402 if "enable_sar" in target_config: 403 self.enable_sar = target_config["enable_sar"] 404 if "enable_pcm" in target_config: 405 self.enable_pcm = target_config["enable_pcm"] 406 if "enable_bandwidth" in target_config: 407 self.enable_bw = target_config["enable_bandwidth"] 408 if "enable_dpdk_memory" in target_config: 409 self.enable_dpdk_memory = target_config["enable_dpdk_memory"] 410 411 self.log.info("Items now on allowlist: %s" % self.nvme_allowlist) 412 self.log.info("Items now on blocklist: %s" % self.nvme_blocklist) 413 414 self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) 415 self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../")) 416 self.set_local_nic_info(self.set_local_nic_info_helper()) 417 418 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 419 self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip") 420 421 self.configure_system() 422 if self.enable_adq: 423 self.configure_adq() 424 self.sys_config() 425 426 def set_local_nic_info_helper(self): 427 return json.loads(self.exec_cmd(["lshw", "-json"])) 428 429 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 430 stderr_opt = None 431 if stderr_redirect: 432 stderr_opt = subprocess.STDOUT 433 if change_dir: 434 old_cwd = os.getcwd() 435 os.chdir(change_dir) 436 self.log.info("Changing directory to %s" % change_dir) 437 438 out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8") 439 440 if change_dir: 441 os.chdir(old_cwd) 442 self.log.info("Changing directory to %s" % old_cwd) 443 return out 444 445 def zip_spdk_sources(self, spdk_dir, dest_file): 446 self.log.info("Zipping SPDK source directory") 447 fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) 448 for root, _directories, files in os.walk(spdk_dir, followlinks=True): 449 for file in files: 450 fh.write(os.path.relpath(os.path.join(root, file))) 451 fh.close() 452 self.log.info("Done zipping") 453 454 @staticmethod 455 def _chunks(input_list, chunks_no): 456 div, rem = divmod(len(input_list), chunks_no) 457 for i in range(chunks_no): 458 si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem) 459 yield input_list[si:si + (div + 1 if i < rem else div)] 460 461 def spread_bdevs(self, req_disks): 462 # Spread available block devices indexes: 463 # - evenly across available initiator systems 464 # - evenly across available NIC interfaces for 465 # each initiator 466 # Not NUMA aware. 467 ip_bdev_map = [] 468 initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info)) 469 470 for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)): 471 self.initiator_info[i]["bdev_range"] = init_chunk 472 init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"]))) 473 for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list): 474 for c in nic_chunk: 475 ip_bdev_map.append((ip, c)) 476 return ip_bdev_map 477 478 def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time): 479 cpu_number = os.cpu_count() 480 sar_idle_sum = 0 481 sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt") 482 sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"])) 483 484 self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time) 485 time.sleep(ramp_time) 486 self.log.info("Starting SAR measurements") 487 488 out = self.exec_cmd(["sar", "-P", "ALL", "%s" % 1, "%s" % run_time]) 489 with open(os.path.join(results_dir, sar_output_file), "w") as fh: 490 for line in out.split("\n"): 491 if "Average" in line: 492 if "CPU" in line: 493 self.log.info("Summary CPU utilization from SAR:") 494 self.log.info(line) 495 elif "all" in line: 496 self.log.info(line) 497 else: 498 sar_idle_sum += float(line.split()[7]) 499 fh.write(out) 500 sar_cpu_usage = cpu_number * 100 - sar_idle_sum 501 502 with open(os.path.join(results_dir, sar_cpu_util_file), "w") as f: 503 f.write("%0.2f" % sar_cpu_usage) 504 505 def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time): 506 time.sleep(ramp_time) 507 self.log.info("Starting power measurements") 508 self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir, 509 "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix, 510 "-x", "-c", "%s" % run_time, "-t", "%s" % 1, "-r"]) 511 512 def ethtool_after_fio_ramp(self, fio_ramp_time): 513 time.sleep(fio_ramp_time//2) 514 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 515 for nic in nic_names: 516 self.log.info(nic) 517 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, 518 "channel-pkt-inspect-optimize", "off"]) # Disable channel packet inspection optimization 519 520 def measure_pcm_memory(self, results_dir, pcm_file_name, ramp_time, run_time): 521 time.sleep(ramp_time) 522 cmd = ["pcm-memory", "1", "-csv=%s/%s" % (results_dir, pcm_file_name)] 523 pcm_memory = subprocess.Popen(cmd) 524 time.sleep(run_time) 525 pcm_memory.terminate() 526 527 def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time): 528 time.sleep(ramp_time) 529 cmd = ["pcm", "1", "-i=%s" % run_time, 530 "-csv=%s/%s" % (results_dir, pcm_file_name)] 531 subprocess.run(cmd) 532 df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1]) 533 df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x)) 534 skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})] 535 skt_pcm_file_name = "_".join(["skt", pcm_file_name]) 536 skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False) 537 538 def measure_pcm_power(self, results_dir, pcm_power_file_name, ramp_time, run_time): 539 time.sleep(ramp_time) 540 out = self.exec_cmd(["pcm-power", "1", "-i=%s" % run_time]) 541 with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh: 542 fh.write(out) 543 # TODO: Above command results in a .csv file containing measurements for all gathered samples. 544 # Improve this so that additional file containing measurements average is generated too. 545 546 def measure_network_bandwidth(self, results_dir, bandwidth_file_name, ramp_time, run_time): 547 self.log.info("Waiting %d seconds for ramp-up to finish before measuring bandwidth stats" % ramp_time) 548 time.sleep(ramp_time) 549 self.log.info("INFO: starting network bandwidth measure") 550 self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name), 551 "-a", "1", "-t", "1000", "-c", "%s" % run_time]) 552 # TODO: Above command results in a .csv file containing measurements for all gathered samples. 553 # Improve this so that additional file containing measurements average is generated too. 554 # TODO: Monitor only these interfaces which are currently used to run the workload. 555 556 def measure_dpdk_memory(self, results_dir, dump_file_name, ramp_time): 557 self.log.info("INFO: waiting to generate DPDK memory usage") 558 time.sleep(ramp_time) 559 self.log.info("INFO: generating DPDK memory usage") 560 tmp_dump_file = rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)["filename"] 561 os.rename(tmp_dump_file, "%s/%s" % (results_dir, dump_file_name)) 562 563 def sys_config(self): 564 self.log.info("====Kernel release:====") 565 self.log.info(os.uname().release) 566 self.log.info("====Kernel command line:====") 567 with open('/proc/cmdline') as f: 568 cmdline = f.readlines() 569 self.log.info('\n'.join(self.get_uncommented_lines(cmdline))) 570 self.log.info("====sysctl conf:====") 571 with open('/etc/sysctl.conf') as f: 572 sysctl = f.readlines() 573 self.log.info('\n'.join(self.get_uncommented_lines(sysctl))) 574 self.log.info("====Cpu power info:====") 575 self.log.info(self.exec_cmd(["cpupower", "frequency-info"])) 576 self.log.info("====zcopy settings:====") 577 self.log.info("zcopy enabled: %s" % (self.enable_zcopy)) 578 self.log.info("====Scheduler settings:====") 579 self.log.info("SPDK scheduler: %s" % (self.scheduler_name)) 580 581 582class Initiator(Server): 583 def __init__(self, name, general_config, initiator_config): 584 super().__init__(name, general_config, initiator_config) 585 586 # Required fields 587 self.ip = initiator_config["ip"] 588 self.target_nic_ips = initiator_config["target_nic_ips"] 589 590 # Defaults 591 self.cpus_allowed = None 592 self.cpus_allowed_policy = "shared" 593 self.spdk_dir = "/tmp/spdk" 594 self.fio_bin = "/usr/src/fio/fio" 595 self.nvmecli_bin = "nvme" 596 self.cpu_frequency = None 597 self.subsystem_info_list = [] 598 599 if "spdk_dir" in initiator_config: 600 self.spdk_dir = initiator_config["spdk_dir"] 601 if "fio_bin" in initiator_config: 602 self.fio_bin = initiator_config["fio_bin"] 603 if "nvmecli_bin" in initiator_config: 604 self.nvmecli_bin = initiator_config["nvmecli_bin"] 605 if "cpus_allowed" in initiator_config: 606 self.cpus_allowed = initiator_config["cpus_allowed"] 607 if "cpus_allowed_policy" in initiator_config: 608 self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"] 609 if "cpu_frequency" in initiator_config: 610 self.cpu_frequency = initiator_config["cpu_frequency"] 611 if os.getenv('SPDK_WORKSPACE'): 612 self.spdk_dir = os.getenv('SPDK_WORKSPACE') 613 614 self.ssh_connection = paramiko.SSHClient() 615 self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 616 self.ssh_connection.connect(self.ip, username=self.username, password=self.password) 617 self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir]) 618 self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir]) 619 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 620 621 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 622 self.copy_spdk("/tmp/spdk.zip") 623 self.set_local_nic_info(self.set_local_nic_info_helper()) 624 self.set_cpu_frequency() 625 self.configure_system() 626 if self.enable_adq: 627 self.configure_adq() 628 self.sys_config() 629 630 def set_local_nic_info_helper(self): 631 return json.loads(self.exec_cmd(["lshw", "-json"])) 632 633 def stop(self): 634 self.restore_settings() 635 self.ssh_connection.close() 636 637 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 638 if change_dir: 639 cmd = ["cd", change_dir, ";", *cmd] 640 641 # In case one of the command elements contains whitespace and is not 642 # already quoted, # (e.g. when calling sysctl) quote it again to prevent expansion 643 # when sending to remote system. 644 for i, c in enumerate(cmd): 645 if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")): 646 cmd[i] = '"%s"' % c 647 cmd = " ".join(cmd) 648 649 # Redirect stderr to stdout thanks using get_pty option if needed 650 _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect) 651 out = stdout.read().decode(encoding="utf-8") 652 653 # Check the return code 654 rc = stdout.channel.recv_exit_status() 655 if rc: 656 raise CalledProcessError(int(rc), cmd, out) 657 658 return out 659 660 def put_file(self, local, remote_dest): 661 ftp = self.ssh_connection.open_sftp() 662 ftp.put(local, remote_dest) 663 ftp.close() 664 665 def get_file(self, remote, local_dest): 666 ftp = self.ssh_connection.open_sftp() 667 ftp.get(remote, local_dest) 668 ftp.close() 669 670 def copy_spdk(self, local_spdk_zip): 671 self.log.info("Copying SPDK sources to initiator %s" % self.name) 672 self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip") 673 self.log.info("Copied sources zip from target") 674 self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir]) 675 self.log.info("Sources unpacked") 676 677 def copy_result_files(self, dest_dir): 678 self.log.info("Copying results") 679 680 if not os.path.exists(dest_dir): 681 os.mkdir(dest_dir) 682 683 # Get list of result files from initiator and copy them back to target 684 file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n") 685 686 for file in file_list: 687 self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file), 688 os.path.join(dest_dir, file)) 689 self.log.info("Done copying results") 690 691 def match_subsystems(self, target_subsytems): 692 subsystems = [subsystem for subsystem in target_subsytems if subsystem[2] in self.target_nic_ips] 693 subsystems.sort(key=lambda x: x[1]) 694 self.log.info("Found matching subsystems on target side:") 695 for s in subsystems: 696 self.log.info(s) 697 self.subsystem_info_list = subsystems 698 699 def gen_fio_filename_conf(self, *args, **kwargs): 700 # Logic implemented in SPDKInitiator and KernelInitiator classes 701 pass 702 703 def get_route_nic_numa(self, remote_nvme_ip): 704 local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip])) 705 local_nvme_nic = local_nvme_nic[0]["dev"] 706 return self.get_nic_numa_node(local_nvme_nic) 707 708 @staticmethod 709 def gen_fio_offset_section(offset_inc, num_jobs): 710 offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc 711 return "\n".join(["size=%s%%" % offset_inc, 712 "offset=0%", 713 "offset_increment=%s%%" % offset_inc]) 714 715 def gen_fio_numa_section(self, fio_filenames_list): 716 numa_stats = {} 717 for nvme in fio_filenames_list: 718 nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme)) 719 numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1 720 721 # Use the most common NUMA node for this chunk to allocate memory and CPUs 722 section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0] 723 724 return "\n".join(["numa_cpu_nodes=%s" % section_local_numa, 725 "numa_mem_policy=prefer:%s" % section_local_numa]) 726 727 def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, 728 num_jobs=None, ramp_time=0, run_time=10, rate_iops=0, 729 offset=False, offset_inc=0): 730 fio_conf_template = """ 731[global] 732ioengine={ioengine} 733{spdk_conf} 734thread=1 735group_reporting=1 736direct=1 737percentile_list=50:90:99:99.5:99.9:99.99:99.999 738 739norandommap=1 740rw={rw} 741rwmixread={rwmixread} 742bs={block_size} 743time_based=1 744ramp_time={ramp_time} 745runtime={run_time} 746rate_iops={rate_iops} 747""" 748 749 if self.cpus_allowed is not None: 750 self.log.info("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed) 751 cpus_num = 0 752 cpus = self.cpus_allowed.split(",") 753 for cpu in cpus: 754 if "-" in cpu: 755 a, b = cpu.split("-") 756 a = int(a) 757 b = int(b) 758 cpus_num += len(range(a, b)) 759 else: 760 cpus_num += 1 761 self.num_cores = cpus_num 762 threads = range(0, self.num_cores) 763 elif hasattr(self, 'num_cores'): 764 self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores) 765 threads = range(0, int(self.num_cores)) 766 else: 767 self.num_cores = len(self.subsystem_info_list) 768 threads = range(0, len(self.subsystem_info_list)) 769 770 filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs, 771 offset, offset_inc) 772 773 fio_config = fio_conf_template.format(ioengine=self.ioengine, spdk_conf=self.spdk_conf, 774 rw=rw, rwmixread=rwmixread, block_size=block_size, 775 ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops) 776 777 # TODO: hipri disabled for now, as it causes fio errors: 778 # io_u error on file /dev/nvme2n1: Operation not supported 779 # See comment in KernelInitiator class, init_connect() function 780 if "io_uring" in self.ioengine: 781 fio_config = fio_config + """ 782fixedbufs=1 783registerfiles=1 784#hipri=1 785""" 786 if num_jobs: 787 fio_config = fio_config + "numjobs=%s \n" % num_jobs 788 if self.cpus_allowed is not None: 789 fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed 790 fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy 791 fio_config = fio_config + filename_section 792 793 fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread) 794 if hasattr(self, "num_cores"): 795 fio_config_filename += "_%sCPU" % self.num_cores 796 fio_config_filename += ".fio" 797 798 self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir]) 799 self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)]) 800 self.log.info("Created FIO Config:") 801 self.log.info(fio_config) 802 803 return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename) 804 805 def set_cpu_frequency(self): 806 if self.cpu_frequency is not None: 807 try: 808 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True) 809 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True) 810 self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"])) 811 except Exception: 812 self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!") 813 sys.exit() 814 else: 815 self.log.warning("WARNING: you have disabled intel_pstate and using default cpu governance.") 816 817 def run_fio(self, fio_config_file, run_num=1): 818 job_name, _ = os.path.splitext(fio_config_file) 819 self.log.info("Starting FIO run for job: %s" % job_name) 820 self.log.info("Using FIO: %s" % self.fio_bin) 821 822 output_filename = job_name + "_run_" + str(run_num) + "_" + self.name + ".json" 823 try: 824 output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json", 825 "--output=%s" % output_filename, "--eta=never"], True) 826 self.log.info(output) 827 self.log.info("FIO run finished. Results in: %s" % output_filename) 828 except subprocess.CalledProcessError as e: 829 self.log.error("ERROR: Fio process failed!") 830 self.log.error(e.stdout) 831 832 def sys_config(self): 833 self.log.info("====Kernel release:====") 834 self.log.info(self.exec_cmd(["uname", "-r"])) 835 self.log.info("====Kernel command line:====") 836 cmdline = self.exec_cmd(["cat", "/proc/cmdline"]) 837 self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines()))) 838 self.log.info("====sysctl conf:====") 839 sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"]) 840 self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines()))) 841 self.log.info("====Cpu power info:====") 842 self.log.info(self.exec_cmd(["cpupower", "frequency-info"])) 843 844 845class KernelTarget(Target): 846 def __init__(self, name, general_config, target_config): 847 super().__init__(name, general_config, target_config) 848 # Defaults 849 self.nvmet_bin = "nvmetcli" 850 851 if "nvmet_bin" in target_config: 852 self.nvmet_bin = target_config["nvmet_bin"] 853 854 def load_drivers(self): 855 self.log.info("Loading drivers") 856 super().load_drivers() 857 if self.null_block: 858 self.exec_cmd(["sudo", "modprobe", "null_blk", "nr_devices=%s" % self.null_block]) 859 860 def configure_adq(self): 861 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 862 863 def adq_configure_tc(self): 864 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 865 866 def adq_set_busy_read(self, busy_read_val): 867 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0") 868 return {"net.core.busy_read": 0} 869 870 def stop(self): 871 self.nvmet_command(self.nvmet_bin, "clear") 872 self.restore_settings() 873 874 def get_nvme_device_bdf(self, nvme_dev_path): 875 nvme_name = os.path.basename(nvme_dev_path) 876 return self.exec_cmd(["cat", "/sys/block/%s/device/address" % nvme_name]).strip() 877 878 def get_nvme_devices(self): 879 dev_list = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]).split("\n") 880 nvme_list = [] 881 for dev in dev_list: 882 if "nvme" not in dev: 883 continue 884 if self.get_nvme_device_bdf(dev) in self.nvme_blocklist: 885 continue 886 if len(self.nvme_allowlist) == 0: 887 nvme_list.append(dev) 888 continue 889 if self.get_nvme_device_bdf(dev) in self.nvme_allowlist: 890 nvme_list.append(dev) 891 return dev_list 892 893 def nvmet_command(self, nvmet_bin, command): 894 return self.exec_cmd([nvmet_bin, *(command.split(" "))]) 895 896 def kernel_tgt_gen_subsystem_conf(self, nvme_list): 897 898 nvmet_cfg = { 899 "ports": [], 900 "hosts": [], 901 "subsystems": [], 902 } 903 904 for ip, bdev_num in self.spread_bdevs(len(nvme_list)): 905 port = str(4420 + bdev_num) 906 nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num 907 serial = "SPDK00%s" % bdev_num 908 bdev_name = nvme_list[bdev_num] 909 910 nvmet_cfg["subsystems"].append({ 911 "allowed_hosts": [], 912 "attr": { 913 "allow_any_host": "1", 914 "serial": serial, 915 "version": "1.3" 916 }, 917 "namespaces": [ 918 { 919 "device": { 920 "path": bdev_name, 921 "uuid": "%s" % uuid.uuid4() 922 }, 923 "enable": 1, 924 "nsid": port 925 } 926 ], 927 "nqn": nqn 928 }) 929 930 nvmet_cfg["ports"].append({ 931 "addr": { 932 "adrfam": "ipv4", 933 "traddr": ip, 934 "trsvcid": port, 935 "trtype": self.transport 936 }, 937 "portid": bdev_num, 938 "referrals": [], 939 "subsystems": [nqn] 940 }) 941 942 self.subsystem_info_list.append((port, nqn, ip)) 943 self.subsys_no = len(self.subsystem_info_list) 944 945 with open("kernel.conf", "w") as fh: 946 fh.write(json.dumps(nvmet_cfg, indent=2)) 947 948 def tgt_start(self): 949 self.log.info("Configuring kernel NVMeOF Target") 950 951 if self.null_block: 952 self.log.info("Configuring with null block device.") 953 nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)] 954 else: 955 self.log.info("Configuring with NVMe drives.") 956 nvme_list = self.get_nvme_devices() 957 958 self.kernel_tgt_gen_subsystem_conf(nvme_list) 959 self.subsys_no = len(nvme_list) 960 961 self.nvmet_command(self.nvmet_bin, "clear") 962 self.nvmet_command(self.nvmet_bin, "restore kernel.conf") 963 964 if self.enable_adq: 965 self.adq_configure_tc() 966 967 self.log.info("Done configuring kernel NVMeOF Target") 968 969 970class SPDKTarget(Target): 971 def __init__(self, name, general_config, target_config): 972 super().__init__(name, general_config, target_config) 973 974 # Required fields 975 self.core_mask = target_config["core_mask"] 976 self.num_cores = self.get_num_cores(self.core_mask) 977 978 # Defaults 979 self.dif_insert_strip = False 980 self.null_block_dif_type = 0 981 self.num_shared_buffers = 4096 982 self.max_queue_depth = 128 983 self.bpf_proc = None 984 self.bpf_scripts = [] 985 self.enable_dsa = False 986 self.scheduler_core_limit = None 987 988 if "num_shared_buffers" in target_config: 989 self.num_shared_buffers = target_config["num_shared_buffers"] 990 if "max_queue_depth" in target_config: 991 self.max_queue_depth = target_config["max_queue_depth"] 992 if "null_block_dif_type" in target_config: 993 self.null_block_dif_type = target_config["null_block_dif_type"] 994 if "dif_insert_strip" in target_config: 995 self.dif_insert_strip = target_config["dif_insert_strip"] 996 if "bpf_scripts" in target_config: 997 self.bpf_scripts = target_config["bpf_scripts"] 998 if "dsa_settings" in target_config: 999 self.enable_dsa = target_config["dsa_settings"] 1000 if "scheduler_core_limit" in target_config: 1001 self.scheduler_core_limit = target_config["scheduler_core_limit"] 1002 1003 self.log.info("====DSA settings:====") 1004 self.log.info("DSA enabled: %s" % (self.enable_dsa)) 1005 1006 def adq_set_busy_read(self, busy_read_val): 1007 return {"net.core.busy_read": busy_read_val} 1008 1009 def get_nvme_devices_count(self): 1010 return len(self.get_nvme_devices()) 1011 1012 def get_nvme_devices(self): 1013 bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")])) 1014 bdev_bdfs = [] 1015 for bdev in bdev_subsys_json_obj["config"]: 1016 bdev_traddr = bdev["params"]["traddr"] 1017 if bdev_traddr in self.nvme_blocklist: 1018 continue 1019 if len(self.nvme_allowlist) == 0: 1020 bdev_bdfs.append(bdev_traddr) 1021 if bdev_traddr in self.nvme_allowlist: 1022 bdev_bdfs.append(bdev_traddr) 1023 return bdev_bdfs 1024 1025 @staticmethod 1026 def get_num_cores(core_mask): 1027 if "0x" in core_mask: 1028 return bin(int(core_mask, 16)).count("1") 1029 else: 1030 num_cores = 0 1031 core_mask = core_mask.replace("[", "") 1032 core_mask = core_mask.replace("]", "") 1033 for i in core_mask.split(","): 1034 if "-" in i: 1035 x, y = i.split("-") 1036 num_cores += len(range(int(x), int(y))) + 1 1037 else: 1038 num_cores += 1 1039 return num_cores 1040 1041 def spdk_tgt_configure(self): 1042 self.log.info("Configuring SPDK NVMeOF target via RPC") 1043 1044 if self.enable_adq: 1045 self.adq_configure_tc() 1046 1047 # Create transport layer 1048 rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, 1049 num_shared_buffers=self.num_shared_buffers, 1050 max_queue_depth=self.max_queue_depth, 1051 dif_insert_or_strip=self.dif_insert_strip, 1052 sock_priority=self.adq_priority) 1053 self.log.info("SPDK NVMeOF transport layer:") 1054 rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client)) 1055 1056 if self.null_block: 1057 self.spdk_tgt_add_nullblock(self.null_block) 1058 self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block) 1059 else: 1060 self.spdk_tgt_add_nvme_conf() 1061 self.spdk_tgt_add_subsystem_conf(self.nic_ips) 1062 1063 self.log.info("Done configuring SPDK NVMeOF Target") 1064 1065 def spdk_tgt_add_nullblock(self, null_block_count): 1066 md_size = 0 1067 block_size = 4096 1068 if self.null_block_dif_type != 0: 1069 md_size = 128 1070 1071 self.log.info("Adding null block bdevices to config via RPC") 1072 for i in range(null_block_count): 1073 self.log.info("Setting bdev protection to :%s" % self.null_block_dif_type) 1074 rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i), 1075 dif_type=self.null_block_dif_type, md_size=md_size) 1076 self.log.info("SPDK Bdevs configuration:") 1077 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1078 1079 def spdk_tgt_add_nvme_conf(self, req_num_disks=None): 1080 self.log.info("Adding NVMe bdevs to config via RPC") 1081 1082 bdfs = self.get_nvme_devices() 1083 bdfs = [b.replace(":", ".") for b in bdfs] 1084 1085 if req_num_disks: 1086 if req_num_disks > len(bdfs): 1087 self.log.error("ERROR: Requested number of disks is more than available %s" % len(bdfs)) 1088 sys.exit(1) 1089 else: 1090 bdfs = bdfs[0:req_num_disks] 1091 1092 for i, bdf in enumerate(bdfs): 1093 rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 1094 1095 self.log.info("SPDK Bdevs configuration:") 1096 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1097 1098 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 1099 self.log.info("Adding subsystems to config") 1100 if not req_num_disks: 1101 req_num_disks = self.get_nvme_devices_count() 1102 1103 for ip, bdev_num in self.spread_bdevs(req_num_disks): 1104 port = str(4420 + bdev_num) 1105 nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num 1106 serial = "SPDK00%s" % bdev_num 1107 bdev_name = "Nvme%sn1" % bdev_num 1108 1109 rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial, 1110 allow_any_host=True, max_namespaces=8) 1111 rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name) 1112 for nqn_name in [nqn, "discovery"]: 1113 rpc.nvmf.nvmf_subsystem_add_listener(self.client, 1114 nqn=nqn_name, 1115 trtype=self.transport, 1116 traddr=ip, 1117 trsvcid=port, 1118 adrfam="ipv4") 1119 self.subsystem_info_list.append((port, nqn, ip)) 1120 self.subsys_no = len(self.subsystem_info_list) 1121 1122 self.log.info("SPDK NVMeOF subsystem configuration:") 1123 rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client)) 1124 1125 def bpf_start(self): 1126 self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts) 1127 bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh") 1128 bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts] 1129 results_path = os.path.join(self.results_dir, "bpf_traces.txt") 1130 1131 with open(self.pid, "r") as fh: 1132 nvmf_pid = str(fh.readline()) 1133 1134 cmd = [bpf_script, nvmf_pid, *bpf_traces] 1135 self.log.info(cmd) 1136 self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path}) 1137 1138 def tgt_start(self): 1139 if self.null_block: 1140 self.subsys_no = 1 1141 else: 1142 self.subsys_no = self.get_nvme_devices_count() 1143 self.log.info("Starting SPDK NVMeOF Target process") 1144 nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt") 1145 proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask]) 1146 self.pid = os.path.join(self.spdk_dir, "nvmf.pid") 1147 1148 with open(self.pid, "w") as fh: 1149 fh.write(str(proc.pid)) 1150 self.nvmf_proc = proc 1151 self.log.info("SPDK NVMeOF Target PID=%s" % self.pid) 1152 self.log.info("Waiting for spdk to initialize...") 1153 while True: 1154 if os.path.exists("/var/tmp/spdk.sock"): 1155 break 1156 time.sleep(1) 1157 self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock") 1158 1159 rpc.sock.sock_set_default_impl(self.client, impl_name="posix") 1160 1161 if self.enable_zcopy: 1162 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", 1163 enable_zerocopy_send_server=True) 1164 self.log.info("Target socket options:") 1165 rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix")) 1166 1167 if self.enable_adq: 1168 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1) 1169 rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None, 1170 nvme_adminq_poll_period_us=100000, retry_count=4) 1171 1172 if self.enable_dsa: 1173 rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None) 1174 self.log.info("Target DSA accel module enabled") 1175 1176 rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit) 1177 rpc.framework_start_init(self.client) 1178 1179 if self.bpf_scripts: 1180 self.bpf_start() 1181 1182 self.spdk_tgt_configure() 1183 1184 def stop(self): 1185 if self.bpf_proc: 1186 self.log.info("Stopping BPF Trace script") 1187 self.bpf_proc.terminate() 1188 self.bpf_proc.wait() 1189 1190 if hasattr(self, "nvmf_proc"): 1191 try: 1192 self.nvmf_proc.terminate() 1193 self.nvmf_proc.wait(timeout=30) 1194 except Exception as e: 1195 self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.") 1196 self.log.info(e) 1197 self.nvmf_proc.kill() 1198 self.nvmf_proc.communicate() 1199 # Try to clean up RPC socket files if they were not removed 1200 # because of using 'kill' 1201 try: 1202 os.remove("/var/tmp/spdk.sock") 1203 os.remove("/var/tmp/spdk.sock.lock") 1204 except FileNotFoundError: 1205 pass 1206 self.restore_settings() 1207 1208 1209class KernelInitiator(Initiator): 1210 def __init__(self, name, general_config, initiator_config): 1211 super().__init__(name, general_config, initiator_config) 1212 1213 # Defaults 1214 self.extra_params = "" 1215 self.ioengine = "libaio" 1216 self.spdk_conf = "" 1217 1218 if "num_cores" in initiator_config: 1219 self.num_cores = initiator_config["num_cores"] 1220 1221 if "extra_params" in initiator_config: 1222 self.extra_params = initiator_config["extra_params"] 1223 1224 if "kernel_engine" in initiator_config: 1225 self.ioengine = initiator_config["kernel_engine"] 1226 if "io_uring" in self.ioengine: 1227 self.extra_params = "--nr-poll-queues=8" 1228 1229 def configure_adq(self): 1230 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1231 1232 def adq_configure_tc(self): 1233 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1234 1235 def adq_set_busy_read(self, busy_read_val): 1236 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0") 1237 return {"net.core.busy_read": 0} 1238 1239 def get_connected_nvme_list(self): 1240 json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"])) 1241 nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"] 1242 if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]] 1243 return nvme_list 1244 1245 def init_connect(self): 1246 self.log.info("Below connection attempts may result in error messages, this is expected!") 1247 for subsystem in self.subsystem_info_list: 1248 self.log.info("Trying to connect %s %s %s" % subsystem) 1249 self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport, 1250 "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params]) 1251 time.sleep(2) 1252 1253 if "io_uring" in self.ioengine: 1254 self.log.info("Setting block layer settings for io_uring.") 1255 1256 # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because 1257 # apparently it's not possible for connected subsystems. 1258 # Results in "error: Invalid argument" 1259 block_sysfs_settings = { 1260 "iostats": "0", 1261 "rq_affinity": "0", 1262 "nomerges": "2" 1263 } 1264 1265 for disk in self.get_connected_nvme_list(): 1266 sysfs = os.path.join("/sys/block", disk, "queue") 1267 for k, v in block_sysfs_settings.items(): 1268 sysfs_opt_path = os.path.join(sysfs, k) 1269 try: 1270 self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True) 1271 except CalledProcessError as e: 1272 self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v)) 1273 finally: 1274 _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)]) 1275 self.log.info("%s=%s" % (sysfs_opt_path, _)) 1276 1277 def init_disconnect(self): 1278 for subsystem in self.subsystem_info_list: 1279 self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]]) 1280 time.sleep(1) 1281 1282 def get_nvme_subsystem_numa(self, dev_name): 1283 # Remove two last characters to get controller name instead of subsystem name 1284 nvme_ctrl = os.path.basename(dev_name)[:-2] 1285 remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', 1286 self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl])) 1287 return self.get_route_nic_numa(remote_nvme_ip.group(0)) 1288 1289 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0): 1290 if len(threads) >= len(subsystems): 1291 threads = range(0, len(subsystems)) 1292 1293 # Generate connected nvme devices names and sort them by used NIC numa node 1294 # to allow better grouping when splitting into fio sections. 1295 nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()] 1296 nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list] 1297 nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))] 1298 1299 filename_section = "" 1300 nvme_per_split = int(len(nvme_list) / len(threads)) 1301 remainder = len(nvme_list) % len(threads) 1302 iterator = iter(nvme_list) 1303 result = [] 1304 for i in range(len(threads)): 1305 result.append([]) 1306 for _ in range(nvme_per_split): 1307 result[i].append(next(iterator)) 1308 if remainder: 1309 result[i].append(next(iterator)) 1310 remainder -= 1 1311 for i, r in enumerate(result): 1312 header = "[filename%s]" % i 1313 disks = "\n".join(["filename=%s" % x for x in r]) 1314 job_section_qd = round((io_depth * len(r)) / num_jobs) 1315 if job_section_qd == 0: 1316 job_section_qd = 1 1317 iodepth = "iodepth=%s" % job_section_qd 1318 1319 offset_section = "" 1320 if offset: 1321 offset_section = self.gen_fio_offset_section(offset_inc, num_jobs) 1322 1323 numa_opts = self.gen_fio_numa_section(r) 1324 1325 filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""]) 1326 1327 return filename_section 1328 1329 1330class SPDKInitiator(Initiator): 1331 def __init__(self, name, general_config, initiator_config): 1332 super().__init__(name, general_config, initiator_config) 1333 1334 if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False: 1335 self.install_spdk() 1336 1337 # Optional fields 1338 self.enable_data_digest = False 1339 if "enable_data_digest" in initiator_config: 1340 self.enable_data_digest = initiator_config["enable_data_digest"] 1341 if "num_cores" in initiator_config: 1342 self.num_cores = initiator_config["num_cores"] 1343 1344 self.ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir 1345 self.spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir 1346 1347 def adq_set_busy_read(self, busy_read_val): 1348 return {"net.core.busy_read": busy_read_val} 1349 1350 def install_spdk(self): 1351 self.log.info("Using fio binary %s" % self.fio_bin) 1352 self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"]) 1353 self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"]) 1354 self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)]) 1355 self.exec_cmd(["make", "-C", self.spdk_dir, "clean"]) 1356 self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"]) 1357 1358 self.log.info("SPDK built") 1359 self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir]) 1360 1361 def init_connect(self): 1362 # Not a real "connect" like when doing "nvme connect" because SPDK's fio 1363 # bdev plugin initiates connection just before starting IO traffic. 1364 # This is just to have a "init_connect" equivalent of the same function 1365 # from KernelInitiator class. 1366 # Just prepare bdev.conf JSON file for later use and consider it 1367 # "making a connection". 1368 bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list) 1369 self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir]) 1370 1371 def init_disconnect(self): 1372 # SPDK Initiator does not need to explicity disconnect as this gets done 1373 # after fio bdev plugin finishes IO. 1374 pass 1375 1376 def gen_spdk_bdev_conf(self, remote_subsystem_list): 1377 bdev_cfg_section = { 1378 "subsystems": [ 1379 { 1380 "subsystem": "bdev", 1381 "config": [] 1382 } 1383 ] 1384 } 1385 1386 for i, subsys in enumerate(remote_subsystem_list): 1387 sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys) 1388 nvme_ctrl = { 1389 "method": "bdev_nvme_attach_controller", 1390 "params": { 1391 "name": "Nvme{}".format(i), 1392 "trtype": self.transport, 1393 "traddr": sub_addr, 1394 "trsvcid": sub_port, 1395 "subnqn": sub_nqn, 1396 "adrfam": "IPv4" 1397 } 1398 } 1399 1400 if self.enable_adq: 1401 nvme_ctrl["params"].update({"priority": "1"}) 1402 1403 if self.enable_data_digest: 1404 nvme_ctrl["params"].update({"ddgst": self.enable_data_digest}) 1405 1406 bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl) 1407 1408 return json.dumps(bdev_cfg_section, indent=2) 1409 1410 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0): 1411 filename_section = "" 1412 if len(threads) >= len(subsystems): 1413 threads = range(0, len(subsystems)) 1414 1415 # Generate expected NVMe Bdev names and sort them by used NIC numa node 1416 # to allow better grouping when splitting into fio sections. 1417 filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))] 1418 filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames] 1419 filenames = [x for _, x in sorted(zip(filename_numas, filenames))] 1420 1421 nvme_per_split = int(len(subsystems) / len(threads)) 1422 remainder = len(subsystems) % len(threads) 1423 iterator = iter(filenames) 1424 result = [] 1425 for i in range(len(threads)): 1426 result.append([]) 1427 for _ in range(nvme_per_split): 1428 result[i].append(next(iterator)) 1429 if remainder: 1430 result[i].append(next(iterator)) 1431 remainder -= 1 1432 for i, r in enumerate(result): 1433 header = "[filename%s]" % i 1434 disks = "\n".join(["filename=%s" % x for x in r]) 1435 job_section_qd = round((io_depth * len(r)) / num_jobs) 1436 if job_section_qd == 0: 1437 job_section_qd = 1 1438 iodepth = "iodepth=%s" % job_section_qd 1439 1440 offset_section = "" 1441 if offset: 1442 offset_section = self.gen_fio_offset_section(offset_inc, num_jobs) 1443 1444 numa_opts = self.gen_fio_numa_section(r) 1445 1446 filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""]) 1447 1448 return filename_section 1449 1450 def get_nvme_subsystem_numa(self, bdev_name): 1451 bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir])) 1452 bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"] 1453 1454 # Remove two last characters to get controller name instead of subsystem name 1455 nvme_ctrl = bdev_name[:-2] 1456 remote_nvme_ip = list(filter(lambda x: x["params"]["name"] == "%s" % nvme_ctrl, bdev_conf_json_obj))[0]["params"]["traddr"] 1457 return self.get_route_nic_numa(remote_nvme_ip) 1458 1459 1460if __name__ == "__main__": 1461 script_full_dir = os.path.dirname(os.path.realpath(__file__)) 1462 default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json")) 1463 1464 parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) 1465 parser.add_argument('-c', '--config', type=str, default=default_config_file_path, 1466 help='Configuration file.') 1467 parser.add_argument('-r', '--results', type=str, default='/tmp/results', 1468 help='Results directory.') 1469 parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv', 1470 help='CSV results filename.') 1471 parser.add_argument('-f', '--force', default=False, action='store_true', 1472 dest='force', help="""Force script to continue and try to use all 1473 available NVMe devices during test. 1474 WARNING: Might result in data loss on used NVMe drives""") 1475 1476 args = parser.parse_args() 1477 1478 logging.basicConfig(level=logging.INFO, 1479 format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s') 1480 1481 logging.info("Using config file: %s" % args.config) 1482 with open(args.config, "r") as config: 1483 data = json.load(config) 1484 1485 initiators = [] 1486 fio_cases = [] 1487 1488 general_config = data["general"] 1489 target_config = data["target"] 1490 initiator_configs = [data[x] for x in data.keys() if "initiator" in x] 1491 1492 if "null_block_devices" not in data["target"] and \ 1493 (args.force is False and 1494 "allowlist" not in data["target"] and 1495 "blocklist" not in data["target"]): 1496 # TODO: Also check if allowlist or blocklist are not empty. 1497 logging.warning("""WARNING: This script requires allowlist and blocklist to be defined. 1498 You can choose to use all available NVMe drives on your system, which may potentially 1499 lead to data loss. If you wish to proceed with all attached NVMes, use "-f" option.""") 1500 exit(1) 1501 1502 for k, v in data.items(): 1503 if "target" in k: 1504 v.update({"results_dir": args.results}) 1505 if data[k]["mode"] == "spdk": 1506 target_obj = SPDKTarget(k, data["general"], v) 1507 elif data[k]["mode"] == "kernel": 1508 target_obj = KernelTarget(k, data["general"], v) 1509 elif "initiator" in k: 1510 if data[k]["mode"] == "spdk": 1511 init_obj = SPDKInitiator(k, data["general"], v) 1512 elif data[k]["mode"] == "kernel": 1513 init_obj = KernelInitiator(k, data["general"], v) 1514 initiators.append(init_obj) 1515 elif "fio" in k: 1516 fio_workloads = itertools.product(data[k]["bs"], 1517 data[k]["qd"], 1518 data[k]["rw"]) 1519 1520 fio_run_time = data[k]["run_time"] 1521 fio_ramp_time = data[k]["ramp_time"] 1522 fio_rw_mix_read = data[k]["rwmixread"] 1523 fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None 1524 fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None 1525 1526 fio_rate_iops = 0 1527 if "rate_iops" in data[k]: 1528 fio_rate_iops = data[k]["rate_iops"] 1529 1530 fio_offset = False 1531 if "offset" in data[k]: 1532 fio_offset = data[k]["offset"] 1533 fio_offset_inc = 0 1534 if "offset_inc" in data[k]: 1535 fio_offset_inc = data[k]["offset_inc"] 1536 else: 1537 continue 1538 1539 try: 1540 os.mkdir(args.results) 1541 except FileExistsError: 1542 pass 1543 1544 for i in initiators: 1545 target_obj.initiator_info.append( 1546 {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips} 1547 ) 1548 1549 # TODO: This try block is definietly too large. Need to break this up into separate 1550 # logical blocks to reduce size. 1551 try: 1552 target_obj.tgt_start() 1553 1554 for i in initiators: 1555 i.match_subsystems(target_obj.subsystem_info_list) 1556 if i.enable_adq: 1557 i.adq_configure_tc() 1558 1559 # Poor mans threading 1560 # Run FIO tests 1561 for block_size, io_depth, rw in fio_workloads: 1562 configs = [] 1563 for i in initiators: 1564 i.init_connect() 1565 cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no, 1566 fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops, 1567 fio_offset, fio_offset_inc) 1568 configs.append(cfg) 1569 1570 for run_no in range(1, fio_run_num+1): 1571 threads = [] 1572 power_daemon = None 1573 measurements_prefix = "%s_%s_%s_m_%s_run_%s" % (block_size, io_depth, rw, fio_rw_mix_read, run_no) 1574 1575 for i, cfg in zip(initiators, configs): 1576 t = threading.Thread(target=i.run_fio, args=(cfg, run_no)) 1577 threads.append(t) 1578 if target_obj.enable_sar: 1579 sar_file_prefix = measurements_prefix + "_sar" 1580 t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix, fio_ramp_time, fio_run_time)) 1581 threads.append(t) 1582 1583 if target_obj.enable_pcm: 1584 pcm_fnames = ["%s_%s.csv" % (measurements_prefix, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]] 1585 1586 pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, 1587 args=(args.results, pcm_fnames[0], fio_ramp_time, fio_run_time)) 1588 pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, 1589 args=(args.results, pcm_fnames[1], fio_ramp_time, fio_run_time)) 1590 pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, 1591 args=(args.results, pcm_fnames[2], fio_ramp_time, fio_run_time)) 1592 1593 threads.append(pcm_cpu_t) 1594 threads.append(pcm_mem_t) 1595 threads.append(pcm_pow_t) 1596 1597 if target_obj.enable_bw: 1598 bandwidth_file_name = measurements_prefix + "_bandwidth.csv" 1599 t = threading.Thread(target=target_obj.measure_network_bandwidth, 1600 args=(args.results, bandwidth_file_name, fio_ramp_time, fio_run_time)) 1601 threads.append(t) 1602 1603 if target_obj.enable_dpdk_memory: 1604 dpdk_mem_file_name = measurements_prefix + "_dpdk_mem.txt" 1605 t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results, dpdk_mem_file_name, fio_ramp_time)) 1606 threads.append(t) 1607 1608 if target_obj.enable_pm: 1609 power_daemon = threading.Thread(target=target_obj.measure_power, 1610 args=(args.results, measurements_prefix, script_full_dir, 1611 fio_ramp_time, fio_run_time)) 1612 threads.append(power_daemon) 1613 1614 if target_obj.enable_adq: 1615 ethtool_thread = threading.Thread(target=target_obj.ethtool_after_fio_ramp, args=(fio_ramp_time,)) 1616 threads.append(ethtool_thread) 1617 1618 for t in threads: 1619 t.start() 1620 for t in threads: 1621 t.join() 1622 1623 for i in initiators: 1624 i.init_disconnect() 1625 i.copy_result_files(args.results) 1626 try: 1627 parse_results(args.results, args.csv_filename) 1628 except Exception: 1629 logging.error("There was an error with parsing the results") 1630 finally: 1631 for i in initiators: 1632 try: 1633 i.stop() 1634 except Exception as err: 1635 pass 1636 target_obj.stop() 1637