#!/usr/bin/env python3
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (C) 2018 Intel Corporation
# All rights reserved.
#

# Note: we use setattr
# pylint: disable=no-member

import os
import re
import sys
import argparse
import json
import logging
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid

import paramiko
import pandas as pd
from common import *
from subprocess import CalledProcessError
from abc import abstractmethod, ABC
from dataclasses import dataclass
from typing import Any

sys.path.append(os.path.dirname(__file__) + '/../../../python')

import spdk.rpc as rpc  # noqa
import spdk.rpc.client as rpc_client  # noqa


@dataclass
class ConfigField:
    name: str = None
    default: Any = None
    required: bool = False


class Server(ABC):
    def __init__(self, name, general_config, server_config):
        self.name = name

        self.log = logging.getLogger(self.name)

        config_fields = [
            ConfigField(name='username', required=True),
            ConfigField(name='password', required=True),
            ConfigField(name='transport', required=True),
            ConfigField(name='skip_spdk_install', default=False),
            ConfigField(name='irdma_roce_enable', default=False),
            ConfigField(name='pause_frames', default=False)
        ]
        self.read_config(config_fields, general_config)
        self.transport = self.transport.lower()

        config_fields = [
            ConfigField(name='nic_ips', required=True),
            ConfigField(name='mode', required=True),
            ConfigField(name='irq_scripts_dir', default='/usr/src/local/mlnx-tools/ofed_scripts'),
            ConfigField(name='enable_arfs', default=False),
            ConfigField(name='tuned_profile', default='')
        ]
        self.read_config(config_fields, server_config)

        self.local_nic_info = []
        self._nics_json_obj = {}
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""

        self.enable_adq = server_config.get('adq_enable', False)
        self.adq_priority = 1 if self.enable_adq else None
        self.irq_settings = {'mode': 'default', **server_config.get('irq_settings', {})}

        if not re.match(r'^[A-Za-z0-9\-]+$', name):
            self.log.info("Please use a name which contains only letters, numbers or dashes")
            sys.exit(1)

    def read_config(self, config_fields, config):
        for config_field in config_fields:
            value = config.get(config_field.name, config_field.default)
            if config_field.required is True and value is None:
                raise ValueError('%s config key is required' % config_field.name)

            setattr(self, config_field.name, value)
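
    # Illustrative sketch (not executed): with config = {"username": "root"},
    #   self.read_config([ConfigField(name='username', required=True),
    #                     ConfigField(name='timeout', default=30)], config)
    # sets self.username = "root" and self.timeout = 30, while a required field
    # missing from the dict raises ValueError. The 'timeout' field here is
    # hypothetical and only meant to show the default mechanism.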

    @staticmethod
    def get_uncommented_lines(lines):
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                if ip in addr["local"]:
                    return nic["ifname"]

    @abstractmethod
    def set_local_nic_info_helper(self):
        pass

    def set_local_nic_info(self, pci_info):
        def extract_network_elements(json_obj):
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def get_nic_numa_node(self, nic_name):
        return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name]))

    def get_numa_cpu_map(self):
        numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"]))
        numa_cpu_json_map = {}

        for cpu in numa_cpu_json_obj["cpus"]:
            cpu_num = int(cpu["cpu"])
            numa_node = int(cpu["node"])
            numa_cpu_json_map.setdefault(numa_node, [])
            numa_cpu_json_map[numa_node].append(cpu_num)

        return numa_cpu_json_map
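
    # Example shape of the returned map on a hypothetical 2-node, 8-CPU host:
    #   {0: [0, 1, 2, 3], 1: [4, 5, 6, 7]}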

    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        return ""

    def configure_system(self):
        self.load_drivers()
        self.configure_services()
        self.configure_sysctl()
        self.configure_arfs()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity(**self.irq_settings)
        self.configure_pause_frames()

    RDMA_PROTOCOL_IWARP = 0
    RDMA_PROTOCOL_ROCE = 1
    RDMA_PROTOCOL_UNKNOWN = -1

    def check_rdma_protocol(self):
        try:
            roce_ena = self.exec_cmd(["cat", "/sys/module/irdma/parameters/roce_ena"])
            roce_ena = roce_ena.strip()
            if roce_ena == "0":
                return self.RDMA_PROTOCOL_IWARP
            else:
                return self.RDMA_PROTOCOL_ROCE
        except CalledProcessError as e:
            self.log.error("ERROR: failed to check RDMA protocol!")
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
            return self.RDMA_PROTOCOL_UNKNOWN

    def load_drivers(self):
        self.log.info("Loading drivers")
        self.exec_cmd(["sudo", "modprobe", "-a",
                       "nvme-%s" % self.transport,
                       "nvmet-%s" % self.transport])
        current_mode = self.check_rdma_protocol()
        if current_mode == self.RDMA_PROTOCOL_UNKNOWN:
            self.log.error("ERROR: failed to check RDMA protocol mode")
            return
        if self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_IWARP:
            self.reload_driver("irdma", "roce_ena=1")
            self.log.info("Loaded irdma driver with RoCE enabled")
        elif self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_ROCE:
            self.log.info("Leaving irdma driver with RoCE enabled")
        else:
            self.reload_driver("irdma", "roce_ena=0")
            self.log.info("Loaded irdma driver with iWARP enabled")

    def set_pause_frames(self, rx_state, tx_state):
        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            self.exec_cmd(["sudo", "ethtool", "-A", nic_name, "rx", rx_state, "tx", tx_state])

    def configure_pause_frames(self):
        if not self.pause_frames:
            self.log.info("Turning off pause frames")
            self.set_pause_frames("off", "off")
            return
        self.log.info("Configuring pause frames")
        self.set_pause_frames("on", "on")

    def configure_arfs(self):
        rps_flow_cnt = 512
        if not self.enable_arfs:
            rps_flow_cnt = 0

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            self.exec_cmd(["sudo", "ethtool", "-K", nic_name, "ntuple", "on"])
            self.log.info(f"Setting rps_flow_cnt={rps_flow_cnt} for {nic_name}")
            queue_files = self.exec_cmd(["ls", f"/sys/class/net/{nic_name}/queues/"]).strip().split("\n")
            queue_files = filter(lambda x: x.startswith("rx-"), queue_files)

            for qf in queue_files:
                self.exec_cmd(["sudo", "bash", "-c", f"echo {rps_flow_cnt} > /sys/class/net/{nic_name}/queues/{qf}/rps_flow_cnt"])

    def configure_adq(self):
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        self.log.info("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log.info("%s loaded!" % module)
            except CalledProcessError as e:
                self.log.error("ERROR: failed to load module %s" % module)
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        self.log.info("Configuring ADQ Traffic classes and filters...")

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            nic_ports = [x[0] for x in self.subsystem_info_list if nic_ip in x[2]]

            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log.info(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log.info(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            nic_bdf = os.path.basename(self.exec_cmd(["readlink", "-f", "/sys/class/net/%s/device" % nic_name]).strip())
            self.exec_cmd(["sudo", "devlink", "dev", "param", "set", "pci/%s" % nic_bdf,
                           "name", "tc1_inline_fd", "value", "true", "cmode", "runtime"])

            for port in nic_ports:
                tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                                 "protocol", "ip", "ingress", "prio", "1", "flower",
                                 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                                 "skip_sw", "hw_tc", "1"]
                self.log.info(" ".join(tc_filter_cmd))
                self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log.info("Show tc configuration for %s NIC..." % nic_name)
            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log.info("%s" % tc_disk_out)
            self.log.info("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log.info(xps_cmd)
            self.exec_cmd(xps_cmd)
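
    # For reference, with num_cores == 8 the generated qdisc command looks like:
    #   tc qdisc add dev <nic> root mqprio num_tc 2 map 0 1 \
    #      queues 2@0 8@2 hw 1 mode channel
    # i.e. TC0 uses queues 0-1 and TC1 uses the next 8 queues (2-9).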

    def reload_driver(self, driver, *modprobe_args):
        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver, *modprobe_args])
        except CalledProcessError as e:
            self.log.error("ERROR: failed to reload %s module!" % driver)
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_nic(self):
        self.log.info("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                nic_bdf = self.exec_cmd(["bash", "-c", "source /sys/class/net/%s/device/uevent; echo $PCI_SLOT_NAME" % nic])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-inspect-optimize", "off"])
                # Following are suggested in ADQ Configuration Guide to be enabled for large block sizes,
                # but can be used with 4k block sizes as well
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop", "on"])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop-cfg", "on"])
            except subprocess.CalledProcessError as e:
                self.log.error("ERROR: failed to configure NIC port using ethtool!")
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
                self.log.info("Please update your NIC driver and firmware versions and try again.")
            self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        self.log.info("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log.info("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log.info("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])
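
    # The string handed to configparser above looks roughly like:
    #   [firewalld]
    #   ActiveState=active
    #   ...
    # so svc_config["firewalld"]["ActiveState"] is the state saved for restore.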

    def configure_sysctl(self):
        self.log.info("Tuning sysctl settings...")

        busy_read = 0
        busy_poll = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1
            busy_poll = 1

        sysctl_opts = {
            "net.core.busy_poll": busy_poll,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
            "net.core.rps_sock_flow_entries": 0
        }

        if self.enable_adq:
            sysctl_opts.update(self.adq_set_busy_read(1))

        if self.enable_arfs:
            sysctl_opts.update({"net.core.rps_sock_flow_entries": 32768})

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        if not self.tuned_profile:
            self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        self.log.info("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def get_core_list_from_mask(self, core_mask):
        # Generate list of individual cores from hex core mask
        # (e.g. '0xffff') or list containing:
        # - individual cores (e.g. '1, 2, 3')
        # - core ranges (e.g. '0-3')
        # - mix of both (e.g. '0, 1-4, 9, 11-13')

        core_list = []
        if "0x" in core_mask:
            core_mask_int = int(core_mask, 16)
            for i in range(core_mask_int.bit_length()):
                if (1 << i) & core_mask_int:
                    core_list.append(i)
            return core_list
        else:
            # Core list can be provided in .json config with square brackets
            # remove them first
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")

            for i in core_mask.split(","):
                if "-" in i:
                    start, end = i.split("-")
                    core_range = range(int(start), int(end) + 1)
                    core_list.extend(core_range)
                else:
                    core_list.append(int(i))
        return core_list
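
    # Examples:
    #   get_core_list_from_mask("0x5")      -> [0, 2]
    #   get_core_list_from_mask("[0,2-4]")  -> [0, 2, 3, 4]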

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
        self.log.info("Setting NIC irq affinity for NICs. Using %s mode" % mode)

        if mode not in ["default", "bynode", "cpulist"]:
            raise ValueError("%s irq affinity setting not supported" % mode)

        if mode == "cpulist" and not cpulist:
            raise ValueError("%s irq affinity setting set, but no cpulist provided" % mode)

        affinity_script = "set_irq_affinity.sh"
        if "default" not in mode:
            affinity_script = "set_irq_affinity_cpulist.sh"
            system_cpu_map = self.get_numa_cpu_map()
        irq_script_path = os.path.join(self.irq_scripts_dir, affinity_script)

        def cpu_list_to_string(cpulist):
            return ",".join(map(lambda x: str(x), cpulist))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            irq_cmd = ["sudo", irq_script_path]

            # Use only CPU cores matching NIC NUMA node.
            # Remove any CPU cores if they're on exclusion list.
            if mode == "bynode":
                irq_cpus = system_cpu_map[self.get_nic_numa_node(nic_name)]
                if cpulist and exclude_cpulist:
                    disallowed_cpus = self.get_core_list_from_mask(cpulist)
                    irq_cpus = list(set(irq_cpus) - set(disallowed_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            if mode == "cpulist":
                irq_cpus = self.get_core_list_from_mask(cpulist)
                if exclude_cpulist:
                    # Flatten system CPU list, we don't need NUMA awareness here
                    system_cpu_list = sorted({x for v in system_cpu_map.values() for x in v})
                    irq_cpus = list(set(system_cpu_list) - set(irq_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            irq_cmd.append(nic_name)
            self.log.info(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)
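
    # Example irq_settings entries accepted from the server config:
    #   "irq_settings": {"mode": "bynode"}
    #   "irq_settings": {"mode": "cpulist", "cpulist": "[0-7]", "exclude_cpulist": true}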

    def restore_services(self):
        self.log.info("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        self.log.info("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        self.log.info("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log.info("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        self.log.info("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log.info("Reverted CPU governor to %s." % self.governor_restore)

    def restore_settings(self):
        self.restore_governor()
        self.restore_tuned()
        self.restore_services()
        self.restore_sysctl()
        if self.enable_adq:
            self.reload_driver("ice")


class Target(Server):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Defaults
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []
        self.initiator_info = []

        config_fields = [
            ConfigField(name='mode', required=True),
            ConfigField(name='results_dir', required=True),
            ConfigField(name='enable_pm', default=True),
            ConfigField(name='enable_sar', default=True),
            ConfigField(name='enable_pcm', default=True),
            ConfigField(name='enable_dpdk_memory', default=True)
        ]
        self.read_config(config_fields, target_config)

        self.null_block = target_config.get('null_block_devices', 0)
        self.scheduler_name = target_config.get('scheduler_settings', 'static')
        self.enable_zcopy = target_config.get('zcopy_settings', False)
        self.enable_bw = target_config.get('enable_bandwidth', True)
        self.nvme_blocklist = target_config.get('blocklist', [])
        self.nvme_allowlist = target_config.get('allowlist', [])

        # Blocklist takes precedence, remove common elements from allowlist
        self.nvme_allowlist = list(set(self.nvme_allowlist) - set(self.nvme_blocklist))

        self.log.info("Items now on allowlist: %s" % self.nvme_allowlist)
        self.log.info("Items now on blocklist: %s" % self.nvme_blocklist)

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        if self.skip_spdk_install is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
            self.configure_irq_affinity()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log.info("Changing directory to %s" % change_dir)

        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")

        if change_dir:
            os.chdir(old_cwd)
            self.log.info("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        self.log.info("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, _directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log.info("Done zipping")

    @staticmethod
    def _chunks(input_list, chunks_no):
        div, rem = divmod(len(input_list), chunks_no)
        for i in range(chunks_no):
            si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem)
            yield input_list[si:si + (div + 1 if i < rem else div)]
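
    # Example: list(Target._chunks(list(range(5)), 2)) -> [[0, 1, 2], [3, 4]]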

    def spread_bdevs(self, req_disks):
        # Spread available block devices indexes:
        # - evenly across available initiator systems
        # - evenly across available NIC interfaces for
        #   each initiator
        # Not NUMA aware.
        ip_bdev_map = []
        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))

        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
            self.initiator_info[i]["bdev_range"] = init_chunk
            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
                for c in nic_chunk:
                    ip_bdev_map.append((ip, c))
        return ip_bdev_map
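
    # Example: with two initiators, one target NIC IP each, spread_bdevs(4)
    # returns something like [(ip_a, 0), (ip_a, 1), (ip_b, 2), (ip_b, 3)].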

    def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time):
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))

        self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Starting SAR measurements")

        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % 1, "%s" % run_time])
        with open(sar_output_file, "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log.info("Summary CPU utilization from SAR:")
                        self.log.info(line)
                    elif "all" in line:
                        self.log.info(line)
                    else:
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum

        with open(sar_cpu_util_file, "w") as f:
            f.write("%0.2f" % sar_cpu_usage)

    def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time):
        time.sleep(ramp_time)
        self.log.info("Starting power measurements")
        self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir,
                       "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix,
                       "-x", "-c", "%s" % run_time, "-t", "%s" % 1, "-r"])

    def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        cmd = ["pcm", "1", "-i=%s" % run_time,
               "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name, ramp_time, run_time):
        self.log.info("Waiting %d seconds for ramp-up to finish before measuring bandwidth stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("INFO: starting network bandwidth measure")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", "%s" % run_time])
        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
        #       Improve this so that additional file containing measurements average is generated too.
        # TODO: Monitor only these interfaces which are currently used to run the workload.

    def measure_dpdk_memory(self, results_dir, dump_file_name, ramp_time):
        self.log.info("INFO: waiting to generate DPDK memory usage")
        time.sleep(ramp_time)
        self.log.info("INFO: generating DPDK memory usage")
        tmp_dump_file = rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)["filename"]
        os.rename(tmp_dump_file, "%s/%s" % (results_dir, dump_file_name))

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(os.uname().release)
        self.log.info("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log.info("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log.info("====zcopy settings:====")
        self.log.info("zcopy enabled: %s" % (self.enable_zcopy))
        self.log.info("====Scheduler settings:====")
        self.log.info("SPDK scheduler: %s" % (self.scheduler_name))


class Initiator(Server):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # required fields and defaults
        config_fields = [
            ConfigField(name='mode', required=True),
            ConfigField(name='ip', required=True),
            ConfigField(name='target_nic_ips', required=True),
            ConfigField(name='cpus_allowed', default=None),
            ConfigField(name='cpus_allowed_policy', default='shared'),
            ConfigField(name='spdk_dir', default='/tmp/spdk'),
            ConfigField(name='fio_bin', default='/usr/src/fio/fio'),
            ConfigField(name='nvmecli_bin', default='nvme'),
            ConfigField(name='cpu_frequency', default=None),
            ConfigField(name='allow_cpu_sharing', default=True)
        ]

        self.read_config(config_fields, initiator_config)

        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.subsystem_info_list = []

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if self.skip_spdk_install is False:
            self.copy_spdk("/tmp/spdk.zip")

        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        self.restore_settings()
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it again to prevent
        # expansion when sending to the remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout using the get_pty option if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out
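
    # Quoting sketch (hypothetical values): ["sysctl", "-w", "some.param=1 2 3"]
    # is sent to the remote shell as: sysctl -w "some.param=1 2 3"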

    def put_file(self, local, remote_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        self.log.info("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log.info("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log.info("Sources unpacked")

    def copy_result_files(self, dest_dir):
        self.log.info("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log.info("Done copying results")

    def match_subsystems(self, target_subsystems):
        subsystems = [subsystem for subsystem in target_subsystems if subsystem[2] in self.target_nic_ips]
        subsystems.sort(key=lambda x: x[1])
        self.log.info("Found matching subsystems on target side:")
        for s in subsystems:
            self.log.info(s)
        self.subsystem_info_list = subsystems

    @abstractmethod
    def init_connect(self):
        pass

    @abstractmethod
    def init_disconnect(self):
        pass

    @abstractmethod
    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def get_route_nic_numa(self, remote_nvme_ip):
        local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip]))
        local_nvme_nic = local_nvme_nic[0]["dev"]
        return self.get_nic_numa_node(local_nvme_nic)

    @staticmethod
    def gen_fio_offset_section(offset_inc, num_jobs):
        offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc
        return "\n".join(["size=%s%%" % offset_inc,
                          "offset=0%",
                          "offset_increment=%s%%" % offset_inc])
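
    # Example: gen_fio_offset_section(0, 4) yields:
    #   size=25%
    #   offset=0%
    #   offset_increment=25%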

    def gen_fio_numa_section(self, fio_filenames_list, num_jobs):
        numa_stats = {}
        allowed_cpus = []
        for nvme in fio_filenames_list:
            nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme))
            numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1

        # Use the most common NUMA node for this chunk to allocate memory and CPUs
        section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0]

        # Check if we have enough free CPUs to pop from the list before assigning them
        if len(self.available_cpus[section_local_numa]) < num_jobs:
            if self.allow_cpu_sharing:
                self.log.info("Regenerating available CPU list %s" % section_local_numa)
                # Remove still available CPUs from the regenerated list. We don't want to
                # regenerate it with duplicates.
                cpus_regen = set(self.get_numa_cpu_map()[section_local_numa]) - set(self.available_cpus[section_local_numa])
                self.available_cpus[section_local_numa].extend(cpus_regen)
                self.log.info(self.available_cpus[section_local_numa])
            else:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise IndexError

        for _ in range(num_jobs):
            try:
                allowed_cpus.append(str(self.available_cpus[section_local_numa].pop(0)))
            except IndexError:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise

        return "\n".join(["cpus_allowed=%s" % ",".join(allowed_cpus),
                          "numa_mem_policy=prefer:%s" % section_local_numa])

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no,
                       num_jobs=None, ramp_time=0, run_time=10, rate_iops=0,
                       offset=False, offset_inc=0):
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""

        if self.cpus_allowed is not None:
            self.log.info("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    cpus_num += len(range(a, b + 1))
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            self.num_cores = len(self.subsystem_info_list)
            threads = range(0, len(self.subsystem_info_list))

        filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs,
                                                      offset, offset_inc)

        fio_config = fio_conf_template.format(ioengine=self.ioengine, spdk_conf=self.spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, init_connect() function
        if "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log.info("Created FIO Config:")
        self.log.info(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)
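
    # Example resulting file name: a randrw job with bs=4k, qd=128,
    # rwmixread=70 on 2 cores is saved as "4k_128_randrw_m_70_2CPU.fio".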

    def set_cpu_frequency(self):
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit(1)
        else:
            self.log.warning("WARNING: you have disabled intel_pstate and are using default cpu governance.")

    def run_fio(self, fio_config_file, run_num=1):
        job_name, _ = os.path.splitext(fio_config_file)
        self.log.info("Starting FIO run for job: %s" % job_name)
        self.log.info("Using FIO: %s" % self.fio_bin)

        output_filename = job_name + "_run_" + str(run_num) + "_" + self.name + ".json"
        try:
            output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename, "--eta=never"], True)
            self.log.info(output)
            self.log.info("FIO run finished. Results in: %s" % output_filename)
        except subprocess.CalledProcessError as e:
            self.log.error("ERROR: Fio process failed!")
            self.log.error(e.stdout)

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(self.exec_cmd(["uname", "-r"]))
        self.log.info("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log.info("====sysctl conf:====")
        sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"])
        self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))


class KernelTarget(Target):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = target_config.get('nvmet_bin', 'nvmetcli')

    def load_drivers(self):
        self.log.info("Loading drivers")
        super().load_drivers()
        if self.null_block:
            self.exec_cmd(["sudo", "modprobe", "null_blk", "nr_devices=%s" % self.null_block])

    def configure_adq(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_configure_tc(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_set_busy_read(self, busy_read_val):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
        return {"net.core.busy_read": 0}

    def stop(self):
        self.nvmet_command(self.nvmet_bin, "clear")
        self.restore_settings()

    def get_nvme_device_bdf(self, nvme_dev_path):
        nvme_name = os.path.basename(nvme_dev_path)
        return self.exec_cmd(["cat", "/sys/block/%s/device/address" % nvme_name]).strip()

    def get_nvme_devices(self):
        dev_list = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]).split("\n")
        nvme_list = []
        for dev in dev_list:
            if "nvme" not in dev:
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                nvme_list.append(dev)
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_allowlist:
                nvme_list.append(dev)
        return nvme_list

    def nvmet_command(self, nvmet_bin, command):
        return self.exec_cmd([nvmet_bin, *(command.split(" "))])

    def kernel_tgt_gen_subsystem_conf(self, nvme_list):

        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        for ip, bdev_num in self.spread_bdevs(len(nvme_list)):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = nvme_list[bdev_num]

            nvmet_cfg["subsystems"].append({
                "allowed_hosts": [],
                "attr": {
                    "allow_any_host": "1",
                    "serial": serial,
                    "version": "1.3"
                },
                "namespaces": [
                    {
                        "device": {
                            "path": bdev_name,
                            "uuid": "%s" % uuid.uuid4()
                        },
                        "enable": 1,
                        "nsid": port
                    }
                ],
                "nqn": nqn
            })

            nvmet_cfg["ports"].append({
                "addr": {
                    "adrfam": "ipv4",
                    "traddr": ip,
                    "trsvcid": port,
                    "trtype": self.transport
                },
                "portid": bdev_num,
                "referrals": [],
                "subsystems": [nqn]
            })

            self.subsystem_info_list.append((port, nqn, ip))
        self.subsys_no = len(self.subsystem_info_list)

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))
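
    # Each subsystem_info_list entry is a (trsvcid, nqn, traddr) tuple, e.g.
    # ("4420", "nqn.2018-09.io.spdk:cnode0", "10.0.0.1").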

    def tgt_start(self):
        self.log.info("Configuring kernel NVMeOF Target")

        if self.null_block:
            self.log.info("Configuring with null block device.")
            nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
        else:
            self.log.info("Configuring with NVMe drives.")
            nvme_list = self.get_nvme_devices()

        self.kernel_tgt_gen_subsystem_conf(nvme_list)
        self.subsys_no = len(nvme_list)

        self.nvmet_command(self.nvmet_bin, "clear")
        self.nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log.info("Done configuring kernel NVMeOF Target")


class SPDKTarget(Target):
    def __init__(self, name, general_config, target_config):
        # IRQ affinity on SPDK Target side takes Target's core mask into consideration.
        # Method setting IRQ affinity is run as part of parent classes init,
        # so we need to have self.core_mask set before changing IRQ affinity.
        self.core_mask = target_config["core_mask"]
        self.num_cores = len(self.get_core_list_from_mask(self.core_mask))

        super().__init__(name, general_config, target_config)

        # Format: property, default value
        config_fields = [
            ConfigField(name='dif_insert_strip', default=False),
            ConfigField(name='null_block_dif_type', default=0),
            ConfigField(name='num_shared_buffers', default=4096),
            ConfigField(name='max_queue_depth', default=128),
            ConfigField(name='bpf_scripts', default=[]),
            ConfigField(name='scheduler_core_limit', default=None),
            ConfigField(name='dsa_settings', default=False),
            ConfigField(name='iobuf_small_pool_count', default=32767),
            ConfigField(name='iobuf_large_pool_count', default=16383),
            ConfigField(name='num_cqe', default=4096),
            ConfigField(name='sock_impl', default='posix')
        ]

        self.read_config(config_fields, target_config)

        self.bpf_proc = None
        self.enable_dsa = False

        self.log.info("====DSA settings:====")
        self.log.info("DSA enabled: %s" % (self.enable_dsa))

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
        if mode not in ["default", "bynode", "cpulist",
                        "shared", "split", "split-bynode"]:
            self.log.error("%s irq affinity setting not supported" % mode)
            raise Exception

        # Create core list from SPDK's mask and change it to string.
        # This is the type configure_irq_affinity expects for cpulist parameter.
        spdk_tgt_core_list = self.get_core_list_from_mask(self.core_mask)
        spdk_tgt_core_list = ",".join(map(lambda x: str(x), spdk_tgt_core_list))
        spdk_tgt_core_list = "[" + spdk_tgt_core_list + "]"

        if mode == "shared":
            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list)
        elif mode == "split":
            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
        elif mode == "split-bynode":
            super().configure_irq_affinity(mode="bynode", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
        else:
            super().configure_irq_affinity(mode=mode, cpulist=cpulist, exclude_cpulist=exclude_cpulist)
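
    # Mode mapping sketch: "shared" pins NIC IRQs to the SPDK core mask,
    # "split" pins them to all CPUs outside that mask, and "split-bynode"
    # does the same but limited to each NIC's NUMA node.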

    def adq_set_busy_read(self, busy_read_val):
        return {"net.core.busy_read": busy_read_val}

    def get_nvme_devices_count(self):
        return len(self.get_nvme_devices())

    def get_nvme_devices(self):
        bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")]))
        bdev_bdfs = []
        for bdev in bdev_subsys_json_obj["config"]:
            bdev_traddr = bdev["params"]["traddr"]
            if bdev_traddr in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                bdev_bdfs.append(bdev_traddr)
            if bdev_traddr in self.nvme_allowlist:
                bdev_bdfs.append(bdev_traddr)
        return bdev_bdfs

    def spdk_tgt_configure(self):
        self.log.info("Configuring SPDK NVMeOF target via RPC")

        # Create transport layer
        nvmf_transport_params = {
            "client": self.client,
            "trtype": self.transport,
            "num_shared_buffers": self.num_shared_buffers,
            "max_queue_depth": self.max_queue_depth,
            "dif_insert_or_strip": self.dif_insert_strip,
            "sock_priority": self.adq_priority,
            "num_cqe": self.num_cqe
        }

        if self.enable_adq:
            nvmf_transport_params["acceptor_poll_rate"] = 10000

        rpc.nvmf.nvmf_create_transport(**nvmf_transport_params)
        self.log.info("SPDK NVMeOF transport layer:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

        if self.enable_adq:
            self.adq_configure_tc()

        self.log.info("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self, null_block_count):
        md_size = 0
        block_size = 4096
        if self.null_block_dif_type != 0:
            md_size = 128

        self.log.info("Adding null block bdevices to config via RPC")
        for i in range(null_block_count):
            self.log.info("Setting bdev protection to: %s" % self.null_block_dif_type)
            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
                                      dif_type=self.null_block_dif_type, md_size=md_size)
        self.log.info("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
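
    # Sizing note (sketch, assuming the second bdev_null_create argument is the
    # block count): 102400 blocks of 4096 bytes give a ~400 MiB null bdev per
    # device; with DIF enabled each block carries an extra 128 B of metadata.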

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        self.log.info("Adding NVMe bdevs to config via RPC")

        bdfs = self.get_nvme_devices()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log.error("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log.info("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        self.log.info("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = self.get_nvme_devices_count()

        for ip, bdev_num in self.spread_bdevs(req_num_disks):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = "Nvme%sn1" % bdev_num

            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                           allow_any_host=True, max_namespaces=8)
            rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
            for nqn_name in [nqn, "discovery"]:
                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
                                                     nqn=nqn_name,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid=port,
                                                     adrfam="ipv4")
            self.subsystem_info_list.append((port, nqn, ip))
        self.subsys_no = len(self.subsystem_info_list)

        self.log.info("SPDK NVMeOF subsystem configuration:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def bpf_start(self):
        self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts)
        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
        results_path = os.path.join(self.results_dir, "bpf_traces.txt")

        with open(self.pid, "r") as fh:
            nvmf_pid = str(fh.readline())

        cmd = [bpf_script, nvmf_pid, *bpf_traces]
        self.log.info(cmd)
        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})

    def tgt_start(self):
        if self.null_block:
            self.subsys_no = 1
        else:
            self.subsys_no = self.get_nvme_devices_count()
        self.log.info("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        self.log.info("SPDK NVMeOF Target PID=%s" % proc.pid)
        self.log.info("Waiting for spdk to initialize...")
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")

        rpc.sock.sock_set_default_impl(self.client, impl_name=self.sock_impl)
        rpc.iobuf.iobuf_set_options(self.client,
                                    small_pool_count=self.iobuf_small_pool_count,
                                    large_pool_count=self.iobuf_large_pool_count,
                                    small_bufsize=None,
                                    large_bufsize=None)

        if self.enable_zcopy:
            rpc.sock.sock_impl_set_options(self.client, impl_name=self.sock_impl,
                                           enable_zerocopy_send_server=True)
            self.log.info("Target socket options:")
            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name=self.sock_impl))

        if self.enable_adq:
            rpc.sock.sock_impl_set_options(self.client, impl_name=self.sock_impl, enable_placement_id=1)
            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
                                           nvme_adminq_poll_period_us=100000, retry_count=4)

        if self.enable_dsa:
            rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None)
            self.log.info("Target DSA accel module enabled")

        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit)
        rpc.framework_start_init(self.client)

        if self.bpf_scripts:
            self.bpf_start()

        self.spdk_tgt_configure()

    def stop(self):
        if self.bpf_proc:
            self.log.info("Stopping BPF Trace script")
            self.bpf_proc.terminate()
            self.bpf_proc.wait()

        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait(timeout=30)
            except Exception as e:
                self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.")
                self.log.info(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
                # Try to clean up RPC socket files if they were not removed
                # because of using 'kill'
                try:
                    os.remove("/var/tmp/spdk.sock")
                    os.remove("/var/tmp/spdk.sock.lock")
                except FileNotFoundError:
                    pass
        self.restore_settings()


class KernelInitiator(Initiator):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Defaults
        self.extra_params = initiator_config.get('extra_params', '')

        self.ioengine = "libaio"
        self.spdk_conf = ""

        if "num_cores" in initiator_config:
            self.num_cores = initiator_config["num_cores"]

        if "kernel_engine" in initiator_config:
            self.ioengine = initiator_config["kernel_engine"]
            if "io_uring" in self.ioengine:
                self.extra_params += ' --nr-poll-queues=8'

    def configure_adq(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_configure_tc(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_set_busy_read(self, busy_read_val):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
        return {"net.core.busy_read": 0}

    def get_connected_nvme_list(self):
        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
        return nvme_list

    def init_connect(self):
        self.log.info("Below connection attempts may result in error messages, this is expected!")
        for subsystem in self.subsystem_info_list:
            self.log.info("Trying to connect %s %s %s" % subsystem)
            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
            time.sleep(2)

        if "io_uring" in self.ioengine:
            self.log.info("Setting block layer settings for io_uring.")

            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
            #       apparently it's not possible for connected subsystems.
            #       Results in "error: Invalid argument"
            block_sysfs_settings = {
                "iostats": "0",
                "rq_affinity": "0",
                "nomerges": "2"
            }

            for disk in self.get_connected_nvme_list():
                sysfs = os.path.join("/sys/block", disk, "queue")
                for k, v in block_sysfs_settings.items():
                    sysfs_opt_path = os.path.join(sysfs, k)
                    try:
                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
                    except CalledProcessError as e:
                        self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
                    finally:
                        _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)])
                        self.log.info("%s=%s" % (sysfs_opt_path, _))

    def init_disconnect(self):
        for subsystem in self.subsystem_info_list:
            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
            time.sleep(1)

    def get_nvme_subsystem_numa(self, dev_name):
        # Remove two last characters to get controller name instead of subsystem name
        nvme_ctrl = os.path.basename(dev_name)[:-2]
        remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
                                   self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl]))
        return self.get_route_nic_numa(remote_nvme_ip.group(0))
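
    # Example: for dev_name "nvme2n1" the controller is "nvme2", and the IPv4
    # traddr parsed from its sysfs address file selects the local route and
    # hence the NUMA node.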

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
        self.available_cpus = self.get_numa_cpu_map()
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        # Generate connected nvme devices names and sort them by used NIC numa node
        # to allow better grouping when splitting into fio sections.
        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
        nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list]
        nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))]

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = self.gen_fio_numa_section(r, num_jobs)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section


class SPDKInitiator(Initiator):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        if self.skip_spdk_install is False:
            self.install_spdk()

        # Optional fields
        self.enable_data_digest = initiator_config.get('enable_data_digest', False)
        self.small_pool_count = initiator_config.get('small_pool_count', 32768)
        self.large_pool_count = initiator_config.get('large_pool_count', 16384)
        self.sock_impl = initiator_config.get('sock_impl', 'posix')

        if "num_cores" in initiator_config:
            self.num_cores = initiator_config["num_cores"]

        self.ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
        self.spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir

    def adq_set_busy_read(self, busy_read_val):
        return {"net.core.busy_read": busy_read_val}

    def install_spdk(self):
        self.log.info("Using fio binary %s" % self.fio_bin)
        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma",
                       "--with-fio=%s" % os.path.dirname(self.fio_bin),
                       "--enable-lto", "--disable-unit-tests"])
        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])

        self.log.info("SPDK built")
        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])

    def init_connect(self):
        # Not a real "connect" like when doing "nvme connect" because SPDK's fio
        # bdev plugin initiates connection just before starting IO traffic.
        # This is just to have an "init_connect" equivalent of the same function
        # from KernelInitiator class.
        # Just prepare bdev.conf JSON file for later use and consider it
        # "making a connection".
        bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
        self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])

    def init_disconnect(self):
        # SPDK Initiator does not need to explicitly disconnect as this gets done
        # after fio bdev plugin finishes IO.
        return
        return

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        spdk_cfg_section = {
            "subsystems": [
                {
                    "subsystem": "bdev",
                    "config": []
                },
                {
                    "subsystem": "iobuf",
                    "config": [
                        {
                            "method": "iobuf_set_options",
                            "params": {
                                "small_pool_count": self.small_pool_count,
                                "large_pool_count": self.large_pool_count
                            }
                        }
                    ]
                },
                {
                    "subsystem": "sock",
                    "config": [
                        {
                            "method": "sock_set_default_impl",
                            "params": {
                                "impl_name": self.sock_impl
                            }
                        }
                    ]
                }
            ]
        }

        for i, subsys in enumerate(remote_subsystem_list):
            sub_port, sub_nqn, sub_addr = map(str, subsys)
            nvme_ctrl = {
                "method": "bdev_nvme_attach_controller",
                "params": {
                    "name": "Nvme{}".format(i),
                    "trtype": self.transport,
                    "traddr": sub_addr,
                    "trsvcid": sub_port,
                    "subnqn": sub_nqn,
                    "adrfam": "IPv4"
                }
            }

            if self.enable_adq:
                nvme_ctrl["params"].update({"priority": "1"})

            if self.enable_data_digest:
                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})

            spdk_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)

        return json.dumps(spdk_cfg_section, indent=2)

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
        self.available_cpus = self.get_numa_cpu_map()
        filename_section = ""
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        # Generate the expected NVMe bdev names and sort them by the NIC NUMA
        # node in use, to allow better grouping when splitting into fio sections.
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames]
        filenames = [x for _, x in sorted(zip(filename_numas, filenames))]

        nvme_per_split = int(len(subsystems) / len(threads))
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = self.gen_fio_numa_section(r, num_jobs)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section

    def get_nvme_subsystem_numa(self, bdev_name):
        bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir]))
        bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"]

        # Remove the last two characters to get the controller name instead of the subsystem name
        nvme_ctrl = bdev_name[:-2]
        for bdev in bdev_conf_json_obj:
            if bdev["method"] == "bdev_nvme_attach_controller" and bdev["params"]["name"] == nvme_ctrl:
                return self.get_route_nic_numa(bdev["params"]["traddr"])
        return None


if __name__ == "__main__":
    exit_code = 0

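    # A minimal config file sketch (values are hypothetical; only keys actually
    # parsed below are shown - section contents are elided):
    #
    # {
    #     "general": {...},
    #     "target": {"mode": "spdk", "allowlist": [...], ...},
    #     "initiator1": {"mode": "kernel", ...},
    #     "fio": {"bs": ["4k"], "qd": [128], "rw": ["randread"], "rwmixread": 100,
    #             "ramp_time": 30, "run_time": 300, "run_num": 3}
    # }
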
    script_full_dir = os.path.dirname(os.path.realpath(__file__))
    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
                        help='Configuration file.')
    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
                        help='Results directory.')
    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
                        help='CSV results filename.')
    parser.add_argument('-f', '--force', default=False, action='store_true',
                        dest='force', help="""Force script to continue and try to use all
                        available NVMe devices during test.
                        WARNING: Might result in data loss on used NVMe drives""")

    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO,
                        format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s')

    logging.info("Using config file: %s" % args.config)
    with open(args.config, "r") as config:
        data = json.load(config)

    initiators = []
    fio_cases = []

    general_config = data["general"]
    target_config = data["target"]
    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]

    if "null_block_devices" not in data["target"] and \
            (args.force is False and
                "allowlist" not in data["target"] and
                "blocklist" not in data["target"]):
        # TODO: Also check that allowlist or blocklist are not empty.
        logging.warning("""WARNING: This script requires allowlist and blocklist to be defined.
        You can choose to use all available NVMe drives on your system, which may potentially
        lead to data loss. If you wish to proceed with all attached NVMes, use the "-f" option.""")
        sys.exit(1)

    for k, v in data.items():
        if "target" in k:
            v.update({"results_dir": args.results})
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(k, data["general"], v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(k, data["general"], v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            # Default run_num to a single run; range(1, fio_run_num + 1) below
            # would raise TypeError on None.
            fio_run_num = data[k].get("run_num", 1)
            fio_num_jobs = data[k].get("num_jobs")

            fio_rate_iops = data[k].get("rate_iops", 0)

            fio_offset = data[k].get("offset", False)
            fio_offset_inc = data[k].get("offset_inc", 0)
        else:
            continue

    try:
        os.mkdir(args.results)
    except FileExistsError:
        pass

    for i in initiators:
        target_obj.initiator_info.append(
            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
        )

    # TODO: This try block is definitely too large. Need to break it up into
    # separate logical blocks to reduce its size.
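    # The block below, in order: start the target, match initiator subsystems,
    # then for each (bs, qd, rw) workload connect the initiators, generate fio
    # configs, run fio run_num times alongside any enabled measurement threads,
    # disconnect and copy results back, and finally parse them into a CSV.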
    try:
        target_obj.tgt_start()

        for i in initiators:
            i.match_subsystems(target_obj.subsystem_info_list)
            if i.enable_adq:
                i.adq_configure_tc()

        # Poor man's threading
        # Run FIO tests
        for block_size, io_depth, rw in fio_workloads:
            configs = []
            for i in initiators:
                i.init_connect()
                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops,
                                       fio_offset, fio_offset_inc)
                configs.append(cfg)

            for run_no in range(1, fio_run_num + 1):
                threads = []
                power_daemon = None
                measurements_prefix = "%s_%s_%s_m_%s_run_%s" % (block_size, io_depth, rw, fio_rw_mix_read, run_no)

                for i, cfg in zip(initiators, configs):
                    t = threading.Thread(target=i.run_fio, args=(cfg, run_no))
                    threads.append(t)

                if target_obj.enable_sar:
                    sar_file_prefix = measurements_prefix + "_sar"
                    t = threading.Thread(target=target_obj.measure_sar,
                                         args=(args.results, sar_file_prefix, fio_ramp_time, fio_run_time))
                    threads.append(t)

                if target_obj.enable_pcm:
                    pcm_fnames = ["%s_%s.csv" % (measurements_prefix, x) for x in ["pcm_cpu"]]
                    pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm,
                                                 args=(args.results, pcm_fnames[0], fio_ramp_time, fio_run_time))
                    threads.append(pcm_cpu_t)

                if target_obj.enable_bw:
                    bandwidth_file_name = measurements_prefix + "_bandwidth.csv"
                    t = threading.Thread(target=target_obj.measure_network_bandwidth,
                                         args=(args.results, bandwidth_file_name, fio_ramp_time, fio_run_time))
                    threads.append(t)

                if target_obj.enable_dpdk_memory:
                    dpdk_mem_file_name = measurements_prefix + "_dpdk_mem.txt"
                    t = threading.Thread(target=target_obj.measure_dpdk_memory,
                                         args=(args.results, dpdk_mem_file_name, fio_ramp_time))
                    threads.append(t)

                if target_obj.enable_pm:
                    power_daemon = threading.Thread(target=target_obj.measure_power,
                                                    args=(args.results, measurements_prefix, script_full_dir,
                                                          fio_ramp_time, fio_run_time))
                    threads.append(power_daemon)

                for t in threads:
                    t.start()
                for t in threads:
                    t.join()

            for i in initiators:
                i.init_disconnect()
                i.copy_result_files(args.results)
        try:
            parse_results(args.results, args.csv_filename)
        except Exception as err:
            exit_code = 1
            logging.error("There was an error with parsing the results")
            logging.error(err)
    finally:
        for i in initiators:
            try:
                i.stop()
            except Exception:
                # Best-effort cleanup - keep stopping the remaining initiators.
                pass
        target_obj.stop()
    sys.exit(exit_code)
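
# Example invocation (script and config paths are hypothetical; defaults are
# shown by --help):
#   sudo ./nvmf_perf.py --config ./config.json --results /tmp/results \
#       --csv-filename nvmf_results.csv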