1#!/usr/bin/env python3 2# SPDX-License-Identifier: BSD-3-Clause 3# Copyright (C) 2018 Intel Corporation 4# All rights reserved. 5# 6 7# Note: we use setattr 8# pylint: disable=no-member 9 10import os 11import re 12import sys 13import argparse 14import json 15import logging 16import zipfile 17import threading 18import subprocess 19import itertools 20import configparser 21import time 22import uuid 23 24import paramiko 25import pandas as pd 26from common import * 27from subprocess import CalledProcessError 28from abc import abstractmethod, ABC 29from dataclasses import dataclass 30from typing import Any 31 32sys.path.append(os.path.dirname(__file__) + '/../../../python') 33 34import spdk.rpc as rpc # noqa 35import spdk.rpc.client as rpc_client # noqa 36 37 38@dataclass 39class ConfigField: 40 name: str = None 41 default: Any = None 42 required: bool = False 43 44 45class Server(ABC): 46 def __init__(self, name, general_config, server_config): 47 self.name = name 48 49 self.log = logging.getLogger(self.name) 50 51 config_fields = [ 52 ConfigField(name='username', required=True), 53 ConfigField(name='password', required=True), 54 ConfigField(name='transport', required=True), 55 ConfigField(name='skip_spdk_install', default=False), 56 ConfigField(name='irdma_roce_enable', default=False), 57 ConfigField(name='pause_frames', default=False) 58 ] 59 self.read_config(config_fields, general_config) 60 self.transport = self.transport.lower() 61 62 config_fields = [ 63 ConfigField(name='nic_ips', required=True), 64 ConfigField(name='mode', required=True), 65 ConfigField(name='irq_scripts_dir', default='/usr/src/local/mlnx-tools/ofed_scripts'), 66 ConfigField(name='enable_arfs', default=False), 67 ConfigField(name='tuned_profile', default='') 68 ] 69 self.read_config(config_fields, server_config) 70 71 self.local_nic_info = [] 72 self._nics_json_obj = {} 73 self.svc_restore_dict = {} 74 self.sysctl_restore_dict = {} 75 self.tuned_restore_dict = {} 76 self.governor_restore = "" 77 78 self.enable_adq = server_config.get('adq_enable', False) 79 self.adq_priority = 1 if self.enable_adq else None 80 self.irq_settings = {'mode': 'default', **server_config.get('irq_settings', {})} 81 82 if not re.match(r'^[A-Za-z0-9\-]+$', name): 83 self.log.info("Please use a name which contains only letters, numbers or dashes") 84 sys.exit(1) 85 86 def read_config(self, config_fields, config): 87 for config_field in config_fields: 88 value = config.get(config_field.name, config_field.default) 89 if config_field.required is True and value is None: 90 raise ValueError('%s config key is required' % config_field.name) 91 92 setattr(self, config_field.name, value) 93 94 @staticmethod 95 def get_uncommented_lines(lines): 96 return [line for line in lines if line and not line.startswith('#')] 97 98 def get_nic_name_by_ip(self, ip): 99 if not self._nics_json_obj: 100 nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"]) 101 self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj))) 102 for nic in self._nics_json_obj: 103 for addr in nic["addr_info"]: 104 if ip in addr["local"]: 105 return nic["ifname"] 106 107 @abstractmethod 108 def set_local_nic_info_helper(self): 109 pass 110 111 def set_local_nic_info(self, pci_info): 112 def extract_network_elements(json_obj): 113 nic_list = [] 114 if isinstance(json_obj, list): 115 for x in json_obj: 116 nic_list.extend(extract_network_elements(x)) 117 elif isinstance(json_obj, dict): 118 if "children" in json_obj: 119 nic_list.extend(extract_network_elements(json_obj["children"])) 120 if "class" in json_obj.keys() and "network" in json_obj["class"]: 121 nic_list.append(json_obj) 122 return nic_list 123 124 self.local_nic_info = extract_network_elements(pci_info) 125 126 def get_nic_numa_node(self, nic_name): 127 return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name])) 128 129 def get_numa_cpu_map(self): 130 numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"])) 131 numa_cpu_json_map = {} 132 133 for cpu in numa_cpu_json_obj["cpus"]: 134 cpu_num = int(cpu["cpu"]) 135 numa_node = int(cpu["node"]) 136 numa_cpu_json_map.setdefault(numa_node, []) 137 numa_cpu_json_map[numa_node].append(cpu_num) 138 139 return numa_cpu_json_map 140 141 # pylint: disable=R0201 142 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 143 return "" 144 145 def configure_system(self): 146 self.load_drivers() 147 self.configure_services() 148 self.configure_sysctl() 149 self.configure_arfs() 150 self.configure_tuned() 151 self.configure_cpu_governor() 152 self.configure_irq_affinity(**self.irq_settings) 153 self.configure_pause_frames() 154 155 RDMA_PROTOCOL_IWARP = 0 156 RDMA_PROTOCOL_ROCE = 1 157 RDMA_PROTOCOL_UNKNOWN = -1 158 159 def check_rdma_protocol(self): 160 try: 161 roce_ena = self.exec_cmd(["cat", "/sys/module/irdma/parameters/roce_ena"]) 162 roce_ena = roce_ena.strip() 163 if roce_ena == "0": 164 return self.RDMA_PROTOCOL_IWARP 165 else: 166 return self.RDMA_PROTOCOL_ROCE 167 except CalledProcessError as e: 168 self.log.error("ERROR: failed to check RDMA protocol!") 169 self.log.error("%s resulted in error: %s" % (e.cmd, e.output)) 170 return self.RDMA_PROTOCOL_UNKNOWN 171 172 def load_drivers(self): 173 self.log.info("Loading drivers") 174 self.exec_cmd(["sudo", "modprobe", "-a", 175 "nvme-%s" % self.transport, 176 "nvmet-%s" % self.transport]) 177 current_mode = self.check_rdma_protocol() 178 if current_mode == self.RDMA_PROTOCOL_UNKNOWN: 179 self.log.error("ERROR: failed to check RDMA protocol mode") 180 return 181 if self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_IWARP: 182 self.reload_driver("irdma", "roce_ena=1") 183 self.log.info("Loaded irdma driver with RoCE enabled") 184 elif self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_ROCE: 185 self.log.info("Leaving irdma driver with RoCE enabled") 186 else: 187 self.reload_driver("irdma", "roce_ena=0") 188 self.log.info("Loaded irdma driver with iWARP enabled") 189 190 def set_pause_frames(self, rx_state, tx_state): 191 for nic_ip in self.nic_ips: 192 nic_name = self.get_nic_name_by_ip(nic_ip) 193 self.exec_cmd(["sudo", "ethtool", "-A", nic_name, "rx", rx_state, "tx", tx_state]) 194 195 def configure_pause_frames(self): 196 if not self.pause_frames: 197 self.log.info("Turning off pause frames") 198 self.set_pause_frames("off", "off") 199 return 200 self.log.info("Configuring pause frames") 201 self.set_pause_frames("on", "on") 202 203 def configure_arfs(self): 204 rps_flow_cnt = 512 205 if not self.enable_arfs: 206 rps_flow_cnt = 0 207 208 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 209 for nic_name in nic_names: 210 self.exec_cmd(["sudo", "ethtool", "-K", nic_name, "ntuple", "on"]) 211 self.log.info(f"Setting rps_flow_cnt={rps_flow_cnt} for {nic_name}") 212 queue_files = self.exec_cmd(["ls", f"/sys/class/net/{nic_name}/queues/"]).strip().split("\n") 213 queue_files = filter(lambda x: x.startswith("rx-"), queue_files) 214 215 for qf in queue_files: 216 self.exec_cmd(["sudo", "bash", "-c", f"echo {rps_flow_cnt} > /sys/class/net/{nic_name}/queues/{qf}/rps_flow_cnt"]) 217 218 def configure_adq(self): 219 self.adq_load_modules() 220 self.adq_configure_nic() 221 222 def adq_load_modules(self): 223 self.log.info("Modprobing ADQ-related Linux modules...") 224 adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"] 225 for module in adq_module_deps: 226 try: 227 self.exec_cmd(["sudo", "modprobe", module]) 228 self.log.info("%s loaded!" % module) 229 except CalledProcessError as e: 230 self.log.error("ERROR: failed to load module %s" % module) 231 self.log.error("%s resulted in error: %s" % (e.cmd, e.output)) 232 233 def adq_configure_tc(self): 234 self.log.info("Configuring ADQ Traffic classes and filters...") 235 236 num_queues_tc0 = 2 # 2 is minimum number of queues for TC0 237 num_queues_tc1 = self.num_cores 238 port_param = "dst_port" if isinstance(self, Target) else "src_port" 239 xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs") 240 241 for nic_ip in self.nic_ips: 242 nic_name = self.get_nic_name_by_ip(nic_ip) 243 nic_ports = [x[0] for x in self.subsystem_info_list if nic_ip in x[2]] 244 245 tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, 246 "root", "mqprio", "num_tc", "2", "map", "0", "1", 247 "queues", "%s@0" % num_queues_tc0, 248 "%s@%s" % (num_queues_tc1, num_queues_tc0), 249 "hw", "1", "mode", "channel"] 250 self.log.info(" ".join(tc_qdisc_map_cmd)) 251 self.exec_cmd(tc_qdisc_map_cmd) 252 253 time.sleep(5) 254 tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"] 255 self.log.info(" ".join(tc_qdisc_ingress_cmd)) 256 self.exec_cmd(tc_qdisc_ingress_cmd) 257 258 nic_bdf = os.path.basename(self.exec_cmd(["readlink", "-f", "/sys/class/net/%s/device" % nic_name]).strip()) 259 self.exec_cmd(["sudo", "devlink", "dev", "param", "set", "pci/%s" % nic_bdf, 260 "name", "tc1_inline_fd", "value", "true", "cmode", "runtime"]) 261 262 for port in nic_ports: 263 tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name, 264 "protocol", "ip", "ingress", "prio", "1", "flower", 265 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port, 266 "skip_sw", "hw_tc", "1"] 267 self.log.info(" ".join(tc_filter_cmd)) 268 self.exec_cmd(tc_filter_cmd) 269 270 # show tc configuration 271 self.log.info("Show tc configuration for %s NIC..." % nic_name) 272 tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name]) 273 tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"]) 274 self.log.info("%s" % tc_disk_out) 275 self.log.info("%s" % tc_filter_out) 276 277 # Ethtool coalesce settings must be applied after configuring traffic classes 278 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"]) 279 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"]) 280 281 self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name) 282 xps_cmd = ["sudo", xps_script_path, nic_name] 283 self.log.info(xps_cmd) 284 self.exec_cmd(xps_cmd) 285 286 def reload_driver(self, driver, *modprobe_args): 287 288 try: 289 self.exec_cmd(["sudo", "rmmod", driver]) 290 self.exec_cmd(["sudo", "modprobe", driver, *modprobe_args]) 291 except CalledProcessError as e: 292 self.log.error("ERROR: failed to reload %s module!" % driver) 293 self.log.error("%s resulted in error: %s" % (e.cmd, e.output)) 294 295 def adq_configure_nic(self): 296 self.log.info("Configuring NIC port settings for ADQ testing...") 297 298 # Reload the driver first, to make sure any previous settings are re-set. 299 self.reload_driver("ice") 300 301 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 302 for nic in nic_names: 303 self.log.info(nic) 304 try: 305 self.exec_cmd(["sudo", "ethtool", "-K", nic, 306 "hw-tc-offload", "on"]) # Enable hardware TC offload 307 nic_bdf = self.exec_cmd(["bash", "-c", "source /sys/class/net/%s/device/uevent; echo $PCI_SLOT_NAME" % nic]) 308 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"]) # Disable LLDP 309 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-inspect-optimize", "off"]) 310 # Following are suggested in ADQ Configuration Guide to be enabled for large block sizes, 311 # but can be used with 4k block sizes as well 312 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop", "on"]) 313 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop-cfg", "on"]) 314 except subprocess.CalledProcessError as e: 315 self.log.error("ERROR: failed to configure NIC port using ethtool!") 316 self.log.error("%s resulted in error: %s" % (e.cmd, e.output)) 317 self.log.info("Please update your NIC driver and firmware versions and try again.") 318 self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic])) 319 self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic])) 320 321 def configure_services(self): 322 self.log.info("Configuring active services...") 323 svc_config = configparser.ConfigParser(strict=False) 324 325 # Below list is valid only for RHEL / Fedora systems and might not 326 # contain valid names for other distributions. 327 svc_target_state = { 328 "firewalld": "inactive", 329 "irqbalance": "inactive", 330 "lldpad.service": "inactive", 331 "lldpad.socket": "inactive" 332 } 333 334 for service in svc_target_state: 335 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 336 out = "\n".join(["[%s]" % service, out]) 337 svc_config.read_string(out) 338 339 if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]: 340 continue 341 342 service_state = svc_config[service]["ActiveState"] 343 self.log.info("Current state of %s service is %s" % (service, service_state)) 344 self.svc_restore_dict.update({service: service_state}) 345 if service_state != "inactive": 346 self.log.info("Disabling %s. It will be restored after the test has finished." % service) 347 self.exec_cmd(["sudo", "systemctl", "stop", service]) 348 349 def configure_sysctl(self): 350 self.log.info("Tuning sysctl settings...") 351 352 busy_read = 0 353 busy_poll = 0 354 if self.enable_adq and self.mode == "spdk": 355 busy_read = 1 356 busy_poll = 1 357 358 sysctl_opts = { 359 "net.core.busy_poll": busy_poll, 360 "net.core.busy_read": busy_read, 361 "net.core.somaxconn": 4096, 362 "net.core.netdev_max_backlog": 8192, 363 "net.ipv4.tcp_max_syn_backlog": 16384, 364 "net.core.rmem_max": 268435456, 365 "net.core.wmem_max": 268435456, 366 "net.ipv4.tcp_mem": "268435456 268435456 268435456", 367 "net.ipv4.tcp_rmem": "8192 1048576 33554432", 368 "net.ipv4.tcp_wmem": "8192 1048576 33554432", 369 "net.ipv4.route.flush": 1, 370 "vm.overcommit_memory": 1, 371 "net.core.rps_sock_flow_entries": 0 372 } 373 374 if self.enable_adq: 375 sysctl_opts.update(self.adq_set_busy_read(1)) 376 377 if self.enable_arfs: 378 sysctl_opts.update({"net.core.rps_sock_flow_entries": 32768}) 379 380 for opt, value in sysctl_opts.items(): 381 self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()}) 382 self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 383 384 def configure_tuned(self): 385 if not self.tuned_profile: 386 self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.") 387 return 388 389 self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile) 390 service = "tuned" 391 tuned_config = configparser.ConfigParser(strict=False) 392 393 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 394 out = "\n".join(["[%s]" % service, out]) 395 tuned_config.read_string(out) 396 tuned_state = tuned_config[service]["ActiveState"] 397 self.svc_restore_dict.update({service: tuned_state}) 398 399 if tuned_state != "inactive": 400 profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip() 401 profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip() 402 403 self.tuned_restore_dict = { 404 "profile": profile, 405 "mode": profile_mode 406 } 407 408 self.exec_cmd(["sudo", "systemctl", "start", service]) 409 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile]) 410 self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"])) 411 412 def configure_cpu_governor(self): 413 self.log.info("Setting CPU governor to performance...") 414 415 # This assumes that there is the same CPU scaling governor on each CPU 416 self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip() 417 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"]) 418 419 def get_core_list_from_mask(self, core_mask): 420 # Generate list of individual cores from hex core mask 421 # (e.g. '0xffff') or list containing: 422 # - individual cores (e.g. '1, 2, 3') 423 # - core ranges (e.g. '0-3') 424 # - mix of both (e.g. '0, 1-4, 9, 11-13') 425 426 core_list = [] 427 if "0x" in core_mask: 428 core_mask_int = int(core_mask, 16) 429 for i in range(core_mask_int.bit_length()): 430 if (1 << i) & core_mask_int: 431 core_list.append(i) 432 return core_list 433 else: 434 # Core list can be provided in .json config with square brackets 435 # remove them first 436 core_mask = core_mask.replace("[", "") 437 core_mask = core_mask.replace("]", "") 438 439 for i in core_mask.split(","): 440 if "-" in i: 441 start, end = i.split("-") 442 core_range = range(int(start), int(end) + 1) 443 core_list.extend(core_range) 444 else: 445 core_list.append(int(i)) 446 return core_list 447 448 def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False): 449 self.log.info("Setting NIC irq affinity for NICs. Using %s mode" % mode) 450 451 if mode not in ["default", "bynode", "cpulist"]: 452 raise ValueError("%s irq affinity setting not supported" % mode) 453 454 if mode == "cpulist" and not cpulist: 455 raise ValueError("%s irq affinity setting set, but no cpulist provided" % mode) 456 457 affinity_script = "set_irq_affinity.sh" 458 if "default" not in mode: 459 affinity_script = "set_irq_affinity_cpulist.sh" 460 system_cpu_map = self.get_numa_cpu_map() 461 irq_script_path = os.path.join(self.irq_scripts_dir, affinity_script) 462 463 def cpu_list_to_string(cpulist): 464 return ",".join(map(lambda x: str(x), cpulist)) 465 466 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 467 for nic_name in nic_names: 468 irq_cmd = ["sudo", irq_script_path] 469 470 # Use only CPU cores matching NIC NUMA node. 471 # Remove any CPU cores if they're on exclusion list. 472 if mode == "bynode": 473 irq_cpus = system_cpu_map[self.get_nic_numa_node(nic_name)] 474 if cpulist and exclude_cpulist: 475 disallowed_cpus = self.get_core_list_from_mask(cpulist) 476 irq_cpus = list(set(irq_cpus) - set(disallowed_cpus)) 477 if not irq_cpus: 478 raise Exception("No CPUs left to process IRQs after excluding CPUs!") 479 irq_cmd.append(cpu_list_to_string(irq_cpus)) 480 481 if mode == "cpulist": 482 irq_cpus = self.get_core_list_from_mask(cpulist) 483 if exclude_cpulist: 484 # Flatten system CPU list, we don't need NUMA awareness here 485 system_cpu_list = sorted({x for v in system_cpu_map.values() for x in v}) 486 irq_cpus = list(set(system_cpu_list) - set(irq_cpus)) 487 if not irq_cpus: 488 raise Exception("No CPUs left to process IRQs after excluding CPUs!") 489 irq_cmd.append(cpu_list_to_string(irq_cpus)) 490 491 irq_cmd.append(nic_name) 492 self.log.info(irq_cmd) 493 self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir) 494 495 def restore_services(self): 496 self.log.info("Restoring services...") 497 for service, state in self.svc_restore_dict.items(): 498 cmd = "stop" if state == "inactive" else "start" 499 self.exec_cmd(["sudo", "systemctl", cmd, service]) 500 501 def restore_sysctl(self): 502 self.log.info("Restoring sysctl settings...") 503 for opt, value in self.sysctl_restore_dict.items(): 504 self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 505 506 def restore_tuned(self): 507 self.log.info("Restoring tuned-adm settings...") 508 509 if not self.tuned_restore_dict: 510 return 511 512 if self.tuned_restore_dict["mode"] == "auto": 513 self.exec_cmd(["sudo", "tuned-adm", "auto_profile"]) 514 self.log.info("Reverted tuned-adm to auto_profile.") 515 else: 516 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]]) 517 self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"]) 518 519 def restore_governor(self): 520 self.log.info("Restoring CPU governor setting...") 521 if self.governor_restore: 522 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore]) 523 self.log.info("Reverted CPU governor to %s." % self.governor_restore) 524 525 def restore_settings(self): 526 self.restore_governor() 527 self.restore_tuned() 528 self.restore_services() 529 self.restore_sysctl() 530 if self.enable_adq: 531 self.reload_driver("ice") 532 533 534class Target(Server): 535 def __init__(self, name, general_config, target_config): 536 super().__init__(name, general_config, target_config) 537 538 # Defaults 539 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 540 self.subsystem_info_list = [] 541 self.initiator_info = [] 542 543 config_fields = [ 544 ConfigField(name='mode', required=True), 545 ConfigField(name='results_dir', required=True), 546 ConfigField(name='enable_pm', default=True), 547 ConfigField(name='enable_sar', default=True), 548 ConfigField(name='enable_pcm', default=True), 549 ConfigField(name='enable_dpdk_memory', default=True) 550 ] 551 self.read_config(config_fields, target_config) 552 553 self.null_block = target_config.get('null_block_devices', 0) 554 self.scheduler_name = target_config.get('scheduler_settings', 'static') 555 self.enable_zcopy = target_config.get('zcopy_settings', False) 556 self.enable_bw = target_config.get('enable_bandwidth', True) 557 self.nvme_blocklist = target_config.get('blocklist', []) 558 self.nvme_allowlist = target_config.get('allowlist', []) 559 560 # Blocklist takes precedence, remove common elements from allowlist 561 self.nvme_allowlist = list(set(self.nvme_allowlist) - set(self.nvme_blocklist)) 562 563 self.log.info("Items now on allowlist: %s" % self.nvme_allowlist) 564 self.log.info("Items now on blocklist: %s" % self.nvme_blocklist) 565 566 self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) 567 self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../")) 568 self.set_local_nic_info(self.set_local_nic_info_helper()) 569 570 if self.skip_spdk_install is False: 571 self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip") 572 573 self.configure_system() 574 if self.enable_adq: 575 self.configure_adq() 576 self.configure_irq_affinity() 577 self.sys_config() 578 579 def set_local_nic_info_helper(self): 580 return json.loads(self.exec_cmd(["lshw", "-json"])) 581 582 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 583 stderr_opt = None 584 if stderr_redirect: 585 stderr_opt = subprocess.STDOUT 586 if change_dir: 587 old_cwd = os.getcwd() 588 os.chdir(change_dir) 589 self.log.info("Changing directory to %s" % change_dir) 590 591 out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8") 592 593 if change_dir: 594 os.chdir(old_cwd) 595 self.log.info("Changing directory to %s" % old_cwd) 596 return out 597 598 def zip_spdk_sources(self, spdk_dir, dest_file): 599 self.log.info("Zipping SPDK source directory") 600 fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) 601 for root, _directories, files in os.walk(spdk_dir, followlinks=True): 602 for file in files: 603 fh.write(os.path.relpath(os.path.join(root, file))) 604 fh.close() 605 self.log.info("Done zipping") 606 607 @staticmethod 608 def _chunks(input_list, chunks_no): 609 div, rem = divmod(len(input_list), chunks_no) 610 for i in range(chunks_no): 611 si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem) 612 yield input_list[si:si + (div + 1 if i < rem else div)] 613 614 def spread_bdevs(self, req_disks): 615 # Spread available block devices indexes: 616 # - evenly across available initiator systems 617 # - evenly across available NIC interfaces for 618 # each initiator 619 # Not NUMA aware. 620 ip_bdev_map = [] 621 initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info)) 622 623 for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)): 624 self.initiator_info[i]["bdev_range"] = init_chunk 625 init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"]))) 626 for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list): 627 for c in nic_chunk: 628 ip_bdev_map.append((ip, c)) 629 return ip_bdev_map 630 631 def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time): 632 cpu_number = os.cpu_count() 633 sar_idle_sum = 0 634 sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt") 635 sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"])) 636 637 self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time) 638 time.sleep(ramp_time) 639 self.log.info("Starting SAR measurements") 640 641 out = self.exec_cmd(["sar", "-P", "ALL", "%s" % 1, "%s" % run_time]) 642 with open(os.path.join(results_dir, sar_output_file), "w") as fh: 643 for line in out.split("\n"): 644 if "Average" in line: 645 if "CPU" in line: 646 self.log.info("Summary CPU utilization from SAR:") 647 self.log.info(line) 648 elif "all" in line: 649 self.log.info(line) 650 else: 651 sar_idle_sum += float(line.split()[7]) 652 fh.write(out) 653 sar_cpu_usage = cpu_number * 100 - sar_idle_sum 654 655 with open(os.path.join(results_dir, sar_cpu_util_file), "w") as f: 656 f.write("%0.2f" % sar_cpu_usage) 657 658 def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time): 659 time.sleep(ramp_time) 660 self.log.info("Starting power measurements") 661 self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir, 662 "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix, 663 "-x", "-c", "%s" % run_time, "-t", "%s" % 1, "-r"]) 664 665 def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time): 666 time.sleep(ramp_time) 667 cmd = ["pcm", "1", "-i=%s" % run_time, 668 "-csv=%s/%s" % (results_dir, pcm_file_name)] 669 subprocess.run(cmd) 670 df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1]) 671 df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x)) 672 skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})] 673 skt_pcm_file_name = "_".join(["skt", pcm_file_name]) 674 skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False) 675 676 def measure_network_bandwidth(self, results_dir, bandwidth_file_name, ramp_time, run_time): 677 self.log.info("Waiting %d seconds for ramp-up to finish before measuring bandwidth stats" % ramp_time) 678 time.sleep(ramp_time) 679 self.log.info("INFO: starting network bandwidth measure") 680 self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name), 681 "-a", "1", "-t", "1000", "-c", "%s" % run_time]) 682 # TODO: Above command results in a .csv file containing measurements for all gathered samples. 683 # Improve this so that additional file containing measurements average is generated too. 684 # TODO: Monitor only these interfaces which are currently used to run the workload. 685 686 def measure_dpdk_memory(self, results_dir, dump_file_name, ramp_time): 687 self.log.info("INFO: waiting to generate DPDK memory usage") 688 time.sleep(ramp_time) 689 self.log.info("INFO: generating DPDK memory usage") 690 tmp_dump_file = rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)["filename"] 691 os.rename(tmp_dump_file, "%s/%s" % (results_dir, dump_file_name)) 692 693 def sys_config(self): 694 self.log.info("====Kernel release:====") 695 self.log.info(os.uname().release) 696 self.log.info("====Kernel command line:====") 697 with open('/proc/cmdline') as f: 698 cmdline = f.readlines() 699 self.log.info('\n'.join(self.get_uncommented_lines(cmdline))) 700 self.log.info("====sysctl conf:====") 701 with open('/etc/sysctl.conf') as f: 702 sysctl = f.readlines() 703 self.log.info('\n'.join(self.get_uncommented_lines(sysctl))) 704 self.log.info("====Cpu power info:====") 705 self.log.info(self.exec_cmd(["cpupower", "frequency-info"])) 706 self.log.info("====zcopy settings:====") 707 self.log.info("zcopy enabled: %s" % (self.enable_zcopy)) 708 self.log.info("====Scheduler settings:====") 709 self.log.info("SPDK scheduler: %s" % (self.scheduler_name)) 710 711 712class Initiator(Server): 713 def __init__(self, name, general_config, initiator_config): 714 super().__init__(name, general_config, initiator_config) 715 716 # required fields and defaults 717 config_fields = [ 718 ConfigField(name='mode', required=True), 719 ConfigField(name='ip', required=True), 720 ConfigField(name='target_nic_ips', required=True), 721 ConfigField(name='cpus_allowed', default=None), 722 ConfigField(name='cpus_allowed_policy', default='shared'), 723 ConfigField(name='spdk_dir', default='/tmp/spdk'), 724 ConfigField(name='fio_bin', default='/usr/src/fio/fio'), 725 ConfigField(name='nvmecli_bin', default='nvme'), 726 ConfigField(name='cpu_frequency', default=None), 727 ConfigField(name='allow_cpu_sharing', default=True) 728 ] 729 730 self.read_config(config_fields, initiator_config) 731 732 if os.getenv('SPDK_WORKSPACE'): 733 self.spdk_dir = os.getenv('SPDK_WORKSPACE') 734 735 self.subsystem_info_list = [] 736 737 self.ssh_connection = paramiko.SSHClient() 738 self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 739 self.ssh_connection.connect(self.ip, username=self.username, password=self.password) 740 self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir]) 741 self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir]) 742 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 743 744 if self.skip_spdk_install is False: 745 self.copy_spdk("/tmp/spdk.zip") 746 747 self.set_local_nic_info(self.set_local_nic_info_helper()) 748 self.set_cpu_frequency() 749 self.configure_system() 750 if self.enable_adq: 751 self.configure_adq() 752 self.sys_config() 753 754 def set_local_nic_info_helper(self): 755 return json.loads(self.exec_cmd(["lshw", "-json"])) 756 757 def stop(self): 758 self.restore_settings() 759 self.ssh_connection.close() 760 761 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 762 if change_dir: 763 cmd = ["cd", change_dir, ";", *cmd] 764 765 # In case one of the command elements contains whitespace and is not 766 # already quoted, # (e.g. when calling sysctl) quote it again to prevent expansion 767 # when sending to remote system. 768 for i, c in enumerate(cmd): 769 if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")): 770 cmd[i] = '"%s"' % c 771 cmd = " ".join(cmd) 772 773 # Redirect stderr to stdout thanks using get_pty option if needed 774 _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect) 775 out = stdout.read().decode(encoding="utf-8") 776 777 # Check the return code 778 rc = stdout.channel.recv_exit_status() 779 if rc: 780 raise CalledProcessError(int(rc), cmd, out) 781 782 return out 783 784 def put_file(self, local, remote_dest): 785 ftp = self.ssh_connection.open_sftp() 786 ftp.put(local, remote_dest) 787 ftp.close() 788 789 def get_file(self, remote, local_dest): 790 ftp = self.ssh_connection.open_sftp() 791 ftp.get(remote, local_dest) 792 ftp.close() 793 794 def copy_spdk(self, local_spdk_zip): 795 self.log.info("Copying SPDK sources to initiator %s" % self.name) 796 self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip") 797 self.log.info("Copied sources zip from target") 798 self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir]) 799 self.log.info("Sources unpacked") 800 801 def copy_result_files(self, dest_dir): 802 self.log.info("Copying results") 803 804 if not os.path.exists(dest_dir): 805 os.mkdir(dest_dir) 806 807 # Get list of result files from initiator and copy them back to target 808 file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n") 809 810 for file in file_list: 811 self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file), 812 os.path.join(dest_dir, file)) 813 self.log.info("Done copying results") 814 815 def match_subsystems(self, target_subsystems): 816 subsystems = [subsystem for subsystem in target_subsystems if subsystem[2] in self.target_nic_ips] 817 if not subsystems: 818 raise Exception("No matching subsystems found on target side!") 819 subsystems.sort(key=lambda x: x[1]) 820 self.log.info("Found matching subsystems on target side:") 821 for s in subsystems: 822 self.log.info(s) 823 self.subsystem_info_list = subsystems 824 825 @abstractmethod 826 def init_connect(self): 827 pass 828 829 @abstractmethod 830 def init_disconnect(self): 831 pass 832 833 @abstractmethod 834 def gen_fio_filename_conf(self, *args, **kwargs): 835 # Logic implemented in SPDKInitiator and KernelInitiator classes 836 pass 837 838 def get_route_nic_numa(self, remote_nvme_ip): 839 local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip])) 840 local_nvme_nic = local_nvme_nic[0]["dev"] 841 return self.get_nic_numa_node(local_nvme_nic) 842 843 @staticmethod 844 def gen_fio_offset_section(offset_inc, num_jobs): 845 offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc 846 return "\n".join(["size=%s%%" % offset_inc, 847 "offset=0%", 848 "offset_increment=%s%%" % offset_inc]) 849 850 def gen_fio_numa_section(self, fio_filenames_list, num_jobs): 851 numa_stats = {} 852 allowed_cpus = [] 853 for nvme in fio_filenames_list: 854 nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme)) 855 numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1 856 857 # Use the most common NUMA node for this chunk to allocate memory and CPUs 858 section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0] 859 860 # Check if we have enough free CPUs to pop from the list before assigning them 861 if len(self.available_cpus[section_local_numa]) < num_jobs: 862 if self.allow_cpu_sharing: 863 self.log.info("Regenerating available CPU list %s" % section_local_numa) 864 # Remove still available CPUs from the regenerated list. We don't want to 865 # regenerate it with duplicates. 866 cpus_regen = set(self.get_numa_cpu_map()[section_local_numa]) - set(self.available_cpus[section_local_numa]) 867 self.available_cpus[section_local_numa].extend(cpus_regen) 868 self.log.info(self.log.info(self.available_cpus[section_local_numa])) 869 else: 870 self.log.error("No more free CPU cores to use from allowed_cpus list!") 871 raise IndexError 872 873 for _ in range(num_jobs): 874 try: 875 allowed_cpus.append(str(self.available_cpus[section_local_numa].pop(0))) 876 except IndexError: 877 self.log.error("No more free CPU cores to use from allowed_cpus list!") 878 raise 879 880 return "\n".join(["cpus_allowed=%s" % ",".join(allowed_cpus), 881 "numa_mem_policy=prefer:%s" % section_local_numa]) 882 883 def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, 884 num_jobs=None, ramp_time=0, run_time=10, rate_iops=0, 885 offset=False, offset_inc=0): 886 fio_conf_template = """ 887[global] 888ioengine={ioengine} 889{spdk_conf} 890thread=1 891group_reporting=1 892direct=1 893percentile_list=50:90:99:99.5:99.9:99.99:99.999 894 895norandommap=1 896rw={rw} 897rwmixread={rwmixread} 898bs={block_size} 899time_based=1 900ramp_time={ramp_time} 901runtime={run_time} 902rate_iops={rate_iops} 903""" 904 905 if self.cpus_allowed is not None: 906 self.log.info("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed) 907 cpus_num = 0 908 cpus = self.cpus_allowed.split(",") 909 for cpu in cpus: 910 if "-" in cpu: 911 a, b = cpu.split("-") 912 a = int(a) 913 b = int(b) 914 cpus_num += len(range(a, b)) 915 else: 916 cpus_num += 1 917 self.num_cores = cpus_num 918 threads = range(0, self.num_cores) 919 elif hasattr(self, 'num_cores'): 920 self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores) 921 threads = range(0, int(self.num_cores)) 922 else: 923 self.num_cores = len(self.subsystem_info_list) 924 threads = range(0, len(self.subsystem_info_list)) 925 926 filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs, 927 offset, offset_inc) 928 929 fio_config = fio_conf_template.format(ioengine=self.ioengine, spdk_conf=self.spdk_conf, 930 rw=rw, rwmixread=rwmixread, block_size=block_size, 931 ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops) 932 933 # TODO: hipri disabled for now, as it causes fio errors: 934 # io_u error on file /dev/nvme2n1: Operation not supported 935 # See comment in KernelInitiator class, init_connect() function 936 if "io_uring" in self.ioengine: 937 fio_config = fio_config + """ 938fixedbufs=1 939registerfiles=1 940#hipri=1 941""" 942 if num_jobs: 943 fio_config = fio_config + "numjobs=%s \n" % num_jobs 944 if self.cpus_allowed is not None: 945 fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed 946 fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy 947 fio_config = fio_config + filename_section 948 949 fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread) 950 if hasattr(self, "num_cores"): 951 fio_config_filename += "_%sCPU" % self.num_cores 952 fio_config_filename += ".fio" 953 954 self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir]) 955 self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)]) 956 self.log.info("Created FIO Config:") 957 self.log.info(fio_config) 958 959 return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename) 960 961 def set_cpu_frequency(self): 962 if self.cpu_frequency is not None: 963 try: 964 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True) 965 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True) 966 self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"])) 967 except Exception: 968 self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!") 969 sys.exit(1) 970 else: 971 self.log.warning("WARNING: you have disabled intel_pstate and using default cpu governance.") 972 973 def run_fio(self, fio_config_file, run_num=1): 974 job_name, _ = os.path.splitext(fio_config_file) 975 self.log.info("Starting FIO run for job: %s" % job_name) 976 self.log.info("Using FIO: %s" % self.fio_bin) 977 978 output_filename = job_name + "_run_" + str(run_num) + "_" + self.name + ".json" 979 try: 980 output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json", 981 "--output=%s" % output_filename, "--eta=never"], True) 982 self.log.info(output) 983 self.log.info("FIO run finished. Results in: %s" % output_filename) 984 except subprocess.CalledProcessError as e: 985 self.log.error("ERROR: Fio process failed!") 986 self.log.error(e.stdout) 987 988 def sys_config(self): 989 self.log.info("====Kernel release:====") 990 self.log.info(self.exec_cmd(["uname", "-r"])) 991 self.log.info("====Kernel command line:====") 992 cmdline = self.exec_cmd(["cat", "/proc/cmdline"]) 993 self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines()))) 994 self.log.info("====sysctl conf:====") 995 sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"]) 996 self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines()))) 997 self.log.info("====Cpu power info:====") 998 self.log.info(self.exec_cmd(["cpupower", "frequency-info"])) 999 1000 1001class KernelTarget(Target): 1002 def __init__(self, name, general_config, target_config): 1003 super().__init__(name, general_config, target_config) 1004 # Defaults 1005 self.nvmet_bin = target_config.get('nvmet_bin', 'nvmetcli') 1006 1007 def load_drivers(self): 1008 self.log.info("Loading drivers") 1009 super().load_drivers() 1010 if self.null_block: 1011 self.exec_cmd(["sudo", "modprobe", "null_blk", "nr_devices=%s" % self.null_block]) 1012 1013 def configure_adq(self): 1014 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1015 1016 def adq_configure_tc(self): 1017 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1018 1019 def adq_set_busy_read(self, busy_read_val): 1020 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0") 1021 return {"net.core.busy_read": 0} 1022 1023 def stop(self): 1024 self.nvmet_command(self.nvmet_bin, "clear") 1025 self.restore_settings() 1026 1027 def get_nvme_device_bdf(self, nvme_dev_path): 1028 nvme_name = os.path.basename(nvme_dev_path) 1029 return self.exec_cmd(["cat", "/sys/block/%s/device/address" % nvme_name]).strip() 1030 1031 def get_nvme_devices(self): 1032 dev_list = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]).split("\n") 1033 nvme_list = [] 1034 for dev in dev_list: 1035 if "nvme" not in dev: 1036 continue 1037 if self.get_nvme_device_bdf(dev) in self.nvme_blocklist: 1038 continue 1039 if len(self.nvme_allowlist) == 0: 1040 nvme_list.append(dev) 1041 continue 1042 if self.get_nvme_device_bdf(dev) in self.nvme_allowlist: 1043 nvme_list.append(dev) 1044 return nvme_list 1045 1046 def nvmet_command(self, nvmet_bin, command): 1047 return self.exec_cmd([nvmet_bin, *(command.split(" "))]) 1048 1049 def kernel_tgt_gen_subsystem_conf(self, nvme_list): 1050 1051 nvmet_cfg = { 1052 "ports": [], 1053 "hosts": [], 1054 "subsystems": [], 1055 } 1056 1057 for ip, bdev_num in self.spread_bdevs(len(nvme_list)): 1058 port = str(4420 + bdev_num) 1059 nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num 1060 serial = "SPDK00%s" % bdev_num 1061 bdev_name = nvme_list[bdev_num] 1062 1063 nvmet_cfg["subsystems"].append({ 1064 "allowed_hosts": [], 1065 "attr": { 1066 "allow_any_host": "1", 1067 "serial": serial, 1068 "version": "1.3" 1069 }, 1070 "namespaces": [ 1071 { 1072 "device": { 1073 "path": bdev_name, 1074 "uuid": "%s" % uuid.uuid4() 1075 }, 1076 "enable": 1, 1077 "nsid": port 1078 } 1079 ], 1080 "nqn": nqn 1081 }) 1082 1083 nvmet_cfg["ports"].append({ 1084 "addr": { 1085 "adrfam": "ipv4", 1086 "traddr": ip, 1087 "trsvcid": port, 1088 "trtype": self.transport 1089 }, 1090 "portid": bdev_num, 1091 "referrals": [], 1092 "subsystems": [nqn] 1093 }) 1094 1095 self.subsystem_info_list.append((port, nqn, ip)) 1096 self.subsys_no = len(self.subsystem_info_list) 1097 1098 with open("kernel.conf", "w") as fh: 1099 fh.write(json.dumps(nvmet_cfg, indent=2)) 1100 1101 def tgt_start(self): 1102 self.log.info("Configuring kernel NVMeOF Target") 1103 1104 if self.null_block: 1105 self.log.info("Configuring with null block device.") 1106 nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)] 1107 else: 1108 self.log.info("Configuring with NVMe drives.") 1109 nvme_list = self.get_nvme_devices() 1110 1111 self.kernel_tgt_gen_subsystem_conf(nvme_list) 1112 self.subsys_no = len(nvme_list) 1113 1114 self.nvmet_command(self.nvmet_bin, "clear") 1115 self.nvmet_command(self.nvmet_bin, "restore kernel.conf") 1116 1117 if self.enable_adq: 1118 self.adq_configure_tc() 1119 1120 self.log.info("Done configuring kernel NVMeOF Target") 1121 1122 1123class SPDKTarget(Target): 1124 def __init__(self, name, general_config, target_config): 1125 # IRQ affinity on SPDK Target side takes Target's core mask into consideration. 1126 # Method setting IRQ affinity is run as part of parent classes init, 1127 # so we need to have self.core_mask set before changing IRQ affinity. 1128 self.core_mask = target_config["core_mask"] 1129 self.num_cores = len(self.get_core_list_from_mask(self.core_mask)) 1130 1131 super().__init__(name, general_config, target_config) 1132 1133 # Format: property, default value 1134 config_fields = [ 1135 ConfigField(name='dif_insert_strip', default=False), 1136 ConfigField(name='null_block_dif_type', default=0), 1137 ConfigField(name='num_shared_buffers', default=4096), 1138 ConfigField(name='max_queue_depth', default=128), 1139 ConfigField(name='bpf_scripts', default=[]), 1140 ConfigField(name='scheduler_core_limit', default=None), 1141 ConfigField(name='dsa_settings', default=False), 1142 ConfigField(name='iobuf_small_pool_count', default=32767), 1143 ConfigField(name='iobuf_large_pool_count', default=16383), 1144 ConfigField(name='num_cqe', default=4096), 1145 ConfigField(name='sock_impl', default='posix') 1146 ] 1147 1148 self.read_config(config_fields, target_config) 1149 1150 self.bpf_proc = None 1151 self.enable_dsa = False 1152 1153 self.log.info("====DSA settings:====") 1154 self.log.info("DSA enabled: %s" % (self.enable_dsa)) 1155 1156 def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False): 1157 if mode not in ["default", "bynode", "cpulist", 1158 "shared", "split", "split-bynode"]: 1159 self.log.error("%s irq affinity setting not supported" % mode) 1160 raise Exception 1161 1162 # Create core list from SPDK's mask and change it to string. 1163 # This is the type configure_irq_affinity expects for cpulist parameter. 1164 spdk_tgt_core_list = self.get_core_list_from_mask(self.core_mask) 1165 spdk_tgt_core_list = ",".join(map(lambda x: str(x), spdk_tgt_core_list)) 1166 spdk_tgt_core_list = "[" + spdk_tgt_core_list + "]" 1167 1168 if mode == "shared": 1169 super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list) 1170 elif mode == "split": 1171 super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list, exclude_cpulist=True) 1172 elif mode == "split-bynode": 1173 super().configure_irq_affinity(mode="bynode", cpulist=spdk_tgt_core_list, exclude_cpulist=True) 1174 else: 1175 super().configure_irq_affinity(mode=mode, cpulist=cpulist, exclude_cpulist=exclude_cpulist) 1176 1177 def adq_set_busy_read(self, busy_read_val): 1178 return {"net.core.busy_read": busy_read_val} 1179 1180 def get_nvme_devices_count(self): 1181 return len(self.get_nvme_devices()) 1182 1183 def get_nvme_devices(self): 1184 bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")])) 1185 bdev_bdfs = [] 1186 for bdev in bdev_subsys_json_obj["config"]: 1187 bdev_traddr = bdev["params"]["traddr"] 1188 if bdev_traddr in self.nvme_blocklist: 1189 continue 1190 if len(self.nvme_allowlist) == 0: 1191 bdev_bdfs.append(bdev_traddr) 1192 if bdev_traddr in self.nvme_allowlist: 1193 bdev_bdfs.append(bdev_traddr) 1194 return bdev_bdfs 1195 1196 def spdk_tgt_configure(self): 1197 self.log.info("Configuring SPDK NVMeOF target via RPC") 1198 1199 # Create transport layer 1200 nvmf_transport_params = { 1201 "client": self.client, 1202 "trtype": self.transport, 1203 "num_shared_buffers": self.num_shared_buffers, 1204 "max_queue_depth": self.max_queue_depth, 1205 "dif_insert_or_strip": self.dif_insert_strip, 1206 "sock_priority": self.adq_priority, 1207 "num_cqe": self.num_cqe 1208 } 1209 1210 if self.enable_adq: 1211 nvmf_transport_params["acceptor_poll_rate"] = 10000 1212 1213 rpc.nvmf.nvmf_create_transport(**nvmf_transport_params) 1214 self.log.info("SPDK NVMeOF transport layer:") 1215 rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client)) 1216 1217 if self.null_block: 1218 self.spdk_tgt_add_nullblock(self.null_block) 1219 self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block) 1220 else: 1221 self.spdk_tgt_add_nvme_conf() 1222 self.spdk_tgt_add_subsystem_conf(self.nic_ips) 1223 1224 if self.enable_adq: 1225 self.adq_configure_tc() 1226 1227 self.log.info("Done configuring SPDK NVMeOF Target") 1228 1229 def spdk_tgt_add_nullblock(self, null_block_count): 1230 md_size = 0 1231 block_size = 4096 1232 if self.null_block_dif_type != 0: 1233 md_size = 128 1234 1235 self.log.info("Adding null block bdevices to config via RPC") 1236 for i in range(null_block_count): 1237 self.log.info("Setting bdev protection to :%s" % self.null_block_dif_type) 1238 rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i), 1239 dif_type=self.null_block_dif_type, md_size=md_size) 1240 self.log.info("SPDK Bdevs configuration:") 1241 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1242 1243 def spdk_tgt_add_nvme_conf(self, req_num_disks=None): 1244 self.log.info("Adding NVMe bdevs to config via RPC") 1245 1246 bdfs = self.get_nvme_devices() 1247 bdfs = [b.replace(":", ".") for b in bdfs] 1248 1249 if req_num_disks: 1250 if req_num_disks > len(bdfs): 1251 self.log.error("ERROR: Requested number of disks is more than available %s" % len(bdfs)) 1252 sys.exit(1) 1253 else: 1254 bdfs = bdfs[0:req_num_disks] 1255 1256 for i, bdf in enumerate(bdfs): 1257 rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 1258 1259 self.log.info("SPDK Bdevs configuration:") 1260 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1261 1262 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 1263 self.log.info("Adding subsystems to config") 1264 if not req_num_disks: 1265 req_num_disks = self.get_nvme_devices_count() 1266 1267 for ip, bdev_num in self.spread_bdevs(req_num_disks): 1268 port = str(4420 + bdev_num) 1269 nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num 1270 serial = "SPDK00%s" % bdev_num 1271 bdev_name = "Nvme%sn1" % bdev_num 1272 1273 rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial, 1274 allow_any_host=True, max_namespaces=8) 1275 rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name) 1276 for nqn_name in [nqn, "discovery"]: 1277 rpc.nvmf.nvmf_subsystem_add_listener(self.client, 1278 nqn=nqn_name, 1279 trtype=self.transport, 1280 traddr=ip, 1281 trsvcid=port, 1282 adrfam="ipv4") 1283 self.subsystem_info_list.append((port, nqn, ip)) 1284 self.subsys_no = len(self.subsystem_info_list) 1285 1286 self.log.info("SPDK NVMeOF subsystem configuration:") 1287 rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client)) 1288 1289 def bpf_start(self): 1290 self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts) 1291 bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh") 1292 bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts] 1293 results_path = os.path.join(self.results_dir, "bpf_traces.txt") 1294 1295 with open(self.pid, "r") as fh: 1296 nvmf_pid = str(fh.readline()) 1297 1298 cmd = [bpf_script, nvmf_pid, *bpf_traces] 1299 self.log.info(cmd) 1300 self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path}) 1301 1302 def tgt_start(self): 1303 if self.null_block: 1304 self.subsys_no = 1 1305 else: 1306 self.subsys_no = self.get_nvme_devices_count() 1307 self.log.info("Starting SPDK NVMeOF Target process") 1308 nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt") 1309 proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask]) 1310 self.pid = os.path.join(self.spdk_dir, "nvmf.pid") 1311 1312 with open(self.pid, "w") as fh: 1313 fh.write(str(proc.pid)) 1314 self.nvmf_proc = proc 1315 self.log.info("SPDK NVMeOF Target PID=%s" % self.pid) 1316 self.log.info("Waiting for spdk to initialize...") 1317 while True: 1318 if os.path.exists("/var/tmp/spdk.sock"): 1319 break 1320 time.sleep(1) 1321 self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock") 1322 1323 rpc.sock.sock_set_default_impl(self.client, impl_name=self.sock_impl) 1324 rpc.iobuf.iobuf_set_options(self.client, 1325 small_pool_count=self.iobuf_small_pool_count, 1326 large_pool_count=self.iobuf_large_pool_count, 1327 small_bufsize=None, 1328 large_bufsize=None) 1329 1330 if self.enable_zcopy: 1331 rpc.sock.sock_impl_set_options(self.client, impl_name=self.sock_impl, 1332 enable_zerocopy_send_server=True) 1333 self.log.info("Target socket options:") 1334 rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name=self.sock_impl)) 1335 1336 if self.enable_adq: 1337 rpc.sock.sock_impl_set_options(self.client, impl_name=self.sock_impl, enable_placement_id=1) 1338 rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None, 1339 nvme_adminq_poll_period_us=100000, retry_count=4) 1340 1341 if self.enable_dsa: 1342 rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None) 1343 self.log.info("Target DSA accel module enabled") 1344 1345 rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit) 1346 rpc.framework_start_init(self.client) 1347 1348 if self.bpf_scripts: 1349 self.bpf_start() 1350 1351 self.spdk_tgt_configure() 1352 1353 def stop(self): 1354 if self.bpf_proc: 1355 self.log.info("Stopping BPF Trace script") 1356 self.bpf_proc.terminate() 1357 self.bpf_proc.wait() 1358 1359 if hasattr(self, "nvmf_proc"): 1360 try: 1361 self.nvmf_proc.terminate() 1362 self.nvmf_proc.wait(timeout=30) 1363 except Exception as e: 1364 self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.") 1365 self.log.info(e) 1366 self.nvmf_proc.kill() 1367 self.nvmf_proc.communicate() 1368 # Try to clean up RPC socket files if they were not removed 1369 # because of using 'kill' 1370 try: 1371 os.remove("/var/tmp/spdk.sock") 1372 os.remove("/var/tmp/spdk.sock.lock") 1373 except FileNotFoundError: 1374 pass 1375 self.restore_settings() 1376 1377 1378class KernelInitiator(Initiator): 1379 def __init__(self, name, general_config, initiator_config): 1380 super().__init__(name, general_config, initiator_config) 1381 1382 # Defaults 1383 self.extra_params = initiator_config.get('extra_params', '') 1384 1385 self.ioengine = "libaio" 1386 self.spdk_conf = "" 1387 1388 if "num_cores" in initiator_config: 1389 self.num_cores = initiator_config["num_cores"] 1390 1391 if "kernel_engine" in initiator_config: 1392 self.ioengine = initiator_config["kernel_engine"] 1393 if "io_uring" in self.ioengine: 1394 self.extra_params += ' --nr-poll-queues=8' 1395 1396 def configure_adq(self): 1397 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1398 1399 def adq_configure_tc(self): 1400 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1401 1402 def adq_set_busy_read(self, busy_read_val): 1403 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0") 1404 return {"net.core.busy_read": 0} 1405 1406 def get_connected_nvme_list(self): 1407 json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"])) 1408 nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"] 1409 if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]] 1410 return nvme_list 1411 1412 def init_connect(self): 1413 self.log.info("Below connection attempts may result in error messages, this is expected!") 1414 for subsystem in self.subsystem_info_list: 1415 self.log.info("Trying to connect %s %s %s" % subsystem) 1416 self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport, 1417 "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params]) 1418 time.sleep(2) 1419 1420 if "io_uring" in self.ioengine: 1421 self.log.info("Setting block layer settings for io_uring.") 1422 1423 # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because 1424 # apparently it's not possible for connected subsystems. 1425 # Results in "error: Invalid argument" 1426 block_sysfs_settings = { 1427 "iostats": "0", 1428 "rq_affinity": "0", 1429 "nomerges": "2" 1430 } 1431 1432 for disk in self.get_connected_nvme_list(): 1433 sysfs = os.path.join("/sys/block", disk, "queue") 1434 for k, v in block_sysfs_settings.items(): 1435 sysfs_opt_path = os.path.join(sysfs, k) 1436 try: 1437 self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True) 1438 except CalledProcessError as e: 1439 self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v)) 1440 finally: 1441 _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)]) 1442 self.log.info("%s=%s" % (sysfs_opt_path, _)) 1443 1444 def init_disconnect(self): 1445 for subsystem in self.subsystem_info_list: 1446 self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]]) 1447 time.sleep(1) 1448 1449 def get_nvme_subsystem_numa(self, dev_name): 1450 # Remove two last characters to get controller name instead of subsystem name 1451 nvme_ctrl = os.path.basename(dev_name)[:-2] 1452 remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', 1453 self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl])) 1454 return self.get_route_nic_numa(remote_nvme_ip.group(0)) 1455 1456 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0): 1457 self.available_cpus = self.get_numa_cpu_map() 1458 if len(threads) >= len(subsystems): 1459 threads = range(0, len(subsystems)) 1460 1461 # Generate connected nvme devices names and sort them by used NIC numa node 1462 # to allow better grouping when splitting into fio sections. 1463 nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()] 1464 nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list] 1465 nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))] 1466 1467 filename_section = "" 1468 nvme_per_split = int(len(nvme_list) / len(threads)) 1469 remainder = len(nvme_list) % len(threads) 1470 iterator = iter(nvme_list) 1471 result = [] 1472 for i in range(len(threads)): 1473 result.append([]) 1474 for _ in range(nvme_per_split): 1475 result[i].append(next(iterator)) 1476 if remainder: 1477 result[i].append(next(iterator)) 1478 remainder -= 1 1479 for i, r in enumerate(result): 1480 header = "[filename%s]" % i 1481 disks = "\n".join(["filename=%s" % x for x in r]) 1482 job_section_qd = round((io_depth * len(r)) / num_jobs) 1483 if job_section_qd == 0: 1484 job_section_qd = 1 1485 iodepth = "iodepth=%s" % job_section_qd 1486 1487 offset_section = "" 1488 if offset: 1489 offset_section = self.gen_fio_offset_section(offset_inc, num_jobs) 1490 1491 numa_opts = self.gen_fio_numa_section(r, num_jobs) 1492 1493 filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""]) 1494 1495 return filename_section 1496 1497 1498class SPDKInitiator(Initiator): 1499 def __init__(self, name, general_config, initiator_config): 1500 super().__init__(name, general_config, initiator_config) 1501 1502 if self.skip_spdk_install is False: 1503 self.install_spdk() 1504 1505 # Optional fields 1506 self.enable_data_digest = initiator_config.get('enable_data_digest', False) 1507 self.small_pool_count = initiator_config.get('small_pool_count', 32768) 1508 self.large_pool_count = initiator_config.get('large_pool_count', 16384) 1509 self.sock_impl = initiator_config.get('sock_impl', 'posix') 1510 1511 if "num_cores" in initiator_config: 1512 self.num_cores = initiator_config["num_cores"] 1513 1514 self.ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir 1515 self.spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir 1516 1517 def adq_set_busy_read(self, busy_read_val): 1518 return {"net.core.busy_read": busy_read_val} 1519 1520 def install_spdk(self): 1521 self.log.info("Using fio binary %s" % self.fio_bin) 1522 self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"]) 1523 self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"]) 1524 self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", 1525 "--with-fio=%s" % os.path.dirname(self.fio_bin), 1526 "--enable-lto", "--disable-unit-tests"]) 1527 self.exec_cmd(["make", "-C", self.spdk_dir, "clean"]) 1528 self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"]) 1529 1530 self.log.info("SPDK built") 1531 self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir]) 1532 1533 def init_connect(self): 1534 # Not a real "connect" like when doing "nvme connect" because SPDK's fio 1535 # bdev plugin initiates connection just before starting IO traffic. 1536 # This is just to have a "init_connect" equivalent of the same function 1537 # from KernelInitiator class. 1538 # Just prepare bdev.conf JSON file for later use and consider it 1539 # "making a connection". 1540 bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list) 1541 self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir]) 1542 1543 def init_disconnect(self): 1544 # SPDK Initiator does not need to explicity disconnect as this gets done 1545 # after fio bdev plugin finishes IO. 1546 return 1547 1548 def gen_spdk_bdev_conf(self, remote_subsystem_list): 1549 spdk_cfg_section = { 1550 "subsystems": [ 1551 { 1552 "subsystem": "bdev", 1553 "config": [] 1554 }, 1555 { 1556 "subsystem": "iobuf", 1557 "config": [ 1558 { 1559 "method": "iobuf_set_options", 1560 "params": { 1561 "small_pool_count": self.small_pool_count, 1562 "large_pool_count": self.large_pool_count 1563 } 1564 } 1565 ] 1566 }, 1567 { 1568 "subsystem": "sock", 1569 "config": [ 1570 { 1571 "method": "sock_set_default_impl", 1572 "params": { 1573 "impl_name": self.sock_impl 1574 } 1575 } 1576 ] 1577 } 1578 ] 1579 } 1580 1581 for i, subsys in enumerate(remote_subsystem_list): 1582 sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys) 1583 nvme_ctrl = { 1584 "method": "bdev_nvme_attach_controller", 1585 "params": { 1586 "name": "Nvme{}".format(i), 1587 "trtype": self.transport, 1588 "traddr": sub_addr, 1589 "trsvcid": sub_port, 1590 "subnqn": sub_nqn, 1591 "adrfam": "IPv4" 1592 } 1593 } 1594 1595 if self.enable_adq: 1596 nvme_ctrl["params"].update({"priority": "1"}) 1597 1598 if self.enable_data_digest: 1599 nvme_ctrl["params"].update({"ddgst": self.enable_data_digest}) 1600 1601 spdk_cfg_section["subsystems"][0]["config"].append(nvme_ctrl) 1602 1603 return json.dumps(spdk_cfg_section, indent=2) 1604 1605 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0): 1606 self.available_cpus = self.get_numa_cpu_map() 1607 filename_section = "" 1608 if len(threads) >= len(subsystems): 1609 threads = range(0, len(subsystems)) 1610 1611 # Generate expected NVMe Bdev names and sort them by used NIC numa node 1612 # to allow better grouping when splitting into fio sections. 1613 filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))] 1614 filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames] 1615 filenames = [x for _, x in sorted(zip(filename_numas, filenames))] 1616 1617 nvme_per_split = int(len(subsystems) / len(threads)) 1618 remainder = len(subsystems) % len(threads) 1619 iterator = iter(filenames) 1620 result = [] 1621 for i in range(len(threads)): 1622 result.append([]) 1623 for _ in range(nvme_per_split): 1624 result[i].append(next(iterator)) 1625 if remainder: 1626 result[i].append(next(iterator)) 1627 remainder -= 1 1628 for i, r in enumerate(result): 1629 header = "[filename%s]" % i 1630 disks = "\n".join(["filename=%s" % x for x in r]) 1631 job_section_qd = round((io_depth * len(r)) / num_jobs) 1632 if job_section_qd == 0: 1633 job_section_qd = 1 1634 iodepth = "iodepth=%s" % job_section_qd 1635 1636 offset_section = "" 1637 if offset: 1638 offset_section = self.gen_fio_offset_section(offset_inc, num_jobs) 1639 1640 numa_opts = self.gen_fio_numa_section(r, num_jobs) 1641 1642 filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""]) 1643 1644 return filename_section 1645 1646 def get_nvme_subsystem_numa(self, bdev_name): 1647 bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir])) 1648 bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"] 1649 1650 # Remove two last characters to get controller name instead of subsystem name 1651 nvme_ctrl = bdev_name[:-2] 1652 for bdev in bdev_conf_json_obj: 1653 if bdev["method"] == "bdev_nvme_attach_controller" and bdev["params"]["name"] == nvme_ctrl: 1654 return self.get_route_nic_numa(bdev["params"]["traddr"]) 1655 return None 1656 1657 1658def initiators_match_subsystems(initiators, target_obj): 1659 for i in initiators: 1660 i.match_subsystems(target_obj.subsystem_info_list) 1661 if i.enable_adq: 1662 i.adq_configure_tc() 1663 1664 1665def run_single_fio_test(args, 1666 initiators, 1667 configs, 1668 target_obj, 1669 block_size, 1670 io_depth, 1671 rw, 1672 fio_rw_mix_read, 1673 fio_run_num, 1674 fio_ramp_time, 1675 fio_run_time): 1676 1677 for run_no in range(1, fio_run_num+1): 1678 threads = [] 1679 power_daemon = None 1680 measurements_prefix = "%s_%s_%s_m_%s_run_%s" % (block_size, io_depth, rw, fio_rw_mix_read, run_no) 1681 1682 for i, cfg in zip(initiators, configs): 1683 t = threading.Thread(target=i.run_fio, args=(cfg, run_no)) 1684 threads.append(t) 1685 if target_obj.enable_sar: 1686 sar_file_prefix = measurements_prefix + "_sar" 1687 t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix, fio_ramp_time, fio_run_time)) 1688 threads.append(t) 1689 1690 if target_obj.enable_pcm: 1691 pcm_fnames = ["%s_%s.csv" % (measurements_prefix, x) for x in ["pcm_cpu"]] 1692 pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, 1693 args=(args.results, pcm_fnames[0], fio_ramp_time, fio_run_time)) 1694 threads.append(pcm_cpu_t) 1695 1696 if target_obj.enable_bw: 1697 bandwidth_file_name = measurements_prefix + "_bandwidth.csv" 1698 t = threading.Thread(target=target_obj.measure_network_bandwidth, 1699 args=(args.results, bandwidth_file_name, fio_ramp_time, fio_run_time)) 1700 threads.append(t) 1701 1702 if target_obj.enable_dpdk_memory: 1703 dpdk_mem_file_name = measurements_prefix + "_dpdk_mem.txt" 1704 t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results, dpdk_mem_file_name, fio_ramp_time)) 1705 threads.append(t) 1706 1707 if target_obj.enable_pm: 1708 power_daemon = threading.Thread(target=target_obj.measure_power, 1709 args=(args.results, measurements_prefix, script_full_dir, 1710 fio_ramp_time, fio_run_time)) 1711 threads.append(power_daemon) 1712 1713 for t in threads: 1714 t.start() 1715 for t in threads: 1716 t.join() 1717 1718 1719def run_fio_tests(args, initiators, target_obj, 1720 fio_workloads, fio_rw_mix_read, 1721 fio_run_num, fio_ramp_time, fio_run_time, 1722 fio_rate_iops, fio_offset, fio_offset_inc): 1723 1724 for block_size, io_depth, rw in fio_workloads: 1725 configs = [] 1726 for i in initiators: 1727 i.init_connect() 1728 cfg = i.gen_fio_config(rw, 1729 fio_rw_mix_read, 1730 block_size, 1731 io_depth, 1732 target_obj.subsys_no, 1733 fio_num_jobs, 1734 fio_ramp_time, 1735 fio_run_time, 1736 fio_rate_iops, 1737 fio_offset, 1738 fio_offset_inc) 1739 configs.append(cfg) 1740 1741 run_single_fio_test(args, 1742 initiators, 1743 configs, 1744 target_obj, 1745 block_size, 1746 io_depth, 1747 rw, 1748 fio_rw_mix_read, 1749 fio_run_num, 1750 fio_ramp_time, 1751 fio_run_time) 1752 1753 for i in initiators: 1754 i.init_disconnect() 1755 i.copy_result_files(args.results) 1756 1757 try: 1758 parse_results(args.results, args.csv_filename) 1759 except Exception as err: 1760 logging.error("There was an error with parsing the results") 1761 raise err 1762 1763 1764if __name__ == "__main__": 1765 exit_code = 0 1766 1767 script_full_dir = os.path.dirname(os.path.realpath(__file__)) 1768 default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json")) 1769 1770 parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) 1771 parser.add_argument('-c', '--config', type=str, default=default_config_file_path, 1772 help='Configuration file.') 1773 parser.add_argument('-r', '--results', type=str, default='/tmp/results', 1774 help='Results directory.') 1775 parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv', 1776 help='CSV results filename.') 1777 parser.add_argument('-f', '--force', default=False, action='store_true', 1778 dest='force', help="""Force script to continue and try to use all 1779 available NVMe devices during test. 1780 WARNING: Might result in data loss on used NVMe drives""") 1781 1782 args = parser.parse_args() 1783 1784 logging.basicConfig(level=logging.INFO, 1785 format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s') 1786 1787 logging.info("Using config file: %s" % args.config) 1788 with open(args.config, "r") as config: 1789 data = json.load(config) 1790 1791 initiators = [] 1792 general_config = data["general"] 1793 target_config = data["target"] 1794 initiator_configs = [data[x] for x in data.keys() if "initiator" in x] 1795 1796 if "null_block_devices" not in data["target"] and \ 1797 (args.force is False and 1798 "allowlist" not in data["target"] and 1799 "blocklist" not in data["target"]): 1800 # TODO: Also check if allowlist or blocklist are not empty. 1801 logging.warning("""WARNING: This script requires allowlist and blocklist to be defined. 1802 You can choose to use all available NVMe drives on your system, which may potentially 1803 lead to data loss. If you wish to proceed with all attached NVMes, use "-f" option.""") 1804 exit(1) 1805 1806 for k, v in data.items(): 1807 if "target" in k: 1808 v.update({"results_dir": args.results}) 1809 if data[k]["mode"] == "spdk": 1810 target_obj = SPDKTarget(k, data["general"], v) 1811 elif data[k]["mode"] == "kernel": 1812 target_obj = KernelTarget(k, data["general"], v) 1813 elif "initiator" in k: 1814 if data[k]["mode"] == "spdk": 1815 init_obj = SPDKInitiator(k, data["general"], v) 1816 elif data[k]["mode"] == "kernel": 1817 init_obj = KernelInitiator(k, data["general"], v) 1818 initiators.append(init_obj) 1819 elif "fio" in k: 1820 fio_workloads = itertools.product(data[k]["bs"], 1821 data[k]["qd"], 1822 data[k]["rw"]) 1823 1824 fio_run_time = data[k]["run_time"] 1825 fio_ramp_time = data[k]["ramp_time"] 1826 fio_rw_mix_read = data[k]["rwmixread"] 1827 fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None 1828 fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None 1829 1830 fio_rate_iops = 0 1831 if "rate_iops" in data[k]: 1832 fio_rate_iops = data[k]["rate_iops"] 1833 1834 fio_offset = False 1835 if "offset" in data[k]: 1836 fio_offset = data[k]["offset"] 1837 fio_offset_inc = 0 1838 if "offset_inc" in data[k]: 1839 fio_offset_inc = data[k]["offset_inc"] 1840 else: 1841 continue 1842 1843 try: 1844 os.mkdir(args.results) 1845 except FileExistsError: 1846 pass 1847 1848 for i in initiators: 1849 target_obj.initiator_info.append( 1850 {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips} 1851 ) 1852 1853 try: 1854 target_obj.tgt_start() 1855 initiators_match_subsystems(initiators, target_obj) 1856 1857 # Poor mans threading 1858 # Run FIO tests 1859 run_fio_tests(args, initiators, target_obj, fio_workloads, fio_rw_mix_read, 1860 fio_run_num, fio_ramp_time, fio_run_time, fio_rate_iops, 1861 fio_offset, fio_offset_inc) 1862 1863 except Exception as e: 1864 logging.error("Exception occurred while running FIO tests") 1865 logging.error(e) 1866 exit_code = 1 1867 1868 finally: 1869 for i in initiators: 1870 try: 1871 i.stop() 1872 except Exception as err: 1873 pass 1874 target_obj.stop() 1875 sys.exit(exit_code) 1876