1#!/usr/bin/env python3 2# SPDX-License-Identifier: BSD-3-Clause 3# Copyright (C) 2018 Intel Corporation 4# All rights reserved. 5# 6 7# Note: we use setattr 8# pylint: disable=no-member 9 10import os 11import re 12import sys 13import argparse 14import json 15import logging 16import zipfile 17import threading 18import subprocess 19import itertools 20import configparser 21import time 22import uuid 23 24import paramiko 25import pandas as pd 26from common import * 27from subprocess import CalledProcessError 28from abc import abstractmethod, ABC 29from dataclasses import dataclass 30from typing import Any 31 32sys.path.append(os.path.dirname(__file__) + '/../../../python') 33 34import spdk.rpc as rpc # noqa 35import spdk.rpc.client as rpc_client # noqa 36 37 38@dataclass 39class ConfigField: 40 name: str = None 41 default: Any = None 42 required: bool = False 43 44 45class Server(ABC): 46 def __init__(self, name, general_config, server_config): 47 self.name = name 48 49 self.log = logging.getLogger(self.name) 50 51 config_fields = [ 52 ConfigField(name='username', required=True), 53 ConfigField(name='password', required=True), 54 ConfigField(name='transport', required=True), 55 ConfigField(name='skip_spdk_install', default=False), 56 ConfigField(name='irdma_roce_enable', default=False) 57 ] 58 self.read_config(config_fields, general_config) 59 self.transport = self.transport.lower() 60 61 config_fields = [ 62 ConfigField(name='nic_ips', required=True), 63 ConfigField(name='mode', required=True), 64 ConfigField(name='irq_scripts_dir', default='/usr/src/local/mlnx-tools/ofed_scripts'), 65 ConfigField(name='enable_arfs', default=False), 66 ConfigField(name='tuned_profile', default='') 67 ] 68 self.read_config(config_fields, server_config) 69 70 self.local_nic_info = [] 71 self._nics_json_obj = {} 72 self.svc_restore_dict = {} 73 self.sysctl_restore_dict = {} 74 self.tuned_restore_dict = {} 75 self.governor_restore = "" 76 77 self.enable_adq = server_config.get('adq_enable', False) 78 self.adq_priority = 1 if self.enable_adq else None 79 self.irq_settings = {'mode': 'default', **server_config.get('irq_settings', {})} 80 81 if not re.match(r'^[A-Za-z0-9\-]+$', name): 82 self.log.info("Please use a name which contains only letters, numbers or dashes") 83 sys.exit(1) 84 85 def read_config(self, config_fields, config): 86 for config_field in config_fields: 87 value = config.get(config_field.name, config_field.default) 88 if config_field.required is True and value is None: 89 raise ValueError('%s config key is required' % config_field.name) 90 91 setattr(self, config_field.name, value) 92 93 @staticmethod 94 def get_uncommented_lines(lines): 95 return [line for line in lines if line and not line.startswith('#')] 96 97 def get_nic_name_by_ip(self, ip): 98 if not self._nics_json_obj: 99 nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"]) 100 self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj))) 101 for nic in self._nics_json_obj: 102 for addr in nic["addr_info"]: 103 if ip in addr["local"]: 104 return nic["ifname"] 105 106 @abstractmethod 107 def set_local_nic_info_helper(self): 108 pass 109 110 def set_local_nic_info(self, pci_info): 111 def extract_network_elements(json_obj): 112 nic_list = [] 113 if isinstance(json_obj, list): 114 for x in json_obj: 115 nic_list.extend(extract_network_elements(x)) 116 elif isinstance(json_obj, dict): 117 if "children" in json_obj: 118 nic_list.extend(extract_network_elements(json_obj["children"])) 119 if "class" in json_obj.keys() and "network" in json_obj["class"]: 120 nic_list.append(json_obj) 121 return nic_list 122 123 self.local_nic_info = extract_network_elements(pci_info) 124 125 def get_nic_numa_node(self, nic_name): 126 return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name])) 127 128 def get_numa_cpu_map(self): 129 numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"])) 130 numa_cpu_json_map = {} 131 132 for cpu in numa_cpu_json_obj["cpus"]: 133 cpu_num = int(cpu["cpu"]) 134 numa_node = int(cpu["node"]) 135 numa_cpu_json_map.setdefault(numa_node, []) 136 numa_cpu_json_map[numa_node].append(cpu_num) 137 138 return numa_cpu_json_map 139 140 # pylint: disable=R0201 141 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 142 return "" 143 144 def configure_system(self): 145 self.load_drivers() 146 self.configure_services() 147 self.configure_sysctl() 148 self.configure_arfs() 149 self.configure_tuned() 150 self.configure_cpu_governor() 151 self.configure_irq_affinity(**self.irq_settings) 152 153 RDMA_PROTOCOL_IWARP = 0 154 RDMA_PROTOCOL_ROCE = 1 155 RDMA_PROTOCOL_UNKNOWN = -1 156 157 def check_rdma_protocol(self): 158 try: 159 roce_ena = self.exec_cmd(["cat", "/sys/module/irdma/parameters/roce_ena"]) 160 roce_ena = roce_ena.strip() 161 if roce_ena == "0": 162 return self.RDMA_PROTOCOL_IWARP 163 else: 164 return self.RDMA_PROTOCOL_ROCE 165 except CalledProcessError as e: 166 self.log.error("ERROR: failed to check RDMA protocol!") 167 self.log.error("%s resulted in error: %s" % (e.cmd, e.output)) 168 return self.RDMA_PROTOCOL_UNKNOWN 169 170 def load_drivers(self): 171 self.log.info("Loading drivers") 172 self.exec_cmd(["sudo", "modprobe", "-a", 173 "nvme-%s" % self.transport, 174 "nvmet-%s" % self.transport]) 175 current_mode = self.check_rdma_protocol() 176 if current_mode == self.RDMA_PROTOCOL_UNKNOWN: 177 self.log.error("ERROR: failed to check RDMA protocol mode") 178 return 179 if self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_IWARP: 180 self.reload_driver("irdma", "roce_ena=1") 181 self.log.info("Loaded irdma driver with RoCE enabled") 182 elif self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_ROCE: 183 self.log.info("Leaving irdma driver with RoCE enabled") 184 else: 185 self.reload_driver("irdma", "roce_ena=0") 186 self.log.info("Loaded irdma driver with iWARP enabled") 187 188 def configure_arfs(self): 189 rps_flow_cnt = 512 190 if not self.enable_arfs: 191 rps_flow_cnt = 0 192 193 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 194 for nic_name in nic_names: 195 self.exec_cmd(["sudo", "ethtool", "-K", nic_name, "ntuple", "on"]) 196 self.log.info(f"Setting rps_flow_cnt for {nic_name}") 197 queue_files = self.exec_cmd(["ls", f"/sys/class/net/{nic_name}/queues/"]).strip().split("\n") 198 queue_files = filter(lambda x: x.startswith("rx-"), queue_files) 199 200 for qf in queue_files: 201 self.exec_cmd(["sudo", "bash", "-c", f"echo {rps_flow_cnt} > /sys/class/net/{nic_name}/queues/{qf}/rps_flow_cnt"]) 202 203 def configure_adq(self): 204 self.adq_load_modules() 205 self.adq_configure_nic() 206 207 def adq_load_modules(self): 208 self.log.info("Modprobing ADQ-related Linux modules...") 209 adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"] 210 for module in adq_module_deps: 211 try: 212 self.exec_cmd(["sudo", "modprobe", module]) 213 self.log.info("%s loaded!" % module) 214 except CalledProcessError as e: 215 self.log.error("ERROR: failed to load module %s" % module) 216 self.log.error("%s resulted in error: %s" % (e.cmd, e.output)) 217 218 def adq_configure_tc(self): 219 self.log.info("Configuring ADQ Traffic classes and filters...") 220 221 num_queues_tc0 = 2 # 2 is minimum number of queues for TC0 222 num_queues_tc1 = self.num_cores 223 port_param = "dst_port" if isinstance(self, Target) else "src_port" 224 xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs") 225 226 for nic_ip in self.nic_ips: 227 nic_name = self.get_nic_name_by_ip(nic_ip) 228 nic_ports = [x[0] for x in self.subsystem_info_list if nic_ip in x[2]] 229 230 tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, 231 "root", "mqprio", "num_tc", "2", "map", "0", "1", 232 "queues", "%s@0" % num_queues_tc0, 233 "%s@%s" % (num_queues_tc1, num_queues_tc0), 234 "hw", "1", "mode", "channel"] 235 self.log.info(" ".join(tc_qdisc_map_cmd)) 236 self.exec_cmd(tc_qdisc_map_cmd) 237 238 time.sleep(5) 239 tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"] 240 self.log.info(" ".join(tc_qdisc_ingress_cmd)) 241 self.exec_cmd(tc_qdisc_ingress_cmd) 242 243 nic_bdf = os.path.basename(self.exec_cmd(["readlink", "-f", "/sys/class/net/%s/device" % nic_name]).strip()) 244 self.exec_cmd(["sudo", "devlink", "dev", "param", "set", "pci/%s" % nic_bdf, 245 "name", "tc1_inline_fd", "value", "true", "cmode", "runtime"]) 246 247 for port in nic_ports: 248 tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name, 249 "protocol", "ip", "ingress", "prio", "1", "flower", 250 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port, 251 "skip_sw", "hw_tc", "1"] 252 self.log.info(" ".join(tc_filter_cmd)) 253 self.exec_cmd(tc_filter_cmd) 254 255 # show tc configuration 256 self.log.info("Show tc configuration for %s NIC..." % nic_name) 257 tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name]) 258 tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"]) 259 self.log.info("%s" % tc_disk_out) 260 self.log.info("%s" % tc_filter_out) 261 262 # Ethtool coalesce settings must be applied after configuring traffic classes 263 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"]) 264 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"]) 265 266 self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name) 267 xps_cmd = ["sudo", xps_script_path, nic_name] 268 self.log.info(xps_cmd) 269 self.exec_cmd(xps_cmd) 270 271 def reload_driver(self, driver, *modprobe_args): 272 273 try: 274 self.exec_cmd(["sudo", "rmmod", driver]) 275 self.exec_cmd(["sudo", "modprobe", driver, *modprobe_args]) 276 except CalledProcessError as e: 277 self.log.error("ERROR: failed to reload %s module!" % driver) 278 self.log.error("%s resulted in error: %s" % (e.cmd, e.output)) 279 280 def adq_configure_nic(self): 281 self.log.info("Configuring NIC port settings for ADQ testing...") 282 283 # Reload the driver first, to make sure any previous settings are re-set. 284 self.reload_driver("ice") 285 286 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 287 for nic in nic_names: 288 self.log.info(nic) 289 try: 290 self.exec_cmd(["sudo", "ethtool", "-K", nic, 291 "hw-tc-offload", "on"]) # Enable hardware TC offload 292 nic_bdf = self.exec_cmd(["bash", "-c", "source /sys/class/net/%s/device/uevent; echo $PCI_SLOT_NAME" % nic]) 293 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"]) # Disable LLDP 294 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-inspect-optimize", "off"]) 295 # Following are suggested in ADQ Configuration Guide to be enabled for large block sizes, 296 # but can be used with 4k block sizes as well 297 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop", "on"]) 298 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop-cfg", "on"]) 299 except subprocess.CalledProcessError as e: 300 self.log.error("ERROR: failed to configure NIC port using ethtool!") 301 self.log.error("%s resulted in error: %s" % (e.cmd, e.output)) 302 self.log.info("Please update your NIC driver and firmware versions and try again.") 303 self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic])) 304 self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic])) 305 306 def configure_services(self): 307 self.log.info("Configuring active services...") 308 svc_config = configparser.ConfigParser(strict=False) 309 310 # Below list is valid only for RHEL / Fedora systems and might not 311 # contain valid names for other distributions. 312 svc_target_state = { 313 "firewalld": "inactive", 314 "irqbalance": "inactive", 315 "lldpad.service": "inactive", 316 "lldpad.socket": "inactive" 317 } 318 319 for service in svc_target_state: 320 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 321 out = "\n".join(["[%s]" % service, out]) 322 svc_config.read_string(out) 323 324 if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]: 325 continue 326 327 service_state = svc_config[service]["ActiveState"] 328 self.log.info("Current state of %s service is %s" % (service, service_state)) 329 self.svc_restore_dict.update({service: service_state}) 330 if service_state != "inactive": 331 self.log.info("Disabling %s. It will be restored after the test has finished." % service) 332 self.exec_cmd(["sudo", "systemctl", "stop", service]) 333 334 def configure_sysctl(self): 335 self.log.info("Tuning sysctl settings...") 336 337 busy_read = 0 338 busy_poll = 0 339 if self.enable_adq and self.mode == "spdk": 340 busy_read = 1 341 busy_poll = 1 342 343 sysctl_opts = { 344 "net.core.busy_poll": busy_poll, 345 "net.core.busy_read": busy_read, 346 "net.core.somaxconn": 4096, 347 "net.core.netdev_max_backlog": 8192, 348 "net.ipv4.tcp_max_syn_backlog": 16384, 349 "net.core.rmem_max": 268435456, 350 "net.core.wmem_max": 268435456, 351 "net.ipv4.tcp_mem": "268435456 268435456 268435456", 352 "net.ipv4.tcp_rmem": "8192 1048576 33554432", 353 "net.ipv4.tcp_wmem": "8192 1048576 33554432", 354 "net.ipv4.route.flush": 1, 355 "vm.overcommit_memory": 1, 356 "net.core.rps_sock_flow_entries": 0 357 } 358 359 if self.enable_adq: 360 sysctl_opts.update(self.adq_set_busy_read(1)) 361 362 if self.enable_arfs: 363 sysctl_opts.update({"net.core.rps_sock_flow_entries": 32768}) 364 365 for opt, value in sysctl_opts.items(): 366 self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()}) 367 self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 368 369 def configure_tuned(self): 370 if not self.tuned_profile: 371 self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.") 372 return 373 374 self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile) 375 service = "tuned" 376 tuned_config = configparser.ConfigParser(strict=False) 377 378 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 379 out = "\n".join(["[%s]" % service, out]) 380 tuned_config.read_string(out) 381 tuned_state = tuned_config[service]["ActiveState"] 382 self.svc_restore_dict.update({service: tuned_state}) 383 384 if tuned_state != "inactive": 385 profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip() 386 profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip() 387 388 self.tuned_restore_dict = { 389 "profile": profile, 390 "mode": profile_mode 391 } 392 393 self.exec_cmd(["sudo", "systemctl", "start", service]) 394 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile]) 395 self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"])) 396 397 def configure_cpu_governor(self): 398 self.log.info("Setting CPU governor to performance...") 399 400 # This assumes that there is the same CPU scaling governor on each CPU 401 self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip() 402 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"]) 403 404 def get_core_list_from_mask(self, core_mask): 405 # Generate list of individual cores from hex core mask 406 # (e.g. '0xffff') or list containing: 407 # - individual cores (e.g. '1, 2, 3') 408 # - core ranges (e.g. '0-3') 409 # - mix of both (e.g. '0, 1-4, 9, 11-13') 410 411 core_list = [] 412 if "0x" in core_mask: 413 core_mask_int = int(core_mask, 16) 414 for i in range(core_mask_int.bit_length()): 415 if (1 << i) & core_mask_int: 416 core_list.append(i) 417 return core_list 418 else: 419 # Core list can be provided in .json config with square brackets 420 # remove them first 421 core_mask = core_mask.replace("[", "") 422 core_mask = core_mask.replace("]", "") 423 424 for i in core_mask.split(","): 425 if "-" in i: 426 start, end = i.split("-") 427 core_range = range(int(start), int(end) + 1) 428 core_list.extend(core_range) 429 else: 430 core_list.append(int(i)) 431 return core_list 432 433 def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False): 434 self.log.info("Setting NIC irq affinity for NICs. Using %s mode" % mode) 435 436 if mode not in ["default", "bynode", "cpulist"]: 437 raise ValueError("%s irq affinity setting not supported" % mode) 438 439 if mode == "cpulist" and not cpulist: 440 raise ValueError("%s irq affinity setting set, but no cpulist provided" % mode) 441 442 affinity_script = "set_irq_affinity.sh" 443 if "default" not in mode: 444 affinity_script = "set_irq_affinity_cpulist.sh" 445 system_cpu_map = self.get_numa_cpu_map() 446 irq_script_path = os.path.join(self.irq_scripts_dir, affinity_script) 447 448 def cpu_list_to_string(cpulist): 449 return ",".join(map(lambda x: str(x), cpulist)) 450 451 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 452 for nic_name in nic_names: 453 irq_cmd = ["sudo", irq_script_path] 454 455 # Use only CPU cores matching NIC NUMA node. 456 # Remove any CPU cores if they're on exclusion list. 457 if mode == "bynode": 458 irq_cpus = system_cpu_map[self.get_nic_numa_node(nic_name)] 459 if cpulist and exclude_cpulist: 460 disallowed_cpus = self.get_core_list_from_mask(cpulist) 461 irq_cpus = list(set(irq_cpus) - set(disallowed_cpus)) 462 if not irq_cpus: 463 raise Exception("No CPUs left to process IRQs after excluding CPUs!") 464 irq_cmd.append(cpu_list_to_string(irq_cpus)) 465 466 if mode == "cpulist": 467 irq_cpus = self.get_core_list_from_mask(cpulist) 468 if exclude_cpulist: 469 # Flatten system CPU list, we don't need NUMA awareness here 470 system_cpu_list = sorted({x for v in system_cpu_map.values() for x in v}) 471 irq_cpus = list(set(system_cpu_list) - set(irq_cpus)) 472 if not irq_cpus: 473 raise Exception("No CPUs left to process IRQs after excluding CPUs!") 474 irq_cmd.append(cpu_list_to_string(irq_cpus)) 475 476 irq_cmd.append(nic_name) 477 self.log.info(irq_cmd) 478 self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir) 479 480 def restore_services(self): 481 self.log.info("Restoring services...") 482 for service, state in self.svc_restore_dict.items(): 483 cmd = "stop" if state == "inactive" else "start" 484 self.exec_cmd(["sudo", "systemctl", cmd, service]) 485 486 def restore_sysctl(self): 487 self.log.info("Restoring sysctl settings...") 488 for opt, value in self.sysctl_restore_dict.items(): 489 self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 490 491 def restore_tuned(self): 492 self.log.info("Restoring tuned-adm settings...") 493 494 if not self.tuned_restore_dict: 495 return 496 497 if self.tuned_restore_dict["mode"] == "auto": 498 self.exec_cmd(["sudo", "tuned-adm", "auto_profile"]) 499 self.log.info("Reverted tuned-adm to auto_profile.") 500 else: 501 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]]) 502 self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"]) 503 504 def restore_governor(self): 505 self.log.info("Restoring CPU governor setting...") 506 if self.governor_restore: 507 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore]) 508 self.log.info("Reverted CPU governor to %s." % self.governor_restore) 509 510 def restore_settings(self): 511 self.restore_governor() 512 self.restore_tuned() 513 self.restore_services() 514 self.restore_sysctl() 515 if self.enable_adq: 516 self.reload_driver("ice") 517 518 519class Target(Server): 520 def __init__(self, name, general_config, target_config): 521 super().__init__(name, general_config, target_config) 522 523 # Defaults 524 self.enable_zcopy = False 525 self.scheduler_name = "static" 526 self.null_block = 0 527 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 528 self.subsystem_info_list = [] 529 self.initiator_info = [] 530 self.nvme_allowlist = [] 531 self.nvme_blocklist = [] 532 533 # Target-side measurement options 534 self.enable_pm = True 535 self.enable_sar = True 536 self.enable_pcm = True 537 self.enable_bw = True 538 self.enable_dpdk_memory = True 539 540 if "null_block_devices" in target_config: 541 self.null_block = target_config["null_block_devices"] 542 if "scheduler_settings" in target_config: 543 self.scheduler_name = target_config["scheduler_settings"] 544 if "zcopy_settings" in target_config: 545 self.enable_zcopy = target_config["zcopy_settings"] 546 if "results_dir" in target_config: 547 self.results_dir = target_config["results_dir"] 548 if "blocklist" in target_config: 549 self.nvme_blocklist = target_config["blocklist"] 550 if "allowlist" in target_config: 551 self.nvme_allowlist = target_config["allowlist"] 552 # Blocklist takes precedence, remove common elements from allowlist 553 self.nvme_allowlist = list(set(self.nvme_allowlist) - set(self.nvme_blocklist)) 554 if "enable_pm" in target_config: 555 self.enable_pm = target_config["enable_pm"] 556 if "enable_sar" in target_config: 557 self.enable_sar = target_config["enable_sar"] 558 if "enable_pcm" in target_config: 559 self.enable_pcm = target_config["enable_pcm"] 560 if "enable_bandwidth" in target_config: 561 self.enable_bw = target_config["enable_bandwidth"] 562 if "enable_dpdk_memory" in target_config: 563 self.enable_dpdk_memory = target_config["enable_dpdk_memory"] 564 565 self.log.info("Items now on allowlist: %s" % self.nvme_allowlist) 566 self.log.info("Items now on blocklist: %s" % self.nvme_blocklist) 567 568 self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) 569 self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../")) 570 self.set_local_nic_info(self.set_local_nic_info_helper()) 571 572 if self.skip_spdk_install is False: 573 self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip") 574 575 self.configure_system() 576 if self.enable_adq: 577 self.configure_adq() 578 self.configure_irq_affinity() 579 self.sys_config() 580 581 def set_local_nic_info_helper(self): 582 return json.loads(self.exec_cmd(["lshw", "-json"])) 583 584 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 585 stderr_opt = None 586 if stderr_redirect: 587 stderr_opt = subprocess.STDOUT 588 if change_dir: 589 old_cwd = os.getcwd() 590 os.chdir(change_dir) 591 self.log.info("Changing directory to %s" % change_dir) 592 593 out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8") 594 595 if change_dir: 596 os.chdir(old_cwd) 597 self.log.info("Changing directory to %s" % old_cwd) 598 return out 599 600 def zip_spdk_sources(self, spdk_dir, dest_file): 601 self.log.info("Zipping SPDK source directory") 602 fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) 603 for root, _directories, files in os.walk(spdk_dir, followlinks=True): 604 for file in files: 605 fh.write(os.path.relpath(os.path.join(root, file))) 606 fh.close() 607 self.log.info("Done zipping") 608 609 @staticmethod 610 def _chunks(input_list, chunks_no): 611 div, rem = divmod(len(input_list), chunks_no) 612 for i in range(chunks_no): 613 si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem) 614 yield input_list[si:si + (div + 1 if i < rem else div)] 615 616 def spread_bdevs(self, req_disks): 617 # Spread available block devices indexes: 618 # - evenly across available initiator systems 619 # - evenly across available NIC interfaces for 620 # each initiator 621 # Not NUMA aware. 622 ip_bdev_map = [] 623 initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info)) 624 625 for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)): 626 self.initiator_info[i]["bdev_range"] = init_chunk 627 init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"]))) 628 for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list): 629 for c in nic_chunk: 630 ip_bdev_map.append((ip, c)) 631 return ip_bdev_map 632 633 def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time): 634 cpu_number = os.cpu_count() 635 sar_idle_sum = 0 636 sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt") 637 sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"])) 638 639 self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time) 640 time.sleep(ramp_time) 641 self.log.info("Starting SAR measurements") 642 643 out = self.exec_cmd(["sar", "-P", "ALL", "%s" % 1, "%s" % run_time]) 644 with open(os.path.join(results_dir, sar_output_file), "w") as fh: 645 for line in out.split("\n"): 646 if "Average" in line: 647 if "CPU" in line: 648 self.log.info("Summary CPU utilization from SAR:") 649 self.log.info(line) 650 elif "all" in line: 651 self.log.info(line) 652 else: 653 sar_idle_sum += float(line.split()[7]) 654 fh.write(out) 655 sar_cpu_usage = cpu_number * 100 - sar_idle_sum 656 657 with open(os.path.join(results_dir, sar_cpu_util_file), "w") as f: 658 f.write("%0.2f" % sar_cpu_usage) 659 660 def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time): 661 time.sleep(ramp_time) 662 self.log.info("Starting power measurements") 663 self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir, 664 "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix, 665 "-x", "-c", "%s" % run_time, "-t", "%s" % 1, "-r"]) 666 667 def measure_pcm_memory(self, results_dir, pcm_file_name, ramp_time, run_time): 668 time.sleep(ramp_time) 669 cmd = ["pcm-memory", "1", "-csv=%s/%s" % (results_dir, pcm_file_name)] 670 pcm_memory = subprocess.Popen(cmd) 671 time.sleep(run_time) 672 pcm_memory.terminate() 673 674 def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time): 675 time.sleep(ramp_time) 676 cmd = ["pcm", "1", "-i=%s" % run_time, 677 "-csv=%s/%s" % (results_dir, pcm_file_name)] 678 subprocess.run(cmd) 679 df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1]) 680 df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x)) 681 skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})] 682 skt_pcm_file_name = "_".join(["skt", pcm_file_name]) 683 skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False) 684 685 def measure_pcm_power(self, results_dir, pcm_power_file_name, ramp_time, run_time): 686 time.sleep(ramp_time) 687 out = self.exec_cmd(["pcm-power", "1", "-i=%s" % run_time]) 688 with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh: 689 fh.write(out) 690 # TODO: Above command results in a .csv file containing measurements for all gathered samples. 691 # Improve this so that additional file containing measurements average is generated too. 692 693 def measure_network_bandwidth(self, results_dir, bandwidth_file_name, ramp_time, run_time): 694 self.log.info("Waiting %d seconds for ramp-up to finish before measuring bandwidth stats" % ramp_time) 695 time.sleep(ramp_time) 696 self.log.info("INFO: starting network bandwidth measure") 697 self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name), 698 "-a", "1", "-t", "1000", "-c", "%s" % run_time]) 699 # TODO: Above command results in a .csv file containing measurements for all gathered samples. 700 # Improve this so that additional file containing measurements average is generated too. 701 # TODO: Monitor only these interfaces which are currently used to run the workload. 702 703 def measure_dpdk_memory(self, results_dir, dump_file_name, ramp_time): 704 self.log.info("INFO: waiting to generate DPDK memory usage") 705 time.sleep(ramp_time) 706 self.log.info("INFO: generating DPDK memory usage") 707 tmp_dump_file = rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)["filename"] 708 os.rename(tmp_dump_file, "%s/%s" % (results_dir, dump_file_name)) 709 710 def sys_config(self): 711 self.log.info("====Kernel release:====") 712 self.log.info(os.uname().release) 713 self.log.info("====Kernel command line:====") 714 with open('/proc/cmdline') as f: 715 cmdline = f.readlines() 716 self.log.info('\n'.join(self.get_uncommented_lines(cmdline))) 717 self.log.info("====sysctl conf:====") 718 with open('/etc/sysctl.conf') as f: 719 sysctl = f.readlines() 720 self.log.info('\n'.join(self.get_uncommented_lines(sysctl))) 721 self.log.info("====Cpu power info:====") 722 self.log.info(self.exec_cmd(["cpupower", "frequency-info"])) 723 self.log.info("====zcopy settings:====") 724 self.log.info("zcopy enabled: %s" % (self.enable_zcopy)) 725 self.log.info("====Scheduler settings:====") 726 self.log.info("SPDK scheduler: %s" % (self.scheduler_name)) 727 728 729class Initiator(Server): 730 def __init__(self, name, general_config, initiator_config): 731 super().__init__(name, general_config, initiator_config) 732 733 # Required fields, Defaults 734 config_fields = [ 735 ConfigField(name='ip', required=True), 736 ConfigField(name='target_nic_ips', required=True), 737 ConfigField(name='cpus_allowed', default=None), 738 ConfigField(name='cpus_allowed_policy', default='shared'), 739 ConfigField(name='spdk_dir', default='/tmp/spdk'), 740 ConfigField(name='fio_bin', default='/usr/src/fio/fio'), 741 ConfigField(name='nvmecli_bin', default='nvme'), 742 ConfigField(name='cpu_frequency', default=None), 743 ConfigField(name='allow_cpu_sharing', default=True) 744 ] 745 746 self.read_config(config_fields, initiator_config) 747 748 if os.getenv('SPDK_WORKSPACE'): 749 self.spdk_dir = os.getenv('SPDK_WORKSPACE') 750 751 self.subsystem_info_list = [] 752 753 self.ssh_connection = paramiko.SSHClient() 754 self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 755 self.ssh_connection.connect(self.ip, username=self.username, password=self.password) 756 self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir]) 757 self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir]) 758 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 759 760 if self.skip_spdk_install is False: 761 self.copy_spdk("/tmp/spdk.zip") 762 763 self.set_local_nic_info(self.set_local_nic_info_helper()) 764 self.set_cpu_frequency() 765 self.configure_system() 766 if self.enable_adq: 767 self.configure_adq() 768 self.sys_config() 769 770 def set_local_nic_info_helper(self): 771 return json.loads(self.exec_cmd(["lshw", "-json"])) 772 773 def stop(self): 774 self.restore_settings() 775 self.ssh_connection.close() 776 777 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 778 if change_dir: 779 cmd = ["cd", change_dir, ";", *cmd] 780 781 # In case one of the command elements contains whitespace and is not 782 # already quoted, # (e.g. when calling sysctl) quote it again to prevent expansion 783 # when sending to remote system. 784 for i, c in enumerate(cmd): 785 if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")): 786 cmd[i] = '"%s"' % c 787 cmd = " ".join(cmd) 788 789 # Redirect stderr to stdout thanks using get_pty option if needed 790 _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect) 791 out = stdout.read().decode(encoding="utf-8") 792 793 # Check the return code 794 rc = stdout.channel.recv_exit_status() 795 if rc: 796 raise CalledProcessError(int(rc), cmd, out) 797 798 return out 799 800 def put_file(self, local, remote_dest): 801 ftp = self.ssh_connection.open_sftp() 802 ftp.put(local, remote_dest) 803 ftp.close() 804 805 def get_file(self, remote, local_dest): 806 ftp = self.ssh_connection.open_sftp() 807 ftp.get(remote, local_dest) 808 ftp.close() 809 810 def copy_spdk(self, local_spdk_zip): 811 self.log.info("Copying SPDK sources to initiator %s" % self.name) 812 self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip") 813 self.log.info("Copied sources zip from target") 814 self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir]) 815 self.log.info("Sources unpacked") 816 817 def copy_result_files(self, dest_dir): 818 self.log.info("Copying results") 819 820 if not os.path.exists(dest_dir): 821 os.mkdir(dest_dir) 822 823 # Get list of result files from initiator and copy them back to target 824 file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n") 825 826 for file in file_list: 827 self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file), 828 os.path.join(dest_dir, file)) 829 self.log.info("Done copying results") 830 831 def match_subsystems(self, target_subsytems): 832 subsystems = [subsystem for subsystem in target_subsytems if subsystem[2] in self.target_nic_ips] 833 subsystems.sort(key=lambda x: x[1]) 834 self.log.info("Found matching subsystems on target side:") 835 for s in subsystems: 836 self.log.info(s) 837 self.subsystem_info_list = subsystems 838 839 @abstractmethod 840 def init_connect(self): 841 pass 842 843 @abstractmethod 844 def init_disconnect(self): 845 pass 846 847 @abstractmethod 848 def gen_fio_filename_conf(self, *args, **kwargs): 849 # Logic implemented in SPDKInitiator and KernelInitiator classes 850 pass 851 852 def get_route_nic_numa(self, remote_nvme_ip): 853 local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip])) 854 local_nvme_nic = local_nvme_nic[0]["dev"] 855 return self.get_nic_numa_node(local_nvme_nic) 856 857 @staticmethod 858 def gen_fio_offset_section(offset_inc, num_jobs): 859 offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc 860 return "\n".join(["size=%s%%" % offset_inc, 861 "offset=0%", 862 "offset_increment=%s%%" % offset_inc]) 863 864 def gen_fio_numa_section(self, fio_filenames_list, num_jobs): 865 numa_stats = {} 866 allowed_cpus = [] 867 for nvme in fio_filenames_list: 868 nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme)) 869 numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1 870 871 # Use the most common NUMA node for this chunk to allocate memory and CPUs 872 section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0] 873 874 # Check if we have enough free CPUs to pop from the list before assigning them 875 if len(self.available_cpus[section_local_numa]) < num_jobs: 876 if self.allow_cpu_sharing: 877 self.log.info("Regenerating available CPU list %s" % section_local_numa) 878 # Remove still available CPUs from the regenerated list. We don't want to 879 # regenerate it with duplicates. 880 cpus_regen = set(self.get_numa_cpu_map()[section_local_numa]) - set(self.available_cpus[section_local_numa]) 881 self.available_cpus[section_local_numa].extend(cpus_regen) 882 self.log.info(self.log.info(self.available_cpus[section_local_numa])) 883 else: 884 self.log.error("No more free CPU cores to use from allowed_cpus list!") 885 raise IndexError 886 887 for _ in range(num_jobs): 888 try: 889 allowed_cpus.append(str(self.available_cpus[section_local_numa].pop(0))) 890 except IndexError: 891 self.log.error("No more free CPU cores to use from allowed_cpus list!") 892 raise 893 894 return "\n".join(["cpus_allowed=%s" % ",".join(allowed_cpus), 895 "numa_mem_policy=prefer:%s" % section_local_numa]) 896 897 def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, 898 num_jobs=None, ramp_time=0, run_time=10, rate_iops=0, 899 offset=False, offset_inc=0): 900 fio_conf_template = """ 901[global] 902ioengine={ioengine} 903{spdk_conf} 904thread=1 905group_reporting=1 906direct=1 907percentile_list=50:90:99:99.5:99.9:99.99:99.999 908 909norandommap=1 910rw={rw} 911rwmixread={rwmixread} 912bs={block_size} 913time_based=1 914ramp_time={ramp_time} 915runtime={run_time} 916rate_iops={rate_iops} 917""" 918 919 if self.cpus_allowed is not None: 920 self.log.info("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed) 921 cpus_num = 0 922 cpus = self.cpus_allowed.split(",") 923 for cpu in cpus: 924 if "-" in cpu: 925 a, b = cpu.split("-") 926 a = int(a) 927 b = int(b) 928 cpus_num += len(range(a, b)) 929 else: 930 cpus_num += 1 931 self.num_cores = cpus_num 932 threads = range(0, self.num_cores) 933 elif hasattr(self, 'num_cores'): 934 self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores) 935 threads = range(0, int(self.num_cores)) 936 else: 937 self.num_cores = len(self.subsystem_info_list) 938 threads = range(0, len(self.subsystem_info_list)) 939 940 filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs, 941 offset, offset_inc) 942 943 fio_config = fio_conf_template.format(ioengine=self.ioengine, spdk_conf=self.spdk_conf, 944 rw=rw, rwmixread=rwmixread, block_size=block_size, 945 ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops) 946 947 # TODO: hipri disabled for now, as it causes fio errors: 948 # io_u error on file /dev/nvme2n1: Operation not supported 949 # See comment in KernelInitiator class, init_connect() function 950 if "io_uring" in self.ioengine: 951 fio_config = fio_config + """ 952fixedbufs=1 953registerfiles=1 954#hipri=1 955""" 956 if num_jobs: 957 fio_config = fio_config + "numjobs=%s \n" % num_jobs 958 if self.cpus_allowed is not None: 959 fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed 960 fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy 961 fio_config = fio_config + filename_section 962 963 fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread) 964 if hasattr(self, "num_cores"): 965 fio_config_filename += "_%sCPU" % self.num_cores 966 fio_config_filename += ".fio" 967 968 self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir]) 969 self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)]) 970 self.log.info("Created FIO Config:") 971 self.log.info(fio_config) 972 973 return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename) 974 975 def set_cpu_frequency(self): 976 if self.cpu_frequency is not None: 977 try: 978 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True) 979 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True) 980 self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"])) 981 except Exception: 982 self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!") 983 sys.exit() 984 else: 985 self.log.warning("WARNING: you have disabled intel_pstate and using default cpu governance.") 986 987 def run_fio(self, fio_config_file, run_num=1): 988 job_name, _ = os.path.splitext(fio_config_file) 989 self.log.info("Starting FIO run for job: %s" % job_name) 990 self.log.info("Using FIO: %s" % self.fio_bin) 991 992 output_filename = job_name + "_run_" + str(run_num) + "_" + self.name + ".json" 993 try: 994 output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json", 995 "--output=%s" % output_filename, "--eta=never"], True) 996 self.log.info(output) 997 self.log.info("FIO run finished. Results in: %s" % output_filename) 998 except subprocess.CalledProcessError as e: 999 self.log.error("ERROR: Fio process failed!") 1000 self.log.error(e.stdout) 1001 1002 def sys_config(self): 1003 self.log.info("====Kernel release:====") 1004 self.log.info(self.exec_cmd(["uname", "-r"])) 1005 self.log.info("====Kernel command line:====") 1006 cmdline = self.exec_cmd(["cat", "/proc/cmdline"]) 1007 self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines()))) 1008 self.log.info("====sysctl conf:====") 1009 sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"]) 1010 self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines()))) 1011 self.log.info("====Cpu power info:====") 1012 self.log.info(self.exec_cmd(["cpupower", "frequency-info"])) 1013 1014 1015class KernelTarget(Target): 1016 def __init__(self, name, general_config, target_config): 1017 super().__init__(name, general_config, target_config) 1018 # Defaults 1019 self.nvmet_bin = target_config.get('nvmet_bin', 'nvmetcli') 1020 1021 def load_drivers(self): 1022 self.log.info("Loading drivers") 1023 super().load_drivers() 1024 if self.null_block: 1025 self.exec_cmd(["sudo", "modprobe", "null_blk", "nr_devices=%s" % self.null_block]) 1026 1027 def configure_adq(self): 1028 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1029 1030 def adq_configure_tc(self): 1031 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1032 1033 def adq_set_busy_read(self, busy_read_val): 1034 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0") 1035 return {"net.core.busy_read": 0} 1036 1037 def stop(self): 1038 self.nvmet_command(self.nvmet_bin, "clear") 1039 self.restore_settings() 1040 1041 def get_nvme_device_bdf(self, nvme_dev_path): 1042 nvme_name = os.path.basename(nvme_dev_path) 1043 return self.exec_cmd(["cat", "/sys/block/%s/device/address" % nvme_name]).strip() 1044 1045 def get_nvme_devices(self): 1046 dev_list = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]).split("\n") 1047 nvme_list = [] 1048 for dev in dev_list: 1049 if "nvme" not in dev: 1050 continue 1051 if self.get_nvme_device_bdf(dev) in self.nvme_blocklist: 1052 continue 1053 if len(self.nvme_allowlist) == 0: 1054 nvme_list.append(dev) 1055 continue 1056 if self.get_nvme_device_bdf(dev) in self.nvme_allowlist: 1057 nvme_list.append(dev) 1058 return nvme_list 1059 1060 def nvmet_command(self, nvmet_bin, command): 1061 return self.exec_cmd([nvmet_bin, *(command.split(" "))]) 1062 1063 def kernel_tgt_gen_subsystem_conf(self, nvme_list): 1064 1065 nvmet_cfg = { 1066 "ports": [], 1067 "hosts": [], 1068 "subsystems": [], 1069 } 1070 1071 for ip, bdev_num in self.spread_bdevs(len(nvme_list)): 1072 port = str(4420 + bdev_num) 1073 nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num 1074 serial = "SPDK00%s" % bdev_num 1075 bdev_name = nvme_list[bdev_num] 1076 1077 nvmet_cfg["subsystems"].append({ 1078 "allowed_hosts": [], 1079 "attr": { 1080 "allow_any_host": "1", 1081 "serial": serial, 1082 "version": "1.3" 1083 }, 1084 "namespaces": [ 1085 { 1086 "device": { 1087 "path": bdev_name, 1088 "uuid": "%s" % uuid.uuid4() 1089 }, 1090 "enable": 1, 1091 "nsid": port 1092 } 1093 ], 1094 "nqn": nqn 1095 }) 1096 1097 nvmet_cfg["ports"].append({ 1098 "addr": { 1099 "adrfam": "ipv4", 1100 "traddr": ip, 1101 "trsvcid": port, 1102 "trtype": self.transport 1103 }, 1104 "portid": bdev_num, 1105 "referrals": [], 1106 "subsystems": [nqn] 1107 }) 1108 1109 self.subsystem_info_list.append((port, nqn, ip)) 1110 self.subsys_no = len(self.subsystem_info_list) 1111 1112 with open("kernel.conf", "w") as fh: 1113 fh.write(json.dumps(nvmet_cfg, indent=2)) 1114 1115 def tgt_start(self): 1116 self.log.info("Configuring kernel NVMeOF Target") 1117 1118 if self.null_block: 1119 self.log.info("Configuring with null block device.") 1120 nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)] 1121 else: 1122 self.log.info("Configuring with NVMe drives.") 1123 nvme_list = self.get_nvme_devices() 1124 1125 self.kernel_tgt_gen_subsystem_conf(nvme_list) 1126 self.subsys_no = len(nvme_list) 1127 1128 self.nvmet_command(self.nvmet_bin, "clear") 1129 self.nvmet_command(self.nvmet_bin, "restore kernel.conf") 1130 1131 if self.enable_adq: 1132 self.adq_configure_tc() 1133 1134 self.log.info("Done configuring kernel NVMeOF Target") 1135 1136 1137class SPDKTarget(Target): 1138 def __init__(self, name, general_config, target_config): 1139 # IRQ affinity on SPDK Target side takes Target's core mask into consideration. 1140 # Method setting IRQ affinity is run as part of parent classes init, 1141 # so we need to have self.core_mask set before changing IRQ affinity. 1142 self.core_mask = target_config["core_mask"] 1143 self.num_cores = len(self.get_core_list_from_mask(self.core_mask)) 1144 1145 super().__init__(name, general_config, target_config) 1146 1147 # Format: property, default value 1148 config_fields = [ 1149 ConfigField(name='dif_insert_strip', default=False), 1150 ConfigField(name='null_block_dif_type', default=0), 1151 ConfigField(name='num_shared_buffers', default=4096), 1152 ConfigField(name='max_queue_depth', default=128), 1153 ConfigField(name='bpf_scripts', default=[]), 1154 ConfigField(name='scheduler_core_limit', default=None), 1155 ConfigField(name='dsa_settings', default=False), 1156 ConfigField(name='iobuf_small_pool_count', default=32767), 1157 ConfigField(name='iobuf_large_pool_count', default=16383), 1158 ConfigField(name='num_cqe', default=4096) 1159 ] 1160 1161 self.read_config(config_fields, target_config) 1162 1163 self.bpf_proc = None 1164 self.enable_dsa = False 1165 1166 self.log.info("====DSA settings:====") 1167 self.log.info("DSA enabled: %s" % (self.enable_dsa)) 1168 1169 def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False): 1170 if mode not in ["default", "bynode", "cpulist", 1171 "shared", "split", "split-bynode"]: 1172 self.log.error("%s irq affinity setting not supported" % mode) 1173 raise Exception 1174 1175 # Create core list from SPDK's mask and change it to string. 1176 # This is the type configure_irq_affinity expects for cpulist parameter. 1177 spdk_tgt_core_list = self.get_core_list_from_mask(self.core_mask) 1178 spdk_tgt_core_list = ",".join(map(lambda x: str(x), spdk_tgt_core_list)) 1179 spdk_tgt_core_list = "[" + spdk_tgt_core_list + "]" 1180 1181 if mode == "shared": 1182 super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list) 1183 elif mode == "split": 1184 super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list, exclude_cpulist=True) 1185 elif mode == "split-bynode": 1186 super().configure_irq_affinity(mode="bynode", cpulist=spdk_tgt_core_list, exclude_cpulist=True) 1187 else: 1188 super().configure_irq_affinity(mode=mode, cpulist=cpulist, exclude_cpulist=exclude_cpulist) 1189 1190 def adq_set_busy_read(self, busy_read_val): 1191 return {"net.core.busy_read": busy_read_val} 1192 1193 def get_nvme_devices_count(self): 1194 return len(self.get_nvme_devices()) 1195 1196 def get_nvme_devices(self): 1197 bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")])) 1198 bdev_bdfs = [] 1199 for bdev in bdev_subsys_json_obj["config"]: 1200 bdev_traddr = bdev["params"]["traddr"] 1201 if bdev_traddr in self.nvme_blocklist: 1202 continue 1203 if len(self.nvme_allowlist) == 0: 1204 bdev_bdfs.append(bdev_traddr) 1205 if bdev_traddr in self.nvme_allowlist: 1206 bdev_bdfs.append(bdev_traddr) 1207 return bdev_bdfs 1208 1209 def spdk_tgt_configure(self): 1210 self.log.info("Configuring SPDK NVMeOF target via RPC") 1211 1212 # Create transport layer 1213 nvmf_transport_params = { 1214 "client": self.client, 1215 "trtype": self.transport, 1216 "num_shared_buffers": self.num_shared_buffers, 1217 "max_queue_depth": self.max_queue_depth, 1218 "dif_insert_or_strip": self.dif_insert_strip, 1219 "sock_priority": self.adq_priority, 1220 "num_cqe": self.num_cqe 1221 } 1222 1223 if self.enable_adq: 1224 nvmf_transport_params["acceptor_poll_rate"] = 10000 1225 1226 rpc.nvmf.nvmf_create_transport(**nvmf_transport_params) 1227 self.log.info("SPDK NVMeOF transport layer:") 1228 rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client)) 1229 1230 if self.null_block: 1231 self.spdk_tgt_add_nullblock(self.null_block) 1232 self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block) 1233 else: 1234 self.spdk_tgt_add_nvme_conf() 1235 self.spdk_tgt_add_subsystem_conf(self.nic_ips) 1236 1237 if self.enable_adq: 1238 self.adq_configure_tc() 1239 1240 self.log.info("Done configuring SPDK NVMeOF Target") 1241 1242 def spdk_tgt_add_nullblock(self, null_block_count): 1243 md_size = 0 1244 block_size = 4096 1245 if self.null_block_dif_type != 0: 1246 md_size = 128 1247 1248 self.log.info("Adding null block bdevices to config via RPC") 1249 for i in range(null_block_count): 1250 self.log.info("Setting bdev protection to :%s" % self.null_block_dif_type) 1251 rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i), 1252 dif_type=self.null_block_dif_type, md_size=md_size) 1253 self.log.info("SPDK Bdevs configuration:") 1254 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1255 1256 def spdk_tgt_add_nvme_conf(self, req_num_disks=None): 1257 self.log.info("Adding NVMe bdevs to config via RPC") 1258 1259 bdfs = self.get_nvme_devices() 1260 bdfs = [b.replace(":", ".") for b in bdfs] 1261 1262 if req_num_disks: 1263 if req_num_disks > len(bdfs): 1264 self.log.error("ERROR: Requested number of disks is more than available %s" % len(bdfs)) 1265 sys.exit(1) 1266 else: 1267 bdfs = bdfs[0:req_num_disks] 1268 1269 for i, bdf in enumerate(bdfs): 1270 rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 1271 1272 self.log.info("SPDK Bdevs configuration:") 1273 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1274 1275 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 1276 self.log.info("Adding subsystems to config") 1277 if not req_num_disks: 1278 req_num_disks = self.get_nvme_devices_count() 1279 1280 for ip, bdev_num in self.spread_bdevs(req_num_disks): 1281 port = str(4420 + bdev_num) 1282 nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num 1283 serial = "SPDK00%s" % bdev_num 1284 bdev_name = "Nvme%sn1" % bdev_num 1285 1286 rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial, 1287 allow_any_host=True, max_namespaces=8) 1288 rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name) 1289 for nqn_name in [nqn, "discovery"]: 1290 rpc.nvmf.nvmf_subsystem_add_listener(self.client, 1291 nqn=nqn_name, 1292 trtype=self.transport, 1293 traddr=ip, 1294 trsvcid=port, 1295 adrfam="ipv4") 1296 self.subsystem_info_list.append((port, nqn, ip)) 1297 self.subsys_no = len(self.subsystem_info_list) 1298 1299 self.log.info("SPDK NVMeOF subsystem configuration:") 1300 rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client)) 1301 1302 def bpf_start(self): 1303 self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts) 1304 bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh") 1305 bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts] 1306 results_path = os.path.join(self.results_dir, "bpf_traces.txt") 1307 1308 with open(self.pid, "r") as fh: 1309 nvmf_pid = str(fh.readline()) 1310 1311 cmd = [bpf_script, nvmf_pid, *bpf_traces] 1312 self.log.info(cmd) 1313 self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path}) 1314 1315 def tgt_start(self): 1316 if self.null_block: 1317 self.subsys_no = 1 1318 else: 1319 self.subsys_no = self.get_nvme_devices_count() 1320 self.log.info("Starting SPDK NVMeOF Target process") 1321 nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt") 1322 proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask]) 1323 self.pid = os.path.join(self.spdk_dir, "nvmf.pid") 1324 1325 with open(self.pid, "w") as fh: 1326 fh.write(str(proc.pid)) 1327 self.nvmf_proc = proc 1328 self.log.info("SPDK NVMeOF Target PID=%s" % self.pid) 1329 self.log.info("Waiting for spdk to initialize...") 1330 while True: 1331 if os.path.exists("/var/tmp/spdk.sock"): 1332 break 1333 time.sleep(1) 1334 self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock") 1335 1336 rpc.sock.sock_set_default_impl(self.client, impl_name="posix") 1337 rpc.iobuf.iobuf_set_options(self.client, 1338 small_pool_count=self.iobuf_small_pool_count, 1339 large_pool_count=self.iobuf_large_pool_count, 1340 small_bufsize=None, 1341 large_bufsize=None) 1342 1343 if self.enable_zcopy: 1344 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", 1345 enable_zerocopy_send_server=True) 1346 self.log.info("Target socket options:") 1347 rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix")) 1348 1349 if self.enable_adq: 1350 rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1) 1351 rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None, 1352 nvme_adminq_poll_period_us=100000, retry_count=4) 1353 1354 if self.enable_dsa: 1355 rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None) 1356 self.log.info("Target DSA accel module enabled") 1357 1358 rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit) 1359 rpc.framework_start_init(self.client) 1360 1361 if self.bpf_scripts: 1362 self.bpf_start() 1363 1364 self.spdk_tgt_configure() 1365 1366 def stop(self): 1367 if self.bpf_proc: 1368 self.log.info("Stopping BPF Trace script") 1369 self.bpf_proc.terminate() 1370 self.bpf_proc.wait() 1371 1372 if hasattr(self, "nvmf_proc"): 1373 try: 1374 self.nvmf_proc.terminate() 1375 self.nvmf_proc.wait(timeout=30) 1376 except Exception as e: 1377 self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.") 1378 self.log.info(e) 1379 self.nvmf_proc.kill() 1380 self.nvmf_proc.communicate() 1381 # Try to clean up RPC socket files if they were not removed 1382 # because of using 'kill' 1383 try: 1384 os.remove("/var/tmp/spdk.sock") 1385 os.remove("/var/tmp/spdk.sock.lock") 1386 except FileNotFoundError: 1387 pass 1388 self.restore_settings() 1389 1390 1391class KernelInitiator(Initiator): 1392 def __init__(self, name, general_config, initiator_config): 1393 super().__init__(name, general_config, initiator_config) 1394 1395 # Defaults 1396 self.extra_params = initiator_config.get('extra_params', '') 1397 1398 self.ioengine = "libaio" 1399 self.spdk_conf = "" 1400 1401 if "num_cores" in initiator_config: 1402 self.num_cores = initiator_config["num_cores"] 1403 1404 if "kernel_engine" in initiator_config: 1405 self.ioengine = initiator_config["kernel_engine"] 1406 if "io_uring" in self.ioengine: 1407 self.extra_params += ' --nr-poll-queues=8' 1408 1409 def configure_adq(self): 1410 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1411 1412 def adq_configure_tc(self): 1413 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1414 1415 def adq_set_busy_read(self, busy_read_val): 1416 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0") 1417 return {"net.core.busy_read": 0} 1418 1419 def get_connected_nvme_list(self): 1420 json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"])) 1421 nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"] 1422 if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]] 1423 return nvme_list 1424 1425 def init_connect(self): 1426 self.log.info("Below connection attempts may result in error messages, this is expected!") 1427 for subsystem in self.subsystem_info_list: 1428 self.log.info("Trying to connect %s %s %s" % subsystem) 1429 self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport, 1430 "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params]) 1431 time.sleep(2) 1432 1433 if "io_uring" in self.ioengine: 1434 self.log.info("Setting block layer settings for io_uring.") 1435 1436 # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because 1437 # apparently it's not possible for connected subsystems. 1438 # Results in "error: Invalid argument" 1439 block_sysfs_settings = { 1440 "iostats": "0", 1441 "rq_affinity": "0", 1442 "nomerges": "2" 1443 } 1444 1445 for disk in self.get_connected_nvme_list(): 1446 sysfs = os.path.join("/sys/block", disk, "queue") 1447 for k, v in block_sysfs_settings.items(): 1448 sysfs_opt_path = os.path.join(sysfs, k) 1449 try: 1450 self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True) 1451 except CalledProcessError as e: 1452 self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v)) 1453 finally: 1454 _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)]) 1455 self.log.info("%s=%s" % (sysfs_opt_path, _)) 1456 1457 def init_disconnect(self): 1458 for subsystem in self.subsystem_info_list: 1459 self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]]) 1460 time.sleep(1) 1461 1462 def get_nvme_subsystem_numa(self, dev_name): 1463 # Remove two last characters to get controller name instead of subsystem name 1464 nvme_ctrl = os.path.basename(dev_name)[:-2] 1465 remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', 1466 self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl])) 1467 return self.get_route_nic_numa(remote_nvme_ip.group(0)) 1468 1469 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0): 1470 self.available_cpus = self.get_numa_cpu_map() 1471 if len(threads) >= len(subsystems): 1472 threads = range(0, len(subsystems)) 1473 1474 # Generate connected nvme devices names and sort them by used NIC numa node 1475 # to allow better grouping when splitting into fio sections. 1476 nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()] 1477 nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list] 1478 nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))] 1479 1480 filename_section = "" 1481 nvme_per_split = int(len(nvme_list) / len(threads)) 1482 remainder = len(nvme_list) % len(threads) 1483 iterator = iter(nvme_list) 1484 result = [] 1485 for i in range(len(threads)): 1486 result.append([]) 1487 for _ in range(nvme_per_split): 1488 result[i].append(next(iterator)) 1489 if remainder: 1490 result[i].append(next(iterator)) 1491 remainder -= 1 1492 for i, r in enumerate(result): 1493 header = "[filename%s]" % i 1494 disks = "\n".join(["filename=%s" % x for x in r]) 1495 job_section_qd = round((io_depth * len(r)) / num_jobs) 1496 if job_section_qd == 0: 1497 job_section_qd = 1 1498 iodepth = "iodepth=%s" % job_section_qd 1499 1500 offset_section = "" 1501 if offset: 1502 offset_section = self.gen_fio_offset_section(offset_inc, num_jobs) 1503 1504 numa_opts = self.gen_fio_numa_section(r, num_jobs) 1505 1506 filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""]) 1507 1508 return filename_section 1509 1510 1511class SPDKInitiator(Initiator): 1512 def __init__(self, name, general_config, initiator_config): 1513 super().__init__(name, general_config, initiator_config) 1514 1515 if self.skip_spdk_install is False: 1516 self.install_spdk() 1517 1518 # Optional fields 1519 self.enable_data_digest = initiator_config.get('enable_data_digest', False) 1520 self.small_pool_count = initiator_config.get('small_pool_count', 32768) 1521 self.large_pool_count = initiator_config.get('large_pool_count', 16384) 1522 1523 if "num_cores" in initiator_config: 1524 self.num_cores = initiator_config["num_cores"] 1525 1526 self.ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir 1527 self.spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir 1528 1529 def adq_set_busy_read(self, busy_read_val): 1530 return {"net.core.busy_read": busy_read_val} 1531 1532 def install_spdk(self): 1533 self.log.info("Using fio binary %s" % self.fio_bin) 1534 self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"]) 1535 self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"]) 1536 self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", 1537 "--with-fio=%s" % os.path.dirname(self.fio_bin), 1538 "--enable-lto", "--disable-unit-tests"]) 1539 self.exec_cmd(["make", "-C", self.spdk_dir, "clean"]) 1540 self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"]) 1541 1542 self.log.info("SPDK built") 1543 self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir]) 1544 1545 def init_connect(self): 1546 # Not a real "connect" like when doing "nvme connect" because SPDK's fio 1547 # bdev plugin initiates connection just before starting IO traffic. 1548 # This is just to have a "init_connect" equivalent of the same function 1549 # from KernelInitiator class. 1550 # Just prepare bdev.conf JSON file for later use and consider it 1551 # "making a connection". 1552 bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list) 1553 self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir]) 1554 1555 def init_disconnect(self): 1556 # SPDK Initiator does not need to explicity disconnect as this gets done 1557 # after fio bdev plugin finishes IO. 1558 return 1559 1560 def gen_spdk_bdev_conf(self, remote_subsystem_list): 1561 spdk_cfg_section = { 1562 "subsystems": [ 1563 { 1564 "subsystem": "bdev", 1565 "config": [] 1566 }, 1567 { 1568 "subsystem": "iobuf", 1569 "config": [ 1570 { 1571 "method": "iobuf_set_options", 1572 "params": { 1573 "small_pool_count": self.small_pool_count, 1574 "large_pool_count": self.large_pool_count 1575 } 1576 } 1577 ] 1578 } 1579 ] 1580 } 1581 1582 for i, subsys in enumerate(remote_subsystem_list): 1583 sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys) 1584 nvme_ctrl = { 1585 "method": "bdev_nvme_attach_controller", 1586 "params": { 1587 "name": "Nvme{}".format(i), 1588 "trtype": self.transport, 1589 "traddr": sub_addr, 1590 "trsvcid": sub_port, 1591 "subnqn": sub_nqn, 1592 "adrfam": "IPv4" 1593 } 1594 } 1595 1596 if self.enable_adq: 1597 nvme_ctrl["params"].update({"priority": "1"}) 1598 1599 if self.enable_data_digest: 1600 nvme_ctrl["params"].update({"ddgst": self.enable_data_digest}) 1601 1602 spdk_cfg_section["subsystems"][0]["config"].append(nvme_ctrl) 1603 1604 return json.dumps(spdk_cfg_section, indent=2) 1605 1606 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0): 1607 self.available_cpus = self.get_numa_cpu_map() 1608 filename_section = "" 1609 if len(threads) >= len(subsystems): 1610 threads = range(0, len(subsystems)) 1611 1612 # Generate expected NVMe Bdev names and sort them by used NIC numa node 1613 # to allow better grouping when splitting into fio sections. 1614 filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))] 1615 filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames] 1616 filenames = [x for _, x in sorted(zip(filename_numas, filenames))] 1617 1618 nvme_per_split = int(len(subsystems) / len(threads)) 1619 remainder = len(subsystems) % len(threads) 1620 iterator = iter(filenames) 1621 result = [] 1622 for i in range(len(threads)): 1623 result.append([]) 1624 for _ in range(nvme_per_split): 1625 result[i].append(next(iterator)) 1626 if remainder: 1627 result[i].append(next(iterator)) 1628 remainder -= 1 1629 for i, r in enumerate(result): 1630 header = "[filename%s]" % i 1631 disks = "\n".join(["filename=%s" % x for x in r]) 1632 job_section_qd = round((io_depth * len(r)) / num_jobs) 1633 if job_section_qd == 0: 1634 job_section_qd = 1 1635 iodepth = "iodepth=%s" % job_section_qd 1636 1637 offset_section = "" 1638 if offset: 1639 offset_section = self.gen_fio_offset_section(offset_inc, num_jobs) 1640 1641 numa_opts = self.gen_fio_numa_section(r, num_jobs) 1642 1643 filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""]) 1644 1645 return filename_section 1646 1647 def get_nvme_subsystem_numa(self, bdev_name): 1648 bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir])) 1649 bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"] 1650 1651 # Remove two last characters to get controller name instead of subsystem name 1652 nvme_ctrl = bdev_name[:-2] 1653 for bdev in bdev_conf_json_obj: 1654 if bdev["method"] == "bdev_nvme_attach_controller" and bdev["params"]["name"] == nvme_ctrl: 1655 return self.get_route_nic_numa(bdev["params"]["traddr"]) 1656 return None 1657 1658 1659if __name__ == "__main__": 1660 script_full_dir = os.path.dirname(os.path.realpath(__file__)) 1661 default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json")) 1662 1663 parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) 1664 parser.add_argument('-c', '--config', type=str, default=default_config_file_path, 1665 help='Configuration file.') 1666 parser.add_argument('-r', '--results', type=str, default='/tmp/results', 1667 help='Results directory.') 1668 parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv', 1669 help='CSV results filename.') 1670 parser.add_argument('-f', '--force', default=False, action='store_true', 1671 dest='force', help="""Force script to continue and try to use all 1672 available NVMe devices during test. 1673 WARNING: Might result in data loss on used NVMe drives""") 1674 1675 args = parser.parse_args() 1676 1677 logging.basicConfig(level=logging.INFO, 1678 format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s') 1679 1680 logging.info("Using config file: %s" % args.config) 1681 with open(args.config, "r") as config: 1682 data = json.load(config) 1683 1684 initiators = [] 1685 fio_cases = [] 1686 1687 general_config = data["general"] 1688 target_config = data["target"] 1689 initiator_configs = [data[x] for x in data.keys() if "initiator" in x] 1690 1691 if "null_block_devices" not in data["target"] and \ 1692 (args.force is False and 1693 "allowlist" not in data["target"] and 1694 "blocklist" not in data["target"]): 1695 # TODO: Also check if allowlist or blocklist are not empty. 1696 logging.warning("""WARNING: This script requires allowlist and blocklist to be defined. 1697 You can choose to use all available NVMe drives on your system, which may potentially 1698 lead to data loss. If you wish to proceed with all attached NVMes, use "-f" option.""") 1699 exit(1) 1700 1701 for k, v in data.items(): 1702 if "target" in k: 1703 v.update({"results_dir": args.results}) 1704 if data[k]["mode"] == "spdk": 1705 target_obj = SPDKTarget(k, data["general"], v) 1706 elif data[k]["mode"] == "kernel": 1707 target_obj = KernelTarget(k, data["general"], v) 1708 elif "initiator" in k: 1709 if data[k]["mode"] == "spdk": 1710 init_obj = SPDKInitiator(k, data["general"], v) 1711 elif data[k]["mode"] == "kernel": 1712 init_obj = KernelInitiator(k, data["general"], v) 1713 initiators.append(init_obj) 1714 elif "fio" in k: 1715 fio_workloads = itertools.product(data[k]["bs"], 1716 data[k]["qd"], 1717 data[k]["rw"]) 1718 1719 fio_run_time = data[k]["run_time"] 1720 fio_ramp_time = data[k]["ramp_time"] 1721 fio_rw_mix_read = data[k]["rwmixread"] 1722 fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None 1723 fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None 1724 1725 fio_rate_iops = 0 1726 if "rate_iops" in data[k]: 1727 fio_rate_iops = data[k]["rate_iops"] 1728 1729 fio_offset = False 1730 if "offset" in data[k]: 1731 fio_offset = data[k]["offset"] 1732 fio_offset_inc = 0 1733 if "offset_inc" in data[k]: 1734 fio_offset_inc = data[k]["offset_inc"] 1735 else: 1736 continue 1737 1738 try: 1739 os.mkdir(args.results) 1740 except FileExistsError: 1741 pass 1742 1743 for i in initiators: 1744 target_obj.initiator_info.append( 1745 {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips} 1746 ) 1747 1748 # TODO: This try block is definietly too large. Need to break this up into separate 1749 # logical blocks to reduce size. 1750 try: 1751 target_obj.tgt_start() 1752 1753 for i in initiators: 1754 i.match_subsystems(target_obj.subsystem_info_list) 1755 if i.enable_adq: 1756 i.adq_configure_tc() 1757 1758 # Poor mans threading 1759 # Run FIO tests 1760 for block_size, io_depth, rw in fio_workloads: 1761 configs = [] 1762 for i in initiators: 1763 i.init_connect() 1764 cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no, 1765 fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops, 1766 fio_offset, fio_offset_inc) 1767 configs.append(cfg) 1768 1769 for run_no in range(1, fio_run_num+1): 1770 threads = [] 1771 power_daemon = None 1772 measurements_prefix = "%s_%s_%s_m_%s_run_%s" % (block_size, io_depth, rw, fio_rw_mix_read, run_no) 1773 1774 for i, cfg in zip(initiators, configs): 1775 t = threading.Thread(target=i.run_fio, args=(cfg, run_no)) 1776 threads.append(t) 1777 if target_obj.enable_sar: 1778 sar_file_prefix = measurements_prefix + "_sar" 1779 t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix, fio_ramp_time, fio_run_time)) 1780 threads.append(t) 1781 1782 if target_obj.enable_pcm: 1783 pcm_fnames = ["%s_%s.csv" % (measurements_prefix, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]] 1784 1785 pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, 1786 args=(args.results, pcm_fnames[0], fio_ramp_time, fio_run_time)) 1787 pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, 1788 args=(args.results, pcm_fnames[1], fio_ramp_time, fio_run_time)) 1789 pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, 1790 args=(args.results, pcm_fnames[2], fio_ramp_time, fio_run_time)) 1791 1792 threads.append(pcm_cpu_t) 1793 threads.append(pcm_mem_t) 1794 threads.append(pcm_pow_t) 1795 1796 if target_obj.enable_bw: 1797 bandwidth_file_name = measurements_prefix + "_bandwidth.csv" 1798 t = threading.Thread(target=target_obj.measure_network_bandwidth, 1799 args=(args.results, bandwidth_file_name, fio_ramp_time, fio_run_time)) 1800 threads.append(t) 1801 1802 if target_obj.enable_dpdk_memory: 1803 dpdk_mem_file_name = measurements_prefix + "_dpdk_mem.txt" 1804 t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results, dpdk_mem_file_name, fio_ramp_time)) 1805 threads.append(t) 1806 1807 if target_obj.enable_pm: 1808 power_daemon = threading.Thread(target=target_obj.measure_power, 1809 args=(args.results, measurements_prefix, script_full_dir, 1810 fio_ramp_time, fio_run_time)) 1811 threads.append(power_daemon) 1812 1813 for t in threads: 1814 t.start() 1815 for t in threads: 1816 t.join() 1817 1818 for i in initiators: 1819 i.init_disconnect() 1820 i.copy_result_files(args.results) 1821 try: 1822 parse_results(args.results, args.csv_filename) 1823 except Exception as err: 1824 logging.error("There was an error with parsing the results") 1825 logging.error(err) 1826 finally: 1827 for i in initiators: 1828 try: 1829 i.stop() 1830 except Exception as err: 1831 pass 1832 target_obj.stop() 1833