#!/usr/bin/env python3
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (C) 2018 Intel Corporation
# All rights reserved.
#

# Note: we use setattr
# pylint: disable=no-member

import os
import re
import sys
import argparse
import json
import logging
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid

import paramiko
import pandas as pd
from common import *
from subprocess import CalledProcessError, check_output
from abc import abstractmethod, ABC
from dataclasses import dataclass
from typing import Any

sys.path.append(os.path.dirname(__file__) + '/../../../python')

import spdk.rpc as rpc  # noqa
import spdk.rpc.client as rpc_client  # noqa


@dataclass
class ConfigField:
    name: str = None
    default: Any = None
    required: bool = False


class Server(ABC):
    def __init__(self, name, general_config, server_config):
        self.name = name

        self.log = logging.getLogger(self.name)

        config_fields = [
            ConfigField(name='username', required=True),
            ConfigField(name='password', required=True),
            ConfigField(name='transport', required=True),
            ConfigField(name='skip_spdk_install', default=False),
            ConfigField(name='irdma_roce_enable', default=False),
            ConfigField(name='pause_frames', default=None)
        ]

        self.read_config(config_fields, general_config)
        self.transport = self.transport.lower()

        config_fields = [
            ConfigField(name='nic_ips', required=True),
            ConfigField(name='mode', required=True),
            ConfigField(name='irq_scripts_dir', default='/usr/src/local/mlnx-tools/ofed_scripts'),
            ConfigField(name='enable_arfs', default=False),
            ConfigField(name='tuned_profile', default='')
        ]
        self.read_config(config_fields, server_config)

        self.local_nic_info = []
        self._nics_json_obj = {}
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""

        self.enable_adq = server_config.get('adq_enable', False)
        self.adq_priority = 1 if self.enable_adq else None
        self.irq_settings = {'mode': 'default', **server_config.get('irq_settings', {})}

        if not re.match(r'^[A-Za-z0-9\-]+$', name):
            self.log.info("Please use a name which contains only letters, numbers or dashes")
            sys.exit(1)

    def read_config(self, config_fields, config):
        for config_field in config_fields:
            value = config.get(config_field.name, config_field.default)
            if config_field.required is True and value is None:
                raise ValueError('%s config key is required' % config_field.name)

            setattr(self, config_field.name, value)
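
    # Illustrative example: given general_config = {"username": "root",
    # "password": "secret", "transport": "tcp"}, read_config() sets
    # self.username, self.password and self.transport; optional keys missing
    # from the config dict fall back to their ConfigField defaults.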

    @staticmethod
    def get_uncommented_lines(lines):
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                if ip in addr["local"]:
                    return nic["ifname"]

    @abstractmethod
    def set_local_nic_info_helper(self):
        pass

    def set_local_nic_info(self, pci_info):
        def extract_network_elements(json_obj):
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def get_nic_numa_node(self, nic_name):
        return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name]))

    def get_numa_cpu_map(self):
        numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"]))
        numa_cpu_json_map = {}

        for cpu in numa_cpu_json_obj["cpus"]:
            cpu_num = int(cpu["cpu"])
            numa_node = int(cpu["node"])
            numa_cpu_json_map.setdefault(numa_node, [])
            numa_cpu_json_map[numa_node].append(cpu_num)

        return numa_cpu_json_map
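
    # Illustrative example (output shape depends on the machine):
    #   >>> self.get_numa_cpu_map()
    #   {0: [0, 1, 2, 3], 1: [4, 5, 6, 7]}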
f"/sys/class/net/{nic_name}/queues/"]).strip().split("\n") 217 queue_files = filter(lambda x: x.startswith("rx-"), queue_files) 218 219 for qf in queue_files: 220 self.exec_cmd(["sudo", "bash", "-c", f"echo {rps_flow_cnt} > /sys/class/net/{nic_name}/queues/{qf}/rps_flow_cnt"]) 221 222 def configure_adq(self): 223 self.adq_load_modules() 224 self.adq_configure_nic() 225 226 def adq_load_modules(self): 227 self.log.info("Modprobing ADQ-related Linux modules...") 228 adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"] 229 for module in adq_module_deps: 230 try: 231 self.exec_cmd(["sudo", "modprobe", module]) 232 self.log.info("%s loaded!" % module) 233 except CalledProcessError as e: 234 self.log.error("ERROR: failed to load module %s" % module) 235 self.log.error("%s resulted in error: %s" % (e.cmd, e.output)) 236 237 def adq_configure_tc(self): 238 self.log.info("Configuring ADQ Traffic classes and filters...") 239 240 num_queues_tc0 = 2 # 2 is minimum number of queues for TC0 241 num_queues_tc1 = self.num_cores 242 port_param = "dst_port" if isinstance(self, Target) else "src_port" 243 xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs") 244 245 for nic_ip in self.nic_ips: 246 nic_name = self.get_nic_name_by_ip(nic_ip) 247 nic_ports = [x[0] for x in self.subsystem_info_list if nic_ip in x[2]] 248 249 tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, 250 "root", "mqprio", "num_tc", "2", "map", "0", "1", 251 "queues", "%s@0" % num_queues_tc0, 252 "%s@%s" % (num_queues_tc1, num_queues_tc0), 253 "hw", "1", "mode", "channel"] 254 self.log.info(" ".join(tc_qdisc_map_cmd)) 255 self.exec_cmd(tc_qdisc_map_cmd) 256 257 time.sleep(5) 258 tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"] 259 self.log.info(" ".join(tc_qdisc_ingress_cmd)) 260 self.exec_cmd(tc_qdisc_ingress_cmd) 261 262 nic_bdf = os.path.basename(self.exec_cmd(["readlink", "-f", "/sys/class/net/%s/device" % nic_name]).strip()) 263 self.exec_cmd(["sudo", "devlink", "dev", "param", "set", "pci/%s" % nic_bdf, 264 "name", "tc1_inline_fd", "value", "true", "cmode", "runtime"]) 265 266 for port in nic_ports: 267 tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name, 268 "protocol", "ip", "ingress", "prio", "1", "flower", 269 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port, 270 "skip_sw", "hw_tc", "1"] 271 self.log.info(" ".join(tc_filter_cmd)) 272 self.exec_cmd(tc_filter_cmd) 273 274 # show tc configuration 275 self.log.info("Show tc configuration for %s NIC..." % nic_name) 276 tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name]) 277 tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"]) 278 self.log.info("%s" % tc_disk_out) 279 self.log.info("%s" % tc_filter_out) 280 281 # Ethtool coalesce settings must be applied after configuring traffic classes 282 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"]) 283 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"]) 284 285 self.log.info("Running set_xps_rxqs script for %s NIC..." 

    def reload_driver(self, driver, *modprobe_args):

        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver, *modprobe_args])
        except CalledProcessError as e:
            self.log.error("ERROR: failed to reload %s module!" % driver)
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_nic(self):
        self.log.info("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                nic_bdf = self.exec_cmd(["bash", "-c", "source /sys/class/net/%s/device/uevent; echo $PCI_SLOT_NAME" % nic])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-inspect-optimize", "off"])
                # Following are suggested in ADQ Configuration Guide to be enabled for large block sizes,
                # but can be used with 4k block sizes as well
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop", "on"])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop-cfg", "on"])
            except subprocess.CalledProcessError as e:
                self.log.error("ERROR: failed to configure NIC port using ethtool!")
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
                self.log.info("Please update your NIC driver and firmware versions and try again.")
            self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        self.log.info("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log.info("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log.info("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        self.log.info("Tuning sysctl settings...")

        busy_read = 0
        busy_poll = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1
            busy_poll = 1

        sysctl_opts = {
            "net.core.busy_poll": busy_poll,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
            "net.core.rps_sock_flow_entries": 0
        }

        if self.enable_adq:
            sysctl_opts.update(self.adq_set_busy_read(1))

        if self.enable_arfs:
            sysctl_opts.update({"net.core.rps_sock_flow_entries": 32768})

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        if not self.tuned_profile:
            self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        self.log.info("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def get_core_list_from_mask(self, core_mask):
        # Generate list of individual cores from hex core mask
        # (e.g. '0xffff') or list containing:
        # - individual cores (e.g. '1, 2, 3')
        # - core ranges (e.g. '0-3')
        # - mix of both (e.g. '0, 1-4, 9, 11-13')

        core_list = []
        if "0x" in core_mask:
            core_mask_int = int(core_mask, 16)
            for i in range(core_mask_int.bit_length()):
                if (1 << i) & core_mask_int:
                    core_list.append(i)
            return core_list
        else:
            # Core list can be provided in .json config with square brackets
            # remove them first
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")

            for i in core_mask.split(","):
                if "-" in i:
                    start, end = i.split("-")
                    core_range = range(int(start), int(end) + 1)
                    core_list.extend(core_range)
                else:
                    core_list.append(int(i))
        return core_list
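
    # Illustrative examples:
    #   >>> self.get_core_list_from_mask("0xff")
    #   [0, 1, 2, 3, 4, 5, 6, 7]
    #   >>> self.get_core_list_from_mask("[0, 2-4, 9]")
    #   [0, 2, 3, 4, 9]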

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
        self.log.info("Setting NIC irq affinity for NICs. Using %s mode" % mode)

        if mode not in ["default", "bynode", "cpulist"]:
            raise ValueError("%s irq affinity setting not supported" % mode)

        if mode == "cpulist" and not cpulist:
            raise ValueError("%s irq affinity setting set, but no cpulist provided" % mode)

        affinity_script = "set_irq_affinity.sh"
        if "default" not in mode:
            affinity_script = "set_irq_affinity_cpulist.sh"
            system_cpu_map = self.get_numa_cpu_map()
        irq_script_path = os.path.join(self.irq_scripts_dir, affinity_script)

        def cpu_list_to_string(cpulist):
            return ",".join(map(lambda x: str(x), cpulist))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            irq_cmd = ["sudo", irq_script_path]

            # Use only CPU cores matching NIC NUMA node.
            # Remove any CPU cores if they're on exclusion list.
            if mode == "bynode":
                irq_cpus = system_cpu_map[self.get_nic_numa_node(nic_name)]
                if cpulist and exclude_cpulist:
                    disallowed_cpus = self.get_core_list_from_mask(cpulist)
                    irq_cpus = list(set(irq_cpus) - set(disallowed_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            if mode == "cpulist":
                irq_cpus = self.get_core_list_from_mask(cpulist)
                if exclude_cpulist:
                    # Flatten system CPU list, we don't need NUMA awareness here
                    system_cpu_list = sorted({x for v in system_cpu_map.values() for x in v})
                    irq_cpus = list(set(system_cpu_list) - set(irq_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            irq_cmd.append(nic_name)
            self.log.info(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        self.log.info("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        self.log.info("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        self.log.info("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log.info("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])
profile." % self.tuned_restore_dict["profile"]) 522 523 def restore_governor(self): 524 self.log.info("Restoring CPU governor setting...") 525 if self.governor_restore: 526 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore]) 527 self.log.info("Reverted CPU governor to %s." % self.governor_restore) 528 529 def restore_settings(self): 530 self.restore_governor() 531 self.restore_tuned() 532 self.restore_services() 533 self.restore_sysctl() 534 if self.enable_adq: 535 self.reload_driver("ice") 536 537 538class Target(Server): 539 def __init__(self, name, general_config, target_config): 540 super().__init__(name, general_config, target_config) 541 542 # Defaults 543 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 544 self.subsystem_info_list = [] 545 self.initiator_info = [] 546 547 config_fields = [ 548 ConfigField(name='mode', required=True), 549 ConfigField(name='results_dir', required=True), 550 ConfigField(name='enable_pm', default=True), 551 ConfigField(name='enable_sar', default=True), 552 ConfigField(name='enable_pcm', default=True), 553 ConfigField(name='enable_dpdk_memory', default=True) 554 ] 555 self.read_config(config_fields, target_config) 556 557 self.null_block = target_config.get('null_block_devices', 0) 558 self.scheduler_name = target_config.get('scheduler_settings', 'static') 559 self.enable_zcopy = target_config.get('zcopy_settings', False) 560 self.enable_bw = target_config.get('enable_bandwidth', True) 561 self.nvme_blocklist = target_config.get('blocklist', []) 562 self.nvme_allowlist = target_config.get('allowlist', []) 563 564 # Blocklist takes precedence, remove common elements from allowlist 565 self.nvme_allowlist = list(set(self.nvme_allowlist) - set(self.nvme_blocklist)) 566 567 self.log.info("Items now on allowlist: %s" % self.nvme_allowlist) 568 self.log.info("Items now on blocklist: %s" % self.nvme_blocklist) 569 570 self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) 571 self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../")) 572 self.set_local_nic_info(self.set_local_nic_info_helper()) 573 574 if self.skip_spdk_install is False: 575 self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip") 576 577 self.configure_system() 578 if self.enable_adq: 579 self.configure_adq() 580 self.configure_irq_affinity() 581 self.sys_config() 582 583 def set_local_nic_info_helper(self): 584 return json.loads(self.exec_cmd(["lshw", "-json"])) 585 586 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 587 stderr_opt = None 588 if stderr_redirect: 589 stderr_opt = subprocess.STDOUT 590 if change_dir: 591 old_cwd = os.getcwd() 592 os.chdir(change_dir) 593 self.log.info("Changing directory to %s" % change_dir) 594 595 out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8") 596 597 if change_dir: 598 os.chdir(old_cwd) 599 self.log.info("Changing directory to %s" % old_cwd) 600 return out 601 602 def zip_spdk_sources(self, spdk_dir, dest_file): 603 self.log.info("Zipping SPDK source directory") 604 fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) 605 for root, _directories, files in os.walk(spdk_dir, followlinks=True): 606 for file in files: 607 fh.write(os.path.relpath(os.path.join(root, file))) 608 fh.close() 609 self.log.info("Done zipping") 610 611 @staticmethod 612 def _chunks(input_list, chunks_no): 613 div, rem = divmod(len(input_list), chunks_no) 614 for i in range(chunks_no): 615 si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - 

    def spread_bdevs(self, req_disks):
        # Spread available block devices indexes:
        # - evenly across available initiator systems
        # - evenly across available NIC interfaces for
        #   each initiator
        # Not NUMA aware.
        ip_bdev_map = []
        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))

        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
            self.initiator_info[i]["bdev_range"] = init_chunk
            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
                for c in nic_chunk:
                    ip_bdev_map.append((ip, c))
        return ip_bdev_map
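
    # Illustrative example: with two initiators, each exposing two target NIC
    # IPs, spread_bdevs(8) hands bdev indexes 0-3 to the first initiator and
    # 4-7 to the second, pairing each IP with two of them:
    # [(ip0, 0), (ip0, 1), (ip1, 2), (ip1, 3), ...].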

    def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time):
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))

        self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Starting SAR measurements")

        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % 1, "%s" % run_time])
        # sar_output_file already contains results_dir, no need to join again
        with open(sar_output_file, "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log.info("Summary CPU utilization from SAR:")
                        self.log.info(line)
                    elif "all" in line:
                        self.log.info(line)
                    else:
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum

        with open(sar_cpu_util_file, "w") as f:
            f.write("%0.2f" % sar_cpu_usage)

    def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time):
        time.sleep(ramp_time)
        self.log.info("Starting power measurements")
        self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir,
                       "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix,
                       "-x", "-c", "%s" % run_time, "-t", "%s" % 1, "-r"])

    def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        cmd = ["pcm", "1", "-i=%s" % run_time,
               "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name, ramp_time, run_time):
        self.log.info("Waiting %d seconds for ramp-up to finish before measuring bandwidth stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("INFO: starting network bandwidth measure")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", "%s" % run_time])
        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
        #       Improve this so that an additional file containing the measurements average is generated too.
        # TODO: Monitor only those interfaces which are currently used to run the workload.

    def measure_dpdk_memory(self, results_dir, dump_file_name, ramp_time):
        self.log.info("INFO: waiting to generate DPDK memory usage")
        time.sleep(ramp_time)
        self.log.info("INFO: generating DPDK memory usage")
        tmp_dump_file = rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)["filename"]
        os.rename(tmp_dump_file, "%s/%s" % (results_dir, dump_file_name))

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(os.uname().release)
        self.log.info("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log.info("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log.info("====zcopy settings:====")
        self.log.info("zcopy enabled: %s" % (self.enable_zcopy))
        self.log.info("====Scheduler settings:====")
        self.log.info("SPDK scheduler: %s" % (self.scheduler_name))


class Initiator(Server):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # required fields and defaults
        config_fields = [
            ConfigField(name='mode', required=True),
            ConfigField(name='ip', required=True),
            ConfigField(name='target_nic_ips', required=True),
            ConfigField(name='cpus_allowed', default=None),
            ConfigField(name='cpus_allowed_policy', default='shared'),
            ConfigField(name='spdk_dir', default='/tmp/spdk'),
            ConfigField(name='fio_bin', default='/usr/src/fio/fio'),
            ConfigField(name='nvmecli_bin', default='nvme'),
            ConfigField(name='cpu_frequency', default=None),
            ConfigField(name='allow_cpu_sharing', default=True)
        ]

        self.read_config(config_fields, initiator_config)

        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.subsystem_info_list = []

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if self.skip_spdk_install is False:
            self.copy_spdk("/tmp/spdk.zip")

        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        self.restore_settings()
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it again to prevent
        # expansion when sending to the remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout using the get_pty option if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        self.log.info("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log.info("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log.info("Sources unpacked")

    def copy_result_files(self, dest_dir):
        self.log.info("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log.info("Done copying results")

    def match_subsystems(self, target_subsystems):
        subsystems = [subsystem for subsystem in target_subsystems if subsystem[2] in self.target_nic_ips]
        if not subsystems:
            raise Exception("No matching subsystems found on target side!")
        subsystems.sort(key=lambda x: x[1])
        self.log.info("Found matching subsystems on target side:")
        for s in subsystems:
            self.log.info(s)
        self.subsystem_info_list = subsystems

    @abstractmethod
    def init_connect(self):
        pass

    @abstractmethod
    def init_disconnect(self):
        pass

    @abstractmethod
    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def get_route_nic_numa(self, remote_nvme_ip):
        local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip]))
        local_nvme_nic = local_nvme_nic[0]["dev"]
        return self.get_nic_numa_node(local_nvme_nic)

    @staticmethod
    def gen_fio_offset_section(offset_inc, num_jobs):
        offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc
        return "\n".join(["size=%s%%" % offset_inc,
                          "offset=0%",
                          "offset_increment=%s%%" % offset_inc])
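
    # Illustrative example: offsets carve the device into per-job slices.
    #   >>> Initiator.gen_fio_offset_section(0, 4)
    #   'size=25%\noffset=0%\noffset_increment=25%'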

    def gen_fio_numa_section(self, fio_filenames_list, num_jobs):
        numa_stats = {}
        allowed_cpus = []
        for nvme in fio_filenames_list:
            nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme))
            numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1

        # Use the most common NUMA node for this chunk to allocate memory and CPUs
        section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0]

        # Check if we have enough free CPUs to pop from the list before assigning them
        if len(self.available_cpus[section_local_numa]) < num_jobs:
            if self.allow_cpu_sharing:
                self.log.info("Regenerating available CPU list %s" % section_local_numa)
                # Remove still available CPUs from the regenerated list. We don't want to
                # regenerate it with duplicates.
                cpus_regen = set(self.get_numa_cpu_map()[section_local_numa]) - set(self.available_cpus[section_local_numa])
                self.available_cpus[section_local_numa].extend(cpus_regen)
                self.log.info(self.available_cpus[section_local_numa])
            else:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise IndexError

        for _ in range(num_jobs):
            try:
                allowed_cpus.append(str(self.available_cpus[section_local_numa].pop(0)))
            except IndexError:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise

        return "\n".join(["cpus_allowed=%s" % ",".join(allowed_cpus),
                          "numa_mem_policy=prefer:%s" % section_local_numa])

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no,
                       num_jobs=None, ramp_time=0, run_time=10, rate_iops=0,
                       offset=False, offset_inc=0):
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""

        if self.cpus_allowed is not None:
            self.log.info("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    cpus_num += len(range(a, b + 1))  # core ranges are inclusive of both endpoints
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            self.num_cores = len(self.subsystem_info_list)
            threads = range(0, len(self.subsystem_info_list))

        filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs,
                                                      offset, offset_inc)

        fio_config = fio_conf_template.format(ioengine=self.ioengine, spdk_conf=self.spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, init_connect() function
        if "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log.info("Created FIO Config:")
        self.log.info(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)
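
    # Illustrative example: gen_fio_config("randrw", 70, "4k", 128, ...) on an
    # initiator with num_cores=4 writes "4k_128_randrw_m_70_4CPU.fio" into
    # <spdk_dir>/nvmf_perf and returns its full path.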
"nvmf_perf", fio_config_filename) 964 965 def set_cpu_frequency(self): 966 if self.cpu_frequency is not None: 967 try: 968 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True) 969 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True) 970 self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"])) 971 except Exception: 972 self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!") 973 sys.exit(1) 974 else: 975 self.log.warning("WARNING: you have disabled intel_pstate and using default cpu governance.") 976 977 def run_fio(self, fio_config_file, run_num=1): 978 job_name, _ = os.path.splitext(fio_config_file) 979 self.log.info("Starting FIO run for job: %s" % job_name) 980 self.log.info("Using FIO: %s" % self.fio_bin) 981 982 output_filename = job_name + "_run_" + str(run_num) + "_" + self.name + ".json" 983 try: 984 output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json", 985 "--output=%s" % output_filename, "--eta=never"], True) 986 self.log.info(output) 987 self.log.info("FIO run finished. Results in: %s" % output_filename) 988 except subprocess.CalledProcessError as e: 989 self.log.error("ERROR: Fio process failed!") 990 self.log.error(e.stdout) 991 992 def sys_config(self): 993 self.log.info("====Kernel release:====") 994 self.log.info(self.exec_cmd(["uname", "-r"])) 995 self.log.info("====Kernel command line:====") 996 cmdline = self.exec_cmd(["cat", "/proc/cmdline"]) 997 self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines()))) 998 self.log.info("====sysctl conf:====") 999 sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"]) 1000 self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines()))) 1001 self.log.info("====Cpu power info:====") 1002 self.log.info(self.exec_cmd(["cpupower", "frequency-info"])) 1003 1004 1005class KernelTarget(Target): 1006 def __init__(self, name, general_config, target_config): 1007 super().__init__(name, general_config, target_config) 1008 # Defaults 1009 self.nvmet_bin = target_config.get('nvmet_bin', 'nvmetcli') 1010 1011 def load_drivers(self): 1012 self.log.info("Loading drivers") 1013 super().load_drivers() 1014 if self.null_block: 1015 self.exec_cmd(["sudo", "modprobe", "null_blk", "nr_devices=%s" % self.null_block]) 1016 1017 def configure_adq(self): 1018 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1019 1020 def adq_configure_tc(self): 1021 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1022 1023 def adq_set_busy_read(self, busy_read_val): 1024 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. 

    def stop(self):
        self.nvmet_command(self.nvmet_bin, "clear")
        self.restore_settings()

    def get_nvme_device_bdf(self, nvme_dev_path):
        nvme_name = os.path.basename(nvme_dev_path)
        return self.exec_cmd(["cat", "/sys/block/%s/device/address" % nvme_name]).strip()

    def get_nvme_devices(self):
        dev_list = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]).split("\n")
        nvme_list = []
        for dev in dev_list:
            if "nvme" not in dev:
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                nvme_list.append(dev)
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_allowlist:
                nvme_list.append(dev)
        return nvme_list

    def nvmet_command(self, nvmet_bin, command):
        return self.exec_cmd([nvmet_bin, *(command.split(" "))])

    def kernel_tgt_gen_subsystem_conf(self, nvme_list):

        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        for ip, bdev_num in self.spread_bdevs(len(nvme_list)):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = nvme_list[bdev_num]

            nvmet_cfg["subsystems"].append({
                "allowed_hosts": [],
                "attr": {
                    "allow_any_host": "1",
                    "serial": serial,
                    "version": "1.3"
                },
                "namespaces": [
                    {
                        "device": {
                            "path": bdev_name,
                            "uuid": "%s" % uuid.uuid4()
                        },
                        "enable": 1,
                        "nsid": port
                    }
                ],
                "nqn": nqn
            })

            nvmet_cfg["ports"].append({
                "addr": {
                    "adrfam": "ipv4",
                    "traddr": ip,
                    "trsvcid": port,
                    "trtype": self.transport
                },
                "portid": bdev_num,
                "referrals": [],
                "subsystems": [nqn]
            })

            self.subsystem_info_list.append((port, nqn, ip))
        self.subsys_no = len(self.subsystem_info_list)

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        self.log.info("Configuring kernel NVMeOF Target")

        if self.null_block:
            self.log.info("Configuring with null block device.")
            nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
        else:
            self.log.info("Configuring with NVMe drives.")
            nvme_list = self.get_nvme_devices()

        self.kernel_tgt_gen_subsystem_conf(nvme_list)
        self.subsys_no = len(nvme_list)

        self.nvmet_command(self.nvmet_bin, "clear")
        self.nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log.info("Done configuring kernel NVMeOF Target")


class SPDKTarget(Target):
    def __init__(self, name, general_config, target_config):
        # IRQ affinity on SPDK Target side takes Target's core mask into consideration.
        # Method setting IRQ affinity is run as part of parent classes init,
        # so we need to have self.core_mask set before changing IRQ affinity.
        self.core_mask = target_config["core_mask"]
        self.num_cores = len(self.get_core_list_from_mask(self.core_mask))

        super().__init__(name, general_config, target_config)

        # Format: property, default value
        config_fields = [
            ConfigField(name='dif_insert_strip', default=False),
            ConfigField(name='null_block_dif_type', default=0),
            ConfigField(name='num_shared_buffers', default=4096),
            ConfigField(name='max_queue_depth', default=128),
            ConfigField(name='bpf_scripts', default=[]),
            ConfigField(name='scheduler_core_limit', default=None),
            ConfigField(name='dsa_settings', default=False),
            ConfigField(name='iobuf_small_pool_count', default=32767),
            ConfigField(name='iobuf_large_pool_count', default=16383),
            ConfigField(name='num_cqe', default=4096),
            ConfigField(name='sock_impl', default='posix')
        ]

        self.read_config(config_fields, target_config)

        self.bpf_proc = None
        self.enable_dsa = False

        self.log.info("====DSA settings:====")
        self.log.info("DSA enabled: %s" % (self.enable_dsa))

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
        if mode not in ["default", "bynode", "cpulist",
                        "shared", "split", "split-bynode"]:
            self.log.error("%s irq affinity setting not supported" % mode)
            raise Exception

        # Create core list from SPDK's mask and change it to string.
        # This is the type configure_irq_affinity expects for cpulist parameter.
        spdk_tgt_core_list = self.get_core_list_from_mask(self.core_mask)
        spdk_tgt_core_list = ",".join(map(lambda x: str(x), spdk_tgt_core_list))
        spdk_tgt_core_list = "[" + spdk_tgt_core_list + "]"

        if mode == "shared":
            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list)
        elif mode == "split":
            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
        elif mode == "split-bynode":
            super().configure_irq_affinity(mode="bynode", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
        else:
            super().configure_irq_affinity(mode=mode, cpulist=cpulist, exclude_cpulist=exclude_cpulist)
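
    # Illustrative example: with core_mask=0x0F, "shared" pins NIC IRQs to the
    # same cores the SPDK target runs on (0-3), while "split" pins them to all
    # remaining system cores instead.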
layer:") 1219 rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client)) 1220 1221 if self.null_block: 1222 self.spdk_tgt_add_nullblock(self.null_block) 1223 self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block) 1224 else: 1225 self.spdk_tgt_add_nvme_conf() 1226 self.spdk_tgt_add_subsystem_conf(self.nic_ips) 1227 1228 if self.enable_adq: 1229 self.adq_configure_tc() 1230 1231 self.log.info("Done configuring SPDK NVMeOF Target") 1232 1233 def spdk_tgt_add_nullblock(self, null_block_count): 1234 md_size = 0 1235 block_size = 4096 1236 if self.null_block_dif_type != 0: 1237 md_size = 128 1238 1239 self.log.info("Adding null block bdevices to config via RPC") 1240 for i in range(null_block_count): 1241 self.log.info("Setting bdev protection to :%s" % self.null_block_dif_type) 1242 rpc.bdev.bdev_null_create(self.client, 102400, block_size, "Nvme{}n1".format(i), 1243 dif_type=self.null_block_dif_type, md_size=md_size) 1244 self.log.info("SPDK Bdevs configuration:") 1245 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1246 1247 def spdk_tgt_add_nvme_conf(self, req_num_disks=None): 1248 self.log.info("Adding NVMe bdevs to config via RPC") 1249 1250 bdfs = self.get_nvme_devices() 1251 bdfs = [b.replace(":", ".") for b in bdfs] 1252 1253 if req_num_disks: 1254 if req_num_disks > len(bdfs): 1255 self.log.error("ERROR: Requested number of disks is more than available %s" % len(bdfs)) 1256 sys.exit(1) 1257 else: 1258 bdfs = bdfs[0:req_num_disks] 1259 1260 for i, bdf in enumerate(bdfs): 1261 rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 1262 1263 self.log.info("SPDK Bdevs configuration:") 1264 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1265 1266 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 1267 self.log.info("Adding subsystems to config") 1268 if not req_num_disks: 1269 req_num_disks = self.get_nvme_devices_count() 1270 1271 for ip, bdev_num in self.spread_bdevs(req_num_disks): 1272 port = str(4420 + bdev_num) 1273 nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num 1274 serial = "SPDK00%s" % bdev_num 1275 bdev_name = "Nvme%sn1" % bdev_num 1276 1277 rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial, 1278 allow_any_host=True, max_namespaces=8) 1279 rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name) 1280 for nqn_name in [nqn, "discovery"]: 1281 rpc.nvmf.nvmf_subsystem_add_listener(self.client, 1282 nqn=nqn_name, 1283 trtype=self.transport, 1284 traddr=ip, 1285 trsvcid=port, 1286 adrfam="ipv4") 1287 self.subsystem_info_list.append((port, nqn, ip)) 1288 self.subsys_no = len(self.subsystem_info_list) 1289 1290 self.log.info("SPDK NVMeOF subsystem configuration:") 1291 rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client)) 1292 1293 def bpf_start(self): 1294 self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts) 1295 bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh") 1296 bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts] 1297 results_path = os.path.join(self.results_dir, "bpf_traces.txt") 1298 1299 with open(self.pid, "r") as fh: 1300 nvmf_pid = str(fh.readline()) 1301 1302 cmd = [bpf_script, nvmf_pid, *bpf_traces] 1303 self.log.info(cmd) 1304 self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path}) 1305 1306 def tgt_start(self): 1307 if self.null_block: 1308 self.subsys_no = 1 1309 else: 1310 self.subsys_no = self.get_nvme_devices_count() 1311 self.log.info("Starting SPDK NVMeOF Target 
process") 1312 nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt") 1313 proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask]) 1314 self.pid = os.path.join(self.spdk_dir, "nvmf.pid") 1315 1316 with open(self.pid, "w") as fh: 1317 fh.write(str(proc.pid)) 1318 self.nvmf_proc = proc 1319 self.log.info("SPDK NVMeOF Target PID=%s" % self.pid) 1320 self.log.info("Waiting for spdk to initialize...") 1321 while True: 1322 if os.path.exists("/var/tmp/spdk.sock"): 1323 break 1324 time.sleep(1) 1325 self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock") 1326 1327 rpc.sock.sock_set_default_impl(self.client, impl_name=self.sock_impl) 1328 rpc.iobuf.iobuf_set_options(self.client, 1329 small_pool_count=self.iobuf_small_pool_count, 1330 large_pool_count=self.iobuf_large_pool_count, 1331 small_bufsize=None, 1332 large_bufsize=None) 1333 1334 if self.enable_zcopy: 1335 rpc.sock.sock_impl_set_options(self.client, impl_name=self.sock_impl, 1336 enable_zerocopy_send_server=True) 1337 self.log.info("Target socket options:") 1338 rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name=self.sock_impl)) 1339 1340 if self.enable_adq: 1341 rpc.sock.sock_impl_set_options(self.client, impl_name=self.sock_impl, enable_placement_id=1) 1342 rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None, 1343 nvme_adminq_poll_period_us=100000, retry_count=4) 1344 1345 if self.enable_dsa: 1346 rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None) 1347 self.log.info("Target DSA accel module enabled") 1348 1349 rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit) 1350 rpc.framework_start_init(self.client) 1351 1352 if self.bpf_scripts: 1353 self.bpf_start() 1354 1355 self.spdk_tgt_configure() 1356 1357 def stop(self): 1358 if self.bpf_proc: 1359 self.log.info("Stopping BPF Trace script") 1360 self.bpf_proc.terminate() 1361 self.bpf_proc.wait() 1362 1363 if hasattr(self, "nvmf_proc"): 1364 try: 1365 self.nvmf_proc.terminate() 1366 self.nvmf_proc.wait(timeout=30) 1367 except Exception as e: 1368 self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.") 1369 self.log.info(e) 1370 self.nvmf_proc.kill() 1371 self.nvmf_proc.communicate() 1372 # Try to clean up RPC socket files if they were not removed 1373 # because of using 'kill' 1374 try: 1375 os.remove("/var/tmp/spdk.sock") 1376 os.remove("/var/tmp/spdk.sock.lock") 1377 except FileNotFoundError: 1378 pass 1379 self.restore_settings() 1380 1381 1382class KernelInitiator(Initiator): 1383 def __init__(self, name, general_config, initiator_config): 1384 super().__init__(name, general_config, initiator_config) 1385 1386 # Defaults 1387 self.extra_params = initiator_config.get('extra_params', '') 1388 1389 self.ioengine = "libaio" 1390 self.spdk_conf = "" 1391 1392 if "num_cores" in initiator_config: 1393 self.num_cores = initiator_config["num_cores"] 1394 1395 if "kernel_engine" in initiator_config: 1396 self.ioengine = initiator_config["kernel_engine"] 1397 if "io_uring" in self.ioengine: 1398 self.extra_params += ' --nr-poll-queues=8' 1399 1400 def configure_adq(self): 1401 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1402 1403 def adq_configure_tc(self): 1404 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. 
Skipping configuration.") 1405 1406 def adq_set_busy_read(self, busy_read_val): 1407 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0") 1408 return {"net.core.busy_read": 0} 1409 1410 def get_connected_nvme_list(self): 1411 json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"])) 1412 nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"] 1413 if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]] 1414 return nvme_list 1415 1416 def init_connect(self): 1417 self.log.info("Below connection attempts may result in error messages, this is expected!") 1418 for subsystem in self.subsystem_info_list: 1419 self.log.info("Trying to connect %s %s %s" % subsystem) 1420 self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport, 1421 "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params]) 1422 time.sleep(2) 1423 1424 if "io_uring" in self.ioengine: 1425 self.log.info("Setting block layer settings for io_uring.") 1426 1427 # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because 1428 # apparently it's not possible for connected subsystems. 1429 # Results in "error: Invalid argument" 1430 block_sysfs_settings = { 1431 "iostats": "0", 1432 "rq_affinity": "0", 1433 "nomerges": "2" 1434 } 1435 1436 for disk in self.get_connected_nvme_list(): 1437 sysfs = os.path.join("/sys/block", disk, "queue") 1438 for k, v in block_sysfs_settings.items(): 1439 sysfs_opt_path = os.path.join(sysfs, k) 1440 try: 1441 self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True) 1442 except CalledProcessError as e: 1443 self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v)) 1444 finally: 1445 _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)]) 1446 self.log.info("%s=%s" % (sysfs_opt_path, _)) 1447 1448 def init_disconnect(self): 1449 for subsystem in self.subsystem_info_list: 1450 self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]]) 1451 time.sleep(1) 1452 1453 def get_nvme_subsystem_numa(self, dev_name): 1454 # Remove two last characters to get controller name instead of subsystem name 1455 nvme_ctrl = os.path.basename(dev_name)[:-2] 1456 remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', 1457 self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl])) 1458 return self.get_route_nic_numa(remote_nvme_ip.group(0)) 1459 1460 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0): 1461 self.available_cpus = self.get_numa_cpu_map() 1462 if len(threads) >= len(subsystems): 1463 threads = range(0, len(subsystems)) 1464 1465 # Generate connected nvme devices names and sort them by used NIC numa node 1466 # to allow better grouping when splitting into fio sections. 

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
        self.available_cpus = self.get_numa_cpu_map()
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        # Generate connected nvme devices names and sort them by used NIC numa node
        # to allow better grouping when splitting into fio sections.
        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
        nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list]
        nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))]

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = self.gen_fio_numa_section(r, num_jobs)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section


class SPDKInitiator(Initiator):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        if self.skip_spdk_install is False:
            self.install_spdk()

        # Optional fields
        self.enable_data_digest = initiator_config.get('enable_data_digest', False)
        self.small_pool_count = initiator_config.get('small_pool_count', 32768)
        self.large_pool_count = initiator_config.get('large_pool_count', 16384)
        self.sock_impl = initiator_config.get('sock_impl', 'posix')

        if "num_cores" in initiator_config:
            self.num_cores = initiator_config["num_cores"]

        self.ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
        self.spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir

    def adq_set_busy_read(self, busy_read_val):
        return {"net.core.busy_read": busy_read_val}

    def install_spdk(self):
        self.log.info("Using fio binary %s" % self.fio_bin)
        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma",
                       "--with-fio=%s" % os.path.dirname(self.fio_bin),
                       "--enable-lto", "--disable-unit-tests"])
        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])

        self.log.info("SPDK built")
        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])

    def init_connect(self):
        # Not a real "connect" like when doing "nvme connect" because SPDK's fio
        # bdev plugin initiates connection just before starting IO traffic.
        # This is just to have an "init_connect" equivalent of the same function
        # from KernelInitiator class.
        # Just prepare bdev.conf JSON file for later use and consider it
        # "making a connection".
        bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
        self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])

    def init_disconnect(self):
        # SPDK initiator does not need to explicitly disconnect, as this is
        # handled after the fio bdev plugin finishes IO.
        return
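
    # Illustrative example: each remote subsystem becomes one
    # bdev_nvme_attach_controller entry in bdev.conf, e.g.:
    #   {"method": "bdev_nvme_attach_controller",
    #    "params": {"name": "Nvme0", "trtype": "tcp", "traddr": "10.0.0.1",
    #               "trsvcid": "4420", "subnqn": "nqn.2018-09.io.spdk:cnode0",
    #               "adrfam": "IPv4"}}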
    def init_disconnect(self):
        # The SPDK initiator does not need to explicitly disconnect, as this is
        # done after the fio bdev plugin finishes IO.
        return

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        spdk_cfg_section = {
            "subsystems": [
                {
                    "subsystem": "bdev",
                    "config": []
                },
                {
                    "subsystem": "iobuf",
                    "config": [
                        {
                            "method": "iobuf_set_options",
                            "params": {
                                "small_pool_count": self.small_pool_count,
                                "large_pool_count": self.large_pool_count
                            }
                        }
                    ]
                },
                {
                    "subsystem": "sock",
                    "config": [
                        {
                            "method": "sock_set_default_impl",
                            "params": {
                                "impl_name": self.sock_impl
                            }
                        }
                    ]
                }
            ]
        }

        for i, subsys in enumerate(remote_subsystem_list):
            sub_port, sub_nqn, sub_addr = map(str, subsys)
            nvme_ctrl = {
                "method": "bdev_nvme_attach_controller",
                "params": {
                    "name": "Nvme{}".format(i),
                    "trtype": self.transport,
                    "traddr": sub_addr,
                    "trsvcid": sub_port,
                    "subnqn": sub_nqn,
                    "adrfam": "IPv4"
                }
            }

            if self.enable_adq:
                nvme_ctrl["params"].update({"priority": "1"})

            if self.enable_data_digest:
                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})

            spdk_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)

        return json.dumps(spdk_cfg_section, indent=2)

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
        self.available_cpus = self.get_numa_cpu_map()
        filename_section = ""
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        # Generate expected NVMe bdev names and sort them by used NIC numa node
        # to allow better grouping when splitting into fio sections.
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames]
        filenames = [x for _, x in sorted(zip(filename_numas, filenames))]

        nvme_per_split = len(subsystems) // len(threads)
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = self.gen_fio_numa_section(r, num_jobs)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section

    def get_nvme_subsystem_numa(self, bdev_name):
        bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir]))
        bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"]

        # Remove the last two characters to get the controller name instead of
        # the namespace name, e.g. "Nvme0n1" -> "Nvme0".
        nvme_ctrl = bdev_name[:-2]
        for bdev in bdev_conf_json_obj:
            if bdev["method"] == "bdev_nvme_attach_controller" and bdev["params"]["name"] == nvme_ctrl:
                return self.get_route_nic_numa(bdev["params"]["traddr"])
        return None
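# Illustrative bdev.conf fragment produced by SPDKInitiator.gen_spdk_bdev_conf()
# for a single remote subsystem (address, port and NQN are hypothetical; the
# "iobuf" and "sock" subsystem sections are elided here):
#
#   {
#     "subsystems": [
#       {
#         "subsystem": "bdev",
#         "config": [
#           {
#             "method": "bdev_nvme_attach_controller",
#             "params": {
#               "name": "Nvme0",
#               "trtype": "rdma",
#               "traddr": "10.0.0.1",
#               "trsvcid": "4420",
#               "subnqn": "nqn.2018-09.io.spdk:cnode1",
#               "adrfam": "IPv4"
#             }
#           }
#         ]
#       }
#     ]
#   }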
def initiators_match_subsystems(initiators, target_obj):
    for i in initiators:
        i.match_subsystems(target_obj.subsystem_info_list)
        if i.enable_adq:
            i.adq_configure_tc()


def run_single_fio_test(args,
                        initiators,
                        configs,
                        target_obj,
                        block_size,
                        io_depth,
                        rw,
                        fio_rw_mix_read,
                        fio_run_num,
                        fio_ramp_time,
                        fio_run_time):

    for run_no in range(1, fio_run_num + 1):
        threads = []
        power_daemon = None
        measurements_prefix = "%s_%s_%s_m_%s_run_%s" % (block_size, io_depth, rw, fio_rw_mix_read, run_no)

        for i, cfg in zip(initiators, configs):
            t = threading.Thread(target=i.run_fio, args=(cfg, run_no))
            threads.append(t)
        if target_obj.enable_sar:
            sar_file_prefix = measurements_prefix + "_sar"
            t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix, fio_ramp_time, fio_run_time))
            threads.append(t)

        if target_obj.enable_pcm:
            pcm_fnames = ["%s_%s.csv" % (measurements_prefix, x) for x in ["pcm_cpu"]]
            pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm,
                                         args=(args.results, pcm_fnames[0], fio_ramp_time, fio_run_time))
            threads.append(pcm_cpu_t)

        if target_obj.enable_bw:
            bandwidth_file_name = measurements_prefix + "_bandwidth.csv"
            t = threading.Thread(target=target_obj.measure_network_bandwidth,
                                 args=(args.results, bandwidth_file_name, fio_ramp_time, fio_run_time))
            threads.append(t)

        if target_obj.enable_dpdk_memory:
            dpdk_mem_file_name = measurements_prefix + "_dpdk_mem.txt"
            t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results, dpdk_mem_file_name, fio_ramp_time))
            threads.append(t)

        if target_obj.enable_pm:
            # NOTE: script_full_dir is defined at module scope in the __main__ block below.
            power_daemon = threading.Thread(target=target_obj.measure_power,
                                            args=(args.results, measurements_prefix, script_full_dir,
                                                  fio_ramp_time, fio_run_time))
            threads.append(power_daemon)

        for t in threads:
            t.start()
        for t in threads:
            t.join()


def run_fio_tests(args, initiators, target_obj,
                  fio_workloads, fio_rw_mix_read,
                  fio_run_num, fio_ramp_time, fio_run_time,
                  fio_rate_iops, fio_offset, fio_offset_inc, fio_num_jobs):

    for block_size, io_depth, rw in fio_workloads:
        configs = []
        for i in initiators:
            i.init_connect()
            cfg = i.gen_fio_config(rw,
                                   fio_rw_mix_read,
                                   block_size,
                                   io_depth,
                                   target_obj.subsys_no,
                                   fio_num_jobs,
                                   fio_ramp_time,
                                   fio_run_time,
                                   fio_rate_iops,
                                   fio_offset,
                                   fio_offset_inc)
            configs.append(cfg)

        run_single_fio_test(args,
                            initiators,
                            configs,
                            target_obj,
                            block_size,
                            io_depth,
                            rw,
                            fio_rw_mix_read,
                            fio_run_num,
                            fio_ramp_time,
                            fio_run_time)

        for i in initiators:
            i.init_disconnect()
            i.copy_result_files(args.results)

    try:
        parse_results(args.results, args.csv_filename)
    except Exception:
        logging.error("There was an error with parsing the results")
        raise
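# Illustrative "fio" stanza from config.json and the workload matrix it expands
# to via itertools.product in the __main__ block below (values are hypothetical):
#
#   "fio": {
#       "bs": ["4k"],
#       "qd": [32, 128],
#       "rw": ["randread", "randwrite"],
#       "ramp_time": 30,
#       "run_time": 300,
#       "rwmixread": 70,
#       "run_num": 3,
#       "num_jobs": 2
#   }
#
# expands to the (block_size, io_depth, rw) workloads:
#   ("4k", 32, "randread"), ("4k", 32, "randwrite"),
#   ("4k", 128, "randread"), ("4k", 128, "randwrite")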
if __name__ == "__main__":
    exit_code = 0

    script_full_dir = os.path.dirname(os.path.realpath(__file__))
    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
                        help='Configuration file.')
    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
                        help='Results directory.')
    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
                        help='CSV results filename.')
    parser.add_argument('-f', '--force', default=False, action='store_true',
                        dest='force', help="""Force script to continue and try to use all
                        available NVMe devices during test.
                        WARNING: Might result in data loss on used NVMe drives""")

    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO,
                        format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s')

    logging.info("Using config file: %s" % args.config)
    with open(args.config, "r") as config:
        data = json.load(config)

    initiators = []
    general_config = data["general"]
    target_config = data["target"]
    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]

    if "null_block_devices" not in data["target"] and \
            (args.force is False and
             "allowlist" not in data["target"] and
             "blocklist" not in data["target"]):
        # TODO: Also check if allowlist or blocklist are not empty.
        logging.warning("""WARNING: This script requires allowlist and blocklist to be defined.
        You can choose to use all available NVMe drives on your system, which may potentially
        lead to data loss. If you wish to proceed with all attached NVMes, use "-f" option.""")
        sys.exit(1)

    for k, v in data.items():
        if "target" in k:
            v.update({"results_dir": args.results})
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(k, data["general"], v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(k, data["general"], v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k].get("run_num")
            fio_num_jobs = data[k].get("num_jobs")

            fio_rate_iops = data[k].get("rate_iops", 0)
            fio_offset = data[k].get("offset", False)
            fio_offset_inc = data[k].get("offset_inc", 0)
        else:
            continue

    try:
        os.mkdir(args.results)
    except FileExistsError:
        pass

    for i in initiators:
        target_obj.initiator_info.append(
            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
        )

    try:
        target_obj.tgt_start()
        initiators_match_subsystems(initiators, target_obj)

        # Poor man's threading
        # Run FIO tests
        run_fio_tests(args, initiators, target_obj, fio_workloads, fio_rw_mix_read,
                      fio_run_num, fio_ramp_time, fio_run_time, fio_rate_iops,
                      fio_offset, fio_offset_inc, fio_num_jobs)

    except Exception as e:
        logging.error("Exception occurred while running FIO tests")
        logging.error(e)
        exit_code = 1

    finally:
        for i in initiators:
            try:
                i.stop()
            except Exception as err:
                logging.warning("Failed to stop initiator %s: %s" % (i.name, err))
        target_obj.stop()
        sys.exit(exit_code)
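# Example invocation (the script filename is illustrative; use whatever name
# this file is saved under):
#
#   ./run_nvmf.py -c ./config.json -r /tmp/results -s nvmf_results.csv
#
# Passing -f forces the run to use all attached NVMe drives when no allowlist
# or blocklist is defined; as warned above, this may destroy data on them.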