#!/usr/bin/env python3
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (C) 2018 Intel Corporation
# All rights reserved.
#

# Note: we use setattr
# pylint: disable=no-member

import os
import re
import sys
import argparse
import json
import logging
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid

import paramiko
import pandas as pd
from common import *
from subprocess import check_output, CalledProcessError
from abc import abstractmethod, ABC
from dataclasses import dataclass
from typing import Any

sys.path.append(os.path.dirname(__file__) + '/../../../python')

import spdk.rpc as rpc  # noqa
import spdk.rpc.client as rpc_client  # noqa


@dataclass
class ConfigField:
    name: str = None
    default: Any = None
    required: bool = False


class Server(ABC):
    def __init__(self, name, general_config, server_config):
        self.name = name

        self.log = logging.getLogger(self.name)

        config_fields = [
            ConfigField(name='username', required=True),
            ConfigField(name='password', required=True),
            ConfigField(name='transport', required=True),
            ConfigField(name='skip_spdk_install', default=False),
            ConfigField(name='irdma_roce_enable', default=False)
        ]
        self.read_config(config_fields, general_config)
        self.transport = self.transport.lower()

        config_fields = [
            ConfigField(name='nic_ips', required=True),
            ConfigField(name='mode', required=True),
            ConfigField(name='irq_scripts_dir', default='/usr/src/local/mlnx-tools/ofed_scripts'),
            ConfigField(name='enable_arfs', default=False),
            ConfigField(name='tuned_profile', default='')
        ]
        self.read_config(config_fields, server_config)

        self.local_nic_info = []
        self._nics_json_obj = {}
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""

        self.enable_adq = server_config.get('adq_enable', False)
        self.adq_priority = 1 if self.enable_adq else None
        self.irq_settings = {'mode': 'default', **server_config.get('irq_settings', {})}

        if not re.match(r'^[A-Za-z0-9\-]+$', name):
            self.log.error("Please use a name which contains only letters, numbers or dashes")
            sys.exit(1)

    def read_config(self, config_fields, config):
        for config_field in config_fields:
            value = config.get(config_field.name, config_field.default)
            if config_field.required is True and value is None:
                raise ValueError('%s config key is required' % config_field.name)

            setattr(self, config_field.name, value)

    @staticmethod
    def get_uncommented_lines(lines):
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                if ip in addr["local"]:
                    return nic["ifname"]

    @abstractmethod
    def set_local_nic_info_helper(self):
        pass

    def set_local_nic_info(self, pci_info):
        def extract_network_elements(json_obj):
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)
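    # For reference (illustrative "lshw -json" shape, not verified against every
    # lshw version): extract_network_elements() above keeps only nodes whose
    # "class" contains "network", e.g. {"class": "network",
    # "logicalname": "ens801f0", ...}, and recurses into "children" lists
    # for everything else.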
    def get_nic_numa_node(self, nic_name):
        return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name]))

    def get_numa_cpu_map(self):
        numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"]))
        numa_cpu_json_map = {}

        for cpu in numa_cpu_json_obj["cpus"]:
            cpu_num = int(cpu["cpu"])
            numa_node = int(cpu["node"])
            numa_cpu_json_map.setdefault(numa_node, [])
            numa_cpu_json_map[numa_node].append(cpu_num)

        return numa_cpu_json_map

    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        return ""

    def configure_system(self):
        self.load_drivers()
        self.configure_services()
        self.configure_sysctl()
        self.configure_arfs()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity(**self.irq_settings)

    RDMA_PROTOCOL_IWARP = 0
    RDMA_PROTOCOL_ROCE = 1
    RDMA_PROTOCOL_UNKNOWN = -1

    def check_rdma_protocol(self):
        try:
            roce_ena = self.exec_cmd(["cat", "/sys/module/irdma/parameters/roce_ena"])
            roce_ena = roce_ena.strip()
            if roce_ena == "0":
                return self.RDMA_PROTOCOL_IWARP
            else:
                return self.RDMA_PROTOCOL_ROCE
        except CalledProcessError as e:
            self.log.error("ERROR: failed to check RDMA protocol!")
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
            return self.RDMA_PROTOCOL_UNKNOWN

    def load_drivers(self):
        self.log.info("Loading drivers")
        self.exec_cmd(["sudo", "modprobe", "-a",
                       "nvme-%s" % self.transport,
                       "nvmet-%s" % self.transport])
        current_mode = self.check_rdma_protocol()
        if current_mode == self.RDMA_PROTOCOL_UNKNOWN:
            self.log.error("ERROR: failed to check RDMA protocol mode")
            return
        if self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_IWARP:
            self.reload_driver("irdma", "roce_ena=1")
            self.log.info("Loaded irdma driver with RoCE enabled")
        elif self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_ROCE:
            self.log.info("Leaving irdma driver with RoCE enabled")
        else:
            self.reload_driver("irdma", "roce_ena=0")
            self.log.info("Loaded irdma driver with iWARP enabled")

    def configure_arfs(self):
        rps_flow_cnt = 512
        if not self.enable_arfs:
            rps_flow_cnt = 0

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            self.exec_cmd(["sudo", "ethtool", "-K", nic_name, "ntuple", "on"])
            self.log.info(f"Setting rps_flow_cnt for {nic_name}")
            queue_files = self.exec_cmd(["ls", f"/sys/class/net/{nic_name}/queues/"]).strip().split("\n")
            queue_files = filter(lambda x: x.startswith("rx-"), queue_files)

            for qf in queue_files:
                self.exec_cmd(["sudo", "bash", "-c", f"echo {rps_flow_cnt} > /sys/class/net/{nic_name}/queues/{qf}/rps_flow_cnt"])

    def configure_adq(self):
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        self.log.info("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log.info("%s loaded!" % module)
            except CalledProcessError as e:
                self.log.error("ERROR: failed to load module %s" % module)
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        self.log.info("Configuring ADQ Traffic classes and filters...")

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            nic_ports = [x[0] for x in self.subsystem_info_list if nic_ip in x[2]]

            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log.info(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log.info(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            nic_bdf = os.path.basename(self.exec_cmd(["readlink", "-f", "/sys/class/net/%s/device" % nic_name]).strip())
            self.exec_cmd(["sudo", "devlink", "dev", "param", "set", "pci/%s" % nic_bdf,
                           "name", "tc1_inline_fd", "value", "true", "cmode", "runtime"])

            for port in nic_ports:
                tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                                 "protocol", "ip", "ingress", "prio", "1", "flower",
                                 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                                 "skip_sw", "hw_tc", "1"]
                self.log.info(" ".join(tc_filter_cmd))
                self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log.info("Show tc configuration for %s NIC..." % nic_name)
            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log.info("%s" % tc_disk_out)
            self.log.info("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log.info(xps_cmd)
            self.exec_cmd(xps_cmd)

    def reload_driver(self, driver, *modprobe_args):
        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver, *modprobe_args])
        except CalledProcessError as e:
            self.log.error("ERROR: failed to reload %s module!" % driver)
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
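    # Example use (as in load_drivers() above):
    #   self.reload_driver("irdma", "roce_ena=1")
    # runs "rmmod irdma" followed by "modprobe irdma roce_ena=1".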
    def adq_configure_nic(self):
        self.log.info("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                nic_bdf = self.exec_cmd(["bash", "-c", "source /sys/class/net/%s/device/uevent; echo $PCI_SLOT_NAME" % nic])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-inspect-optimize", "off"])
                # Following are suggested in ADQ Configuration Guide to be enabled for large block sizes,
                # but can be used with 4k block sizes as well
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop", "on"])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop-cfg", "on"])
            except subprocess.CalledProcessError as e:
                self.log.error("ERROR: failed to configure NIC port using ethtool!")
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
                self.log.info("Please update your NIC driver and firmware versions and try again.")
            self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        self.log.info("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log.info("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log.info("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])
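    # For reference, prefixing the "systemctl show" output with a "[service]"
    # header (as above) yields INI-style text that configparser can read, e.g.:
    #   [firewalld]
    #   ActiveState=active
    #   ...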
    def configure_sysctl(self):
        self.log.info("Tuning sysctl settings...")

        busy_read = 0
        busy_poll = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1
            busy_poll = 1

        sysctl_opts = {
            "net.core.busy_poll": busy_poll,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
            "net.core.rps_sock_flow_entries": 0
        }

        if self.enable_adq:
            sysctl_opts.update(self.adq_set_busy_read(1))

        if self.enable_arfs:
            sysctl_opts.update({"net.core.rps_sock_flow_entries": 32768})

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        if not self.tuned_profile:
            self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        self.log.info("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def get_core_list_from_mask(self, core_mask):
        # Generate list of individual cores from hex core mask
        # (e.g. '0xffff') or list containing:
        # - individual cores (e.g. '1, 2, 3')
        # - core ranges (e.g. '0-3')
        # - mix of both (e.g. '0, 1-4, 9, 11-13')
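        # Examples (illustrative):
        #   "0xff"          -> [0, 1, 2, 3, 4, 5, 6, 7]
        #   "[0, 1-4, 9]"   -> [0, 1, 2, 3, 4, 9]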
        core_list = []
        if "0x" in core_mask:
            core_mask_int = int(core_mask, 16)
            for i in range(core_mask_int.bit_length()):
                if (1 << i) & core_mask_int:
                    core_list.append(i)
            return core_list
        else:
            # Core list can be provided in .json config with square brackets
            # remove them first
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")

            for i in core_mask.split(","):
                if "-" in i:
                    start, end = i.split("-")
                    core_range = range(int(start), int(end) + 1)
                    core_list.extend(core_range)
                else:
                    core_list.append(int(i))
            return core_list

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
        self.log.info("Setting NIC irq affinity for NICs. Using %s mode" % mode)

        if mode not in ["default", "bynode", "cpulist"]:
            raise ValueError("%s irq affinity setting not supported" % mode)

        if mode == "cpulist" and not cpulist:
            raise ValueError("%s irq affinity setting set, but no cpulist provided" % mode)

        affinity_script = "set_irq_affinity.sh"
        if "default" not in mode:
            affinity_script = "set_irq_affinity_cpulist.sh"
            system_cpu_map = self.get_numa_cpu_map()
        irq_script_path = os.path.join(self.irq_scripts_dir, affinity_script)

        def cpu_list_to_string(cpulist):
            return ",".join(map(lambda x: str(x), cpulist))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            irq_cmd = ["sudo", irq_script_path]

            # Use only CPU cores matching NIC NUMA node.
            # Remove any CPU cores if they're on exclusion list.
            if mode == "bynode":
                irq_cpus = system_cpu_map[self.get_nic_numa_node(nic_name)]
                if cpulist and exclude_cpulist:
                    disallowed_cpus = self.get_core_list_from_mask(cpulist)
                    irq_cpus = list(set(irq_cpus) - set(disallowed_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            if mode == "cpulist":
                irq_cpus = self.get_core_list_from_mask(cpulist)
                if exclude_cpulist:
                    # Flatten system CPU list, we don't need NUMA awareness here
                    system_cpu_list = sorted({x for v in system_cpu_map.values() for x in v})
                    irq_cpus = list(set(system_cpu_list) - set(irq_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            irq_cmd.append(nic_name)
            self.log.info(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        self.log.info("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        self.log.info("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        self.log.info("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log.info("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])
profile." % self.tuned_restore_dict["profile"]) 503 504 def restore_governor(self): 505 self.log.info("Restoring CPU governor setting...") 506 if self.governor_restore: 507 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore]) 508 self.log.info("Reverted CPU governor to %s." % self.governor_restore) 509 510 def restore_settings(self): 511 self.restore_governor() 512 self.restore_tuned() 513 self.restore_services() 514 self.restore_sysctl() 515 if self.enable_adq: 516 self.reload_driver("ice") 517 518 519class Target(Server): 520 def __init__(self, name, general_config, target_config): 521 super().__init__(name, general_config, target_config) 522 523 # Defaults 524 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 525 self.subsystem_info_list = [] 526 self.initiator_info = [] 527 528 config_fields = [ 529 ConfigField(name='mode', required=True), 530 ConfigField(name='results_dir', required=True), 531 ConfigField(name='enable_pm', default=True), 532 ConfigField(name='enable_sar', default=True), 533 ConfigField(name='enable_pcm', default=True), 534 ConfigField(name='enable_dpdk_memory', default=True) 535 ] 536 self.read_config(config_fields, target_config) 537 538 self.null_block = target_config.get('null_block_devices', 0) 539 self.scheduler_name = target_config.get('scheduler_settings', 'static') 540 self.enable_zcopy = target_config.get('zcopy_settings', False) 541 self.enable_bw = target_config.get('enable_bandwidth', True) 542 self.nvme_blocklist = target_config.get('blocklist', []) 543 self.nvme_allowlist = target_config.get('allowlist', []) 544 545 # Blocklist takes precedence, remove common elements from allowlist 546 self.nvme_allowlist = list(set(self.nvme_allowlist) - set(self.nvme_blocklist)) 547 548 self.log.info("Items now on allowlist: %s" % self.nvme_allowlist) 549 self.log.info("Items now on blocklist: %s" % self.nvme_blocklist) 550 551 self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) 552 self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../")) 553 self.set_local_nic_info(self.set_local_nic_info_helper()) 554 555 if self.skip_spdk_install is False: 556 self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip") 557 558 self.configure_system() 559 if self.enable_adq: 560 self.configure_adq() 561 self.configure_irq_affinity() 562 self.sys_config() 563 564 def set_local_nic_info_helper(self): 565 return json.loads(self.exec_cmd(["lshw", "-json"])) 566 567 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 568 stderr_opt = None 569 if stderr_redirect: 570 stderr_opt = subprocess.STDOUT 571 if change_dir: 572 old_cwd = os.getcwd() 573 os.chdir(change_dir) 574 self.log.info("Changing directory to %s" % change_dir) 575 576 out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8") 577 578 if change_dir: 579 os.chdir(old_cwd) 580 self.log.info("Changing directory to %s" % old_cwd) 581 return out 582 583 def zip_spdk_sources(self, spdk_dir, dest_file): 584 self.log.info("Zipping SPDK source directory") 585 fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) 586 for root, _directories, files in os.walk(spdk_dir, followlinks=True): 587 for file in files: 588 fh.write(os.path.relpath(os.path.join(root, file))) 589 fh.close() 590 self.log.info("Done zipping") 591 592 @staticmethod 593 def _chunks(input_list, chunks_no): 594 div, rem = divmod(len(input_list), chunks_no) 595 for i in range(chunks_no): 596 si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - 
    def spread_bdevs(self, req_disks):
        # Spread available block devices indexes:
        # - evenly across available initiator systems
        # - evenly across available NIC interfaces for
        #   each initiator
        # Not NUMA aware.
        ip_bdev_map = []
        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))

        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
            self.initiator_info[i]["bdev_range"] = init_chunk
            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
                for c in nic_chunk:
                    ip_bdev_map.append((ip, c))
        return ip_bdev_map

    def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time):
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))

        self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Starting SAR measurements")

        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % 1, "%s" % run_time])
        # sar_output_file already contains results_dir, so do not join it again
        with open(sar_output_file, "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log.info("Summary CPU utilization from SAR:")
                        self.log.info(line)
                    elif "all" in line:
                        self.log.info(line)
                    else:
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum

        with open(sar_cpu_util_file, "w") as f:
            f.write("%0.2f" % sar_cpu_usage)

    def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time):
        time.sleep(ramp_time)
        self.log.info("Starting power measurements")
        self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir,
                       "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix,
                       "-x", "-c", "%s" % run_time, "-t", "%s" % 1, "-r"])

    def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        cmd = ["pcm", "1", "-i=%s" % run_time,
               "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name, ramp_time, run_time):
        self.log.info("Waiting %d seconds for ramp-up to finish before measuring bandwidth stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("INFO: starting network bandwidth measure")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", "%s" % run_time])
        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
        #       Improve this so that an additional file containing the measurements average is generated too.
        # TODO: Monitor only those interfaces which are currently used to run the workload.

    def measure_dpdk_memory(self, results_dir, dump_file_name, ramp_time):
        self.log.info("INFO: waiting to generate DPDK memory usage")
        time.sleep(ramp_time)
        self.log.info("INFO: generating DPDK memory usage")
        tmp_dump_file = rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)["filename"]
        os.rename(tmp_dump_file, "%s/%s" % (results_dir, dump_file_name))

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(os.uname().release)
        self.log.info("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log.info("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log.info("====zcopy settings:====")
        self.log.info("zcopy enabled: %s" % (self.enable_zcopy))
        self.log.info("====Scheduler settings:====")
        self.log.info("SPDK scheduler: %s" % (self.scheduler_name))


class Initiator(Server):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Required fields and defaults
        config_fields = [
            ConfigField(name='mode', required=True),
            ConfigField(name='ip', required=True),
            ConfigField(name='target_nic_ips', required=True),
            ConfigField(name='cpus_allowed', default=None),
            ConfigField(name='cpus_allowed_policy', default='shared'),
            ConfigField(name='spdk_dir', default='/tmp/spdk'),
            ConfigField(name='fio_bin', default='/usr/src/fio/fio'),
            ConfigField(name='nvmecli_bin', default='nvme'),
            ConfigField(name='cpu_frequency', default=None),
            ConfigField(name='allow_cpu_sharing', default=True)
        ]

        self.read_config(config_fields, initiator_config)

        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.subsystem_info_list = []

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if self.skip_spdk_install is False:
            self.copy_spdk("/tmp/spdk.zip")

        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        self.restore_settings()
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it again to prevent
        # expansion when sending to the remote system.
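        # e.g. ["sysctl", "-w", "net.ipv4.tcp_rmem=8192 1048576 33554432"]
        # becomes: sysctl -w "net.ipv4.tcp_rmem=8192 1048576 33554432"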
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout by using the get_pty option, if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        self.log.info("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log.info("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log.info("Sources unpacked")

    def copy_result_files(self, dest_dir):
        self.log.info("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log.info("Done copying results")

    def match_subsystems(self, target_subsystems):
        subsystems = [subsystem for subsystem in target_subsystems if subsystem[2] in self.target_nic_ips]
        subsystems.sort(key=lambda x: x[1])
        self.log.info("Found matching subsystems on target side:")
        for s in subsystems:
            self.log.info(s)
        self.subsystem_info_list = subsystems

    @abstractmethod
    def init_connect(self):
        pass

    @abstractmethod
    def init_disconnect(self):
        pass

    @abstractmethod
    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def get_route_nic_numa(self, remote_nvme_ip):
        local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip]))
        local_nvme_nic = local_nvme_nic[0]["dev"]
        return self.get_nic_numa_node(local_nvme_nic)

    @staticmethod
    def gen_fio_offset_section(offset_inc, num_jobs):
        offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc
        return "\n".join(["size=%s%%" % offset_inc,
                          "offset=0%",
                          "offset_increment=%s%%" % offset_inc])

    def gen_fio_numa_section(self, fio_filenames_list, num_jobs):
        numa_stats = {}
        allowed_cpus = []
        for nvme in fio_filenames_list:
            nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme))
            numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1

        # Use the most common NUMA node for this chunk to allocate memory and CPUs
        section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0]

        # Check if we have enough free CPUs to pop from the list before assigning them
        if len(self.available_cpus[section_local_numa]) < num_jobs:
            if self.allow_cpu_sharing:
                self.log.info("Regenerating available CPU list %s" % section_local_numa)
                # Remove still available CPUs from the regenerated list. We don't
                # want to regenerate it with duplicates.
                cpus_regen = set(self.get_numa_cpu_map()[section_local_numa]) - set(self.available_cpus[section_local_numa])
                self.available_cpus[section_local_numa].extend(cpus_regen)
                self.log.info(self.available_cpus[section_local_numa])
            else:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise IndexError

        for _ in range(num_jobs):
            try:
                allowed_cpus.append(str(self.available_cpus[section_local_numa].pop(0)))
            except IndexError:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise

        return "\n".join(["cpus_allowed=%s" % ",".join(allowed_cpus),
                          "numa_mem_policy=prefer:%s" % section_local_numa])

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no,
                       num_jobs=None, ramp_time=0, run_time=10, rate_iops=0,
                       offset=False, offset_inc=0):
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""

        if self.cpus_allowed is not None:
            self.log.info("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    cpus_num += len(range(a, b + 1))  # +1: core ranges such as "0-3" are inclusive
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            self.num_cores = len(self.subsystem_info_list)
            threads = range(0, len(self.subsystem_info_list))

        filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs,
                                                      offset, offset_inc)

        fio_config = fio_conf_template.format(ioengine=self.ioengine, spdk_conf=self.spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, init_connect() function
        if "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log.info("Created FIO Config:")
        self.log.info(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)
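    # A resulting config file path might look like (illustrative values):
    #   /tmp/spdk/nvmf_perf/4k_128_randrw_m_70_8CPU.fio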
    def set_cpu_frequency(self):
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit(1)
        else:
            self.log.warning("WARNING: you have disabled intel_pstate and are using the default CPU governor.")

    def run_fio(self, fio_config_file, run_num=1):
        job_name, _ = os.path.splitext(fio_config_file)
        self.log.info("Starting FIO run for job: %s" % job_name)
        self.log.info("Using FIO: %s" % self.fio_bin)

        output_filename = job_name + "_run_" + str(run_num) + "_" + self.name + ".json"
        try:
            output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename, "--eta=never"], True)
            self.log.info(output)
            self.log.info("FIO run finished. Results in: %s" % output_filename)
        except subprocess.CalledProcessError as e:
            self.log.error("ERROR: Fio process failed!")
            self.log.error(e.stdout)

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(self.exec_cmd(["uname", "-r"]))
        self.log.info("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log.info("====sysctl conf:====")
        sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"])
        self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))


class KernelTarget(Target):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = target_config.get('nvmet_bin', 'nvmetcli')

    def load_drivers(self):
        self.log.info("Loading drivers")
        super().load_drivers()
        if self.null_block:
            self.exec_cmd(["sudo", "modprobe", "null_blk", "nr_devices=%s" % self.null_block])

    def configure_adq(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_configure_tc(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_set_busy_read(self, busy_read_val):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
        return {"net.core.busy_read": 0}
    def stop(self):
        self.nvmet_command(self.nvmet_bin, "clear")
        self.restore_settings()

    def get_nvme_device_bdf(self, nvme_dev_path):
        nvme_name = os.path.basename(nvme_dev_path)
        return self.exec_cmd(["cat", "/sys/block/%s/device/address" % nvme_name]).strip()

    def get_nvme_devices(self):
        dev_list = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]).split("\n")
        nvme_list = []
        for dev in dev_list:
            if "nvme" not in dev:
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                nvme_list.append(dev)
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_allowlist:
                nvme_list.append(dev)
        return nvme_list

    def nvmet_command(self, nvmet_bin, command):
        return self.exec_cmd([nvmet_bin, *(command.split(" "))])

    def kernel_tgt_gen_subsystem_conf(self, nvme_list):
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        for ip, bdev_num in self.spread_bdevs(len(nvme_list)):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = nvme_list[bdev_num]

            nvmet_cfg["subsystems"].append({
                "allowed_hosts": [],
                "attr": {
                    "allow_any_host": "1",
                    "serial": serial,
                    "version": "1.3"
                },
                "namespaces": [
                    {
                        "device": {
                            "path": bdev_name,
                            "uuid": "%s" % uuid.uuid4()
                        },
                        "enable": 1,
                        "nsid": port
                    }
                ],
                "nqn": nqn
            })

            nvmet_cfg["ports"].append({
                "addr": {
                    "adrfam": "ipv4",
                    "traddr": ip,
                    "trsvcid": port,
                    "trtype": self.transport
                },
                "portid": bdev_num,
                "referrals": [],
                "subsystems": [nqn]
            })

            self.subsystem_info_list.append((port, nqn, ip))
        self.subsys_no = len(self.subsystem_info_list)

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        self.log.info("Configuring kernel NVMeOF Target")

        if self.null_block:
            self.log.info("Configuring with null block device.")
            nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
        else:
            self.log.info("Configuring with NVMe drives.")
            nvme_list = self.get_nvme_devices()

        self.kernel_tgt_gen_subsystem_conf(nvme_list)
        self.subsys_no = len(nvme_list)

        self.nvmet_command(self.nvmet_bin, "clear")
        self.nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log.info("Done configuring kernel NVMeOF Target")


class SPDKTarget(Target):
    def __init__(self, name, general_config, target_config):
        # IRQ affinity on SPDK Target side takes Target's core mask into consideration.
        # The method setting IRQ affinity is run as part of the parent classes' init,
        # so we need to have self.core_mask set before changing IRQ affinity.
        self.core_mask = target_config["core_mask"]
        self.num_cores = len(self.get_core_list_from_mask(self.core_mask))

        super().__init__(name, general_config, target_config)

        # Optional fields and their default values
        config_fields = [
            ConfigField(name='dif_insert_strip', default=False),
            ConfigField(name='null_block_dif_type', default=0),
            ConfigField(name='num_shared_buffers', default=4096),
            ConfigField(name='max_queue_depth', default=128),
            ConfigField(name='bpf_scripts', default=[]),
            ConfigField(name='scheduler_core_limit', default=None),
            ConfigField(name='dsa_settings', default=False),
            ConfigField(name='iobuf_small_pool_count', default=32767),
            ConfigField(name='iobuf_large_pool_count', default=16383),
            ConfigField(name='num_cqe', default=4096),
            ConfigField(name='sock_impl', default='posix')
        ]

        self.read_config(config_fields, target_config)

        self.bpf_proc = None
        self.enable_dsa = False

        self.log.info("====DSA settings:====")
        self.log.info("DSA enabled: %s" % (self.enable_dsa))

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
        if mode not in ["default", "bynode", "cpulist",
                        "shared", "split", "split-bynode"]:
            self.log.error("%s irq affinity setting not supported" % mode)
            raise Exception

        # Create core list from SPDK's mask and change it to string.
        # This is the type configure_irq_affinity expects for cpulist parameter.
        spdk_tgt_core_list = self.get_core_list_from_mask(self.core_mask)
        spdk_tgt_core_list = ",".join(map(lambda x: str(x), spdk_tgt_core_list))
        spdk_tgt_core_list = "[" + spdk_tgt_core_list + "]"

        if mode == "shared":
            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list)
        elif mode == "split":
            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
        elif mode == "split-bynode":
            super().configure_irq_affinity(mode="bynode", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
        else:
            super().configure_irq_affinity(mode=mode, cpulist=cpulist, exclude_cpulist=exclude_cpulist)

    def adq_set_busy_read(self, busy_read_val):
        return {"net.core.busy_read": busy_read_val}

    def get_nvme_devices_count(self):
        return len(self.get_nvme_devices())

    def get_nvme_devices(self):
        bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")]))
        bdev_bdfs = []
        for bdev in bdev_subsys_json_obj["config"]:
            bdev_traddr = bdev["params"]["traddr"]
            if bdev_traddr in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                bdev_bdfs.append(bdev_traddr)
            if bdev_traddr in self.nvme_allowlist:
                bdev_bdfs.append(bdev_traddr)
        return bdev_bdfs

    def spdk_tgt_configure(self):
        self.log.info("Configuring SPDK NVMeOF target via RPC")

        # Create transport layer
        nvmf_transport_params = {
            "client": self.client,
            "trtype": self.transport,
            "num_shared_buffers": self.num_shared_buffers,
            "max_queue_depth": self.max_queue_depth,
            "dif_insert_or_strip": self.dif_insert_strip,
            "sock_priority": self.adq_priority,
            "num_cqe": self.num_cqe
        }

        if self.enable_adq:
            nvmf_transport_params["acceptor_poll_rate"] = 10000

        rpc.nvmf.nvmf_create_transport(**nvmf_transport_params)
        self.log.info("SPDK NVMeOF transport layer:")
layer:") 1198 rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client)) 1199 1200 if self.null_block: 1201 self.spdk_tgt_add_nullblock(self.null_block) 1202 self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block) 1203 else: 1204 self.spdk_tgt_add_nvme_conf() 1205 self.spdk_tgt_add_subsystem_conf(self.nic_ips) 1206 1207 if self.enable_adq: 1208 self.adq_configure_tc() 1209 1210 self.log.info("Done configuring SPDK NVMeOF Target") 1211 1212 def spdk_tgt_add_nullblock(self, null_block_count): 1213 md_size = 0 1214 block_size = 4096 1215 if self.null_block_dif_type != 0: 1216 md_size = 128 1217 1218 self.log.info("Adding null block bdevices to config via RPC") 1219 for i in range(null_block_count): 1220 self.log.info("Setting bdev protection to :%s" % self.null_block_dif_type) 1221 rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i), 1222 dif_type=self.null_block_dif_type, md_size=md_size) 1223 self.log.info("SPDK Bdevs configuration:") 1224 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1225 1226 def spdk_tgt_add_nvme_conf(self, req_num_disks=None): 1227 self.log.info("Adding NVMe bdevs to config via RPC") 1228 1229 bdfs = self.get_nvme_devices() 1230 bdfs = [b.replace(":", ".") for b in bdfs] 1231 1232 if req_num_disks: 1233 if req_num_disks > len(bdfs): 1234 self.log.error("ERROR: Requested number of disks is more than available %s" % len(bdfs)) 1235 sys.exit(1) 1236 else: 1237 bdfs = bdfs[0:req_num_disks] 1238 1239 for i, bdf in enumerate(bdfs): 1240 rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 1241 1242 self.log.info("SPDK Bdevs configuration:") 1243 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1244 1245 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 1246 self.log.info("Adding subsystems to config") 1247 if not req_num_disks: 1248 req_num_disks = self.get_nvme_devices_count() 1249 1250 for ip, bdev_num in self.spread_bdevs(req_num_disks): 1251 port = str(4420 + bdev_num) 1252 nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num 1253 serial = "SPDK00%s" % bdev_num 1254 bdev_name = "Nvme%sn1" % bdev_num 1255 1256 rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial, 1257 allow_any_host=True, max_namespaces=8) 1258 rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name) 1259 for nqn_name in [nqn, "discovery"]: 1260 rpc.nvmf.nvmf_subsystem_add_listener(self.client, 1261 nqn=nqn_name, 1262 trtype=self.transport, 1263 traddr=ip, 1264 trsvcid=port, 1265 adrfam="ipv4") 1266 self.subsystem_info_list.append((port, nqn, ip)) 1267 self.subsys_no = len(self.subsystem_info_list) 1268 1269 self.log.info("SPDK NVMeOF subsystem configuration:") 1270 rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client)) 1271 1272 def bpf_start(self): 1273 self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts) 1274 bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh") 1275 bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts] 1276 results_path = os.path.join(self.results_dir, "bpf_traces.txt") 1277 1278 with open(self.pid, "r") as fh: 1279 nvmf_pid = str(fh.readline()) 1280 1281 cmd = [bpf_script, nvmf_pid, *bpf_traces] 1282 self.log.info(cmd) 1283 self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path}) 1284 1285 def tgt_start(self): 1286 if self.null_block: 1287 self.subsys_no = 1 1288 else: 1289 self.subsys_no = self.get_nvme_devices_count() 1290 self.log.info("Starting SPDK 
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        self.log.info("SPDK NVMeOF Target PID=%s" % self.pid)
        self.log.info("Waiting for spdk to initialize...")
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")

        rpc.sock.sock_set_default_impl(self.client, impl_name=self.sock_impl)
        rpc.iobuf.iobuf_set_options(self.client,
                                    small_pool_count=self.iobuf_small_pool_count,
                                    large_pool_count=self.iobuf_large_pool_count,
                                    small_bufsize=None,
                                    large_bufsize=None)

        if self.enable_zcopy:
            rpc.sock.sock_impl_set_options(self.client, impl_name=self.sock_impl,
                                           enable_zerocopy_send_server=True)
            self.log.info("Target socket options:")
            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name=self.sock_impl))

        if self.enable_adq:
            rpc.sock.sock_impl_set_options(self.client, impl_name=self.sock_impl, enable_placement_id=1)
            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
                                           nvme_adminq_poll_period_us=100000, retry_count=4)

        if self.enable_dsa:
            rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None)
            self.log.info("Target DSA accel module enabled")

        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit)
        rpc.framework_start_init(self.client)

        if self.bpf_scripts:
            self.bpf_start()

        self.spdk_tgt_configure()

    def stop(self):
        if self.bpf_proc:
            self.log.info("Stopping BPF Trace script")
            self.bpf_proc.terminate()
            self.bpf_proc.wait()

        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait(timeout=30)
            except Exception as e:
                self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.")
                self.log.info(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
                # Try to clean up RPC socket files if they were not removed
                # because of using 'kill'
                try:
                    os.remove("/var/tmp/spdk.sock")
                    os.remove("/var/tmp/spdk.sock.lock")
                except FileNotFoundError:
                    pass
        self.restore_settings()


class KernelInitiator(Initiator):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Defaults
        self.extra_params = initiator_config.get('extra_params', '')

        self.ioengine = "libaio"
        self.spdk_conf = ""

        if "num_cores" in initiator_config:
            self.num_cores = initiator_config["num_cores"]

        if "kernel_engine" in initiator_config:
            self.ioengine = initiator_config["kernel_engine"]
            if "io_uring" in self.ioengine:
                self.extra_params += ' --nr-poll-queues=8'

    def configure_adq(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_configure_tc(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
    def adq_set_busy_read(self, busy_read_val):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
        return {"net.core.busy_read": 0}

    def get_connected_nvme_list(self):
        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
        return nvme_list

    def init_connect(self):
        self.log.info("Below connection attempts may result in error messages, this is expected!")
        for subsystem in self.subsystem_info_list:
            self.log.info("Trying to connect %s %s %s" % subsystem)
            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
            time.sleep(2)

        if "io_uring" in self.ioengine:
            self.log.info("Setting block layer settings for io_uring.")

            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
            #       apparently it's not possible for connected subsystems.
            #       Results in "error: Invalid argument"
            block_sysfs_settings = {
                "iostats": "0",
                "rq_affinity": "0",
                "nomerges": "2"
            }

            for disk in self.get_connected_nvme_list():
                sysfs = os.path.join("/sys/block", disk, "queue")
                for k, v in block_sysfs_settings.items():
                    sysfs_opt_path = os.path.join(sysfs, k)
                    try:
                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
                    except CalledProcessError as e:
                        self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
                    finally:
                        _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)])
                        self.log.info("%s=%s" % (sysfs_opt_path, _))

    def init_disconnect(self):
        for subsystem in self.subsystem_info_list:
            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
            time.sleep(1)

    def get_nvme_subsystem_numa(self, dev_name):
        # Remove the last two characters to get the controller name instead of
        # the subsystem name
        nvme_ctrl = os.path.basename(dev_name)[:-2]
        remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
                                   self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl]))
        return self.get_route_nic_numa(remote_nvme_ip.group(0))

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
        self.available_cpus = self.get_numa_cpu_map()
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        # Generate connected nvme devices names and sort them by used NIC numa node
        # to allow better grouping when splitting into fio sections.
        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
        nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list]
        nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))]

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = self.gen_fio_numa_section(r, num_jobs)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section


class SPDKInitiator(Initiator):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        if self.skip_spdk_install is False:
            self.install_spdk()

        # Optional fields
        self.enable_data_digest = initiator_config.get('enable_data_digest', False)
        self.small_pool_count = initiator_config.get('small_pool_count', 32768)
        self.large_pool_count = initiator_config.get('large_pool_count', 16384)
        self.sock_impl = initiator_config.get('sock_impl', 'posix')

        if "num_cores" in initiator_config:
            self.num_cores = initiator_config["num_cores"]

        self.ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
        self.spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir

    def adq_set_busy_read(self, busy_read_val):
        return {"net.core.busy_read": busy_read_val}

    def install_spdk(self):
        self.log.info("Using fio binary %s" % self.fio_bin)
        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma",
                       "--with-fio=%s" % os.path.dirname(self.fio_bin),
                       "--enable-lto", "--disable-unit-tests"])
        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])

        self.log.info("SPDK built")
        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])

    def init_connect(self):
        # Not a real "connect" like when doing "nvme connect" because SPDK's fio
        # bdev plugin initiates connection just before starting IO traffic.
        # This is just to have an "init_connect" equivalent of the same function
        # from the KernelInitiator class.
        # Just prepare the bdev.conf JSON file for later use and consider it
        # "making a connection".
        bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
        self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])

    def init_disconnect(self):
        # SPDK Initiator does not need to explicitly disconnect as this gets done
        # after the fio bdev plugin finishes IO.
        return
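    # For reference, each subsystem entry in the generated bdev.conf (see
    # gen_spdk_bdev_conf() below) looks roughly like (illustrative values):
    #   {"method": "bdev_nvme_attach_controller",
    #    "params": {"name": "Nvme0", "trtype": "tcp", "traddr": "10.0.0.1",
    #               "trsvcid": "4420", "subnqn": "nqn.2018-09.io.spdk:cnode0",
    #               "adrfam": "IPv4"}}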
    def init_disconnect(self):
        # SPDK Initiator does not need to explicitly disconnect as this gets done
        # after fio bdev plugin finishes IO.
        return

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        spdk_cfg_section = {
            "subsystems": [
                {
                    "subsystem": "bdev",
                    "config": []
                },
                {
                    "subsystem": "iobuf",
                    "config": [
                        {
                            "method": "iobuf_set_options",
                            "params": {
                                "small_pool_count": self.small_pool_count,
                                "large_pool_count": self.large_pool_count
                            }
                        }
                    ]
                },
                {
                    "subsystem": "sock",
                    "config": [
                        {
                            "method": "sock_set_default_impl",
                            "params": {
                                "impl_name": self.sock_impl
                            }
                        }
                    ]
                }
            ]
        }

        for i, subsys in enumerate(remote_subsystem_list):
            sub_port, sub_nqn, sub_addr = map(str, subsys)
            nvme_ctrl = {
                "method": "bdev_nvme_attach_controller",
                "params": {
                    "name": "Nvme{}".format(i),
                    "trtype": self.transport,
                    "traddr": sub_addr,
                    "trsvcid": sub_port,
                    "subnqn": sub_nqn,
                    "adrfam": "IPv4"
                }
            }

            if self.enable_adq:
                nvme_ctrl["params"].update({"priority": "1"})

            if self.enable_data_digest:
                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})

            spdk_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)

        return json.dumps(spdk_cfg_section, indent=2)

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
        self.available_cpus = self.get_numa_cpu_map()
        filename_section = ""
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        # Generate expected NVMe bdev names and sort them by the NUMA node of
        # the NIC they use, to allow better grouping when splitting into fio sections.
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames]
        filenames = [x for _, x in sorted(zip(filename_numas, filenames))]

        # Distribute the bdevs across job sections as evenly as possible.
        nvme_per_split = len(subsystems) // len(threads)
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = self.gen_fio_numa_section(r, num_jobs)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section

    def get_nvme_subsystem_numa(self, bdev_name):
        bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir]))
        bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"]

        # Remove the last two characters to get the controller name instead of the subsystem name
        nvme_ctrl = bdev_name[:-2]
        for bdev in bdev_conf_json_obj:
            if bdev["method"] == "bdev_nvme_attach_controller" and bdev["params"]["name"] == nvme_ctrl:
                return self.get_route_nic_numa(bdev["params"]["traddr"])
        return None
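# A sketch of the config.json layout consumed by the main block below; only the
# keys accessed in this script are shown and all values are hypothetical:
#   {
#       "general": {...},
#       "target": {"mode": "spdk", ...},
#       "initiator1": {"mode": "kernel", ...},
#       "fio": {"bs": ["4k"], "qd": [128], "rw": ["randrw"], "rwmixread": 70,
#               "ramp_time": 10, "run_time": 30, "run_num": 3, "num_jobs": 2}
#   }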
if __name__ == "__main__":
    exit_code = 0

    script_full_dir = os.path.dirname(os.path.realpath(__file__))
    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
                        help='Configuration file.')
    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
                        help='Results directory.')
    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
                        help='CSV results filename.')
    parser.add_argument('-f', '--force', default=False, action='store_true',
                        dest='force', help="""Force script to continue and try to use all
                        available NVMe devices during test.
                        WARNING: Might result in data loss on used NVMe drives""")

    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO,
                        format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s')

    logging.info("Using config file: %s" % args.config)
    with open(args.config, "r") as config:
        data = json.load(config)

    initiators = []
    fio_cases = []

    general_config = data["general"]
    target_config = data["target"]
    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]

    if "null_block_devices" not in data["target"] and \
            (args.force is False and
                "allowlist" not in data["target"] and
                "blocklist" not in data["target"]):
        # TODO: Also check if allowlist or blocklist are not empty.
        logging.warning("""WARNING: This script requires an allowlist or blocklist to be defined.
        You can choose to use all available NVMe drives on your system, which may potentially
        lead to data loss. If you wish to proceed with all attached NVMes, use the "-f" option.""")
        sys.exit(1)

    for k, v in data.items():
        if "target" in k:
            v.update({"results_dir": args.results})
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(k, data["general"], v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(k, data["general"], v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            # Default to a single run when "run_num" is not specified, so that
            # range(1, fio_run_num + 1) below always gets an integer.
            fio_run_num = data[k].get("run_num", 1)
            fio_num_jobs = data[k].get("num_jobs")

            fio_rate_iops = data[k].get("rate_iops", 0)
            fio_offset = data[k].get("offset", False)
            fio_offset_inc = data[k].get("offset_inc", 0)
        else:
            continue

    try:
        os.mkdir(args.results)
    except FileExistsError:
        pass

    for i in initiators:
        target_obj.initiator_info.append(
            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
        )

    # TODO: This try block is definitely too large. Need to break this up into separate
    # logical blocks to reduce size.
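    # For orientation (hypothetical values): with bs=4k, qd=128, rw=randrw,
    # rwmixread=70 and run_no=1, measurements_prefix below becomes
    # "4k_128_randrw_m_70_run_1", and the optional measurement threads write
    # files such as "4k_128_randrw_m_70_run_1_bandwidth.csv" and
    # "4k_128_randrw_m_70_run_1_dpdk_mem.txt" into the results directory.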
    try:
        target_obj.tgt_start()

        for i in initiators:
            i.match_subsystems(target_obj.subsystem_info_list)
            if i.enable_adq:
                i.adq_configure_tc()

        # Poor man's threading
        # Run FIO tests
        for block_size, io_depth, rw in fio_workloads:
            configs = []
            for i in initiators:
                i.init_connect()
                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops,
                                       fio_offset, fio_offset_inc)
                configs.append(cfg)

            for run_no in range(1, fio_run_num + 1):
                threads = []
                power_daemon = None
                measurements_prefix = "%s_%s_%s_m_%s_run_%s" % (block_size, io_depth, rw, fio_rw_mix_read, run_no)

                for i, cfg in zip(initiators, configs):
                    t = threading.Thread(target=i.run_fio, args=(cfg, run_no))
                    threads.append(t)
                if target_obj.enable_sar:
                    sar_file_prefix = measurements_prefix + "_sar"
                    t = threading.Thread(target=target_obj.measure_sar,
                                         args=(args.results, sar_file_prefix, fio_ramp_time, fio_run_time))
                    threads.append(t)

                if target_obj.enable_pcm:
                    pcm_fnames = ["%s_%s.csv" % (measurements_prefix, x) for x in ["pcm_cpu"]]
                    pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm,
                                                 args=(args.results, pcm_fnames[0], fio_ramp_time, fio_run_time))
                    threads.append(pcm_cpu_t)

                if target_obj.enable_bw:
                    bandwidth_file_name = measurements_prefix + "_bandwidth.csv"
                    t = threading.Thread(target=target_obj.measure_network_bandwidth,
                                         args=(args.results, bandwidth_file_name, fio_ramp_time, fio_run_time))
                    threads.append(t)

                if target_obj.enable_dpdk_memory:
                    dpdk_mem_file_name = measurements_prefix + "_dpdk_mem.txt"
                    t = threading.Thread(target=target_obj.measure_dpdk_memory,
                                         args=(args.results, dpdk_mem_file_name, fio_ramp_time))
                    threads.append(t)

                if target_obj.enable_pm:
                    power_daemon = threading.Thread(target=target_obj.measure_power,
                                                    args=(args.results, measurements_prefix, script_full_dir,
                                                          fio_ramp_time, fio_run_time))
                    threads.append(power_daemon)

                # Run fio on all initiators and all enabled target-side
                # measurements in parallel, then wait for everything to finish.
                for t in threads:
                    t.start()
                for t in threads:
                    t.join()

            for i in initiators:
                i.init_disconnect()
                i.copy_result_files(args.results)
        try:
            parse_results(args.results, args.csv_filename)
        except Exception as err:
            exit_code = 1
            logging.error("There was an error while parsing the results")
            logging.error(err)
    finally:
        for i in initiators:
            try:
                i.stop()
            except Exception:
                pass
        target_obj.stop()
    sys.exit(exit_code)
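# Example invocation (script file name hypothetical; flags as defined above):
#   ./nvmf_perf.py -c ./config.json -r /tmp/results -s nvmf_results.csv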