1#!/usr/bin/env python3 2# SPDX-License-Identifier: BSD-3-Clause 3# Copyright (C) 2018 Intel Corporation 4# All rights reserved. 5# 6 7# Note: we use setattr 8# pylint: disable=no-member 9 10import os 11import re 12import sys 13import argparse 14import json 15import logging 16import zipfile 17import threading 18import subprocess 19import itertools 20import configparser 21import time 22import uuid 23 24import paramiko 25import pandas as pd 26from common import * 27from subprocess import CalledProcessError 28from abc import abstractmethod, ABC 29from dataclasses import dataclass 30from typing import Any 31 32sys.path.append(os.path.dirname(__file__) + '/../../../python') 33 34import spdk.rpc as rpc # noqa 35import spdk.rpc.client as rpc_client # noqa 36 37 38@dataclass 39class ConfigField: 40 name: str = None 41 default: Any = None 42 required: bool = False 43 44 45class Server(ABC): 46 def __init__(self, name, general_config, server_config): 47 self.name = name 48 49 self.log = logging.getLogger(self.name) 50 51 config_fields = [ 52 ConfigField(name='username', required=True), 53 ConfigField(name='password', required=True), 54 ConfigField(name='transport', required=True), 55 ConfigField(name='skip_spdk_install', default=False), 56 ConfigField(name='irdma_roce_enable', default=False), 57 ConfigField(name='pause_frames', default=None) 58 ] 59 60 self.read_config(config_fields, general_config) 61 self.transport = self.transport.lower() 62 63 config_fields = [ 64 ConfigField(name='nic_ips', required=True), 65 ConfigField(name='mode', required=True), 66 ConfigField(name='irq_scripts_dir', default='/usr/src/local/mlnx-tools/ofed_scripts'), 67 ConfigField(name='enable_arfs', default=False), 68 ConfigField(name='tuned_profile', default='') 69 ] 70 self.read_config(config_fields, server_config) 71 72 self.local_nic_info = [] 73 self._nics_json_obj = {} 74 self.svc_restore_dict = {} 75 self.sysctl_restore_dict = {} 76 self.tuned_restore_dict = {} 77 self.governor_restore = "" 78 79 self.enable_adq = server_config.get('adq_enable', False) 80 self.adq_priority = 1 if self.enable_adq else None 81 self.irq_settings = {'mode': 'default', **server_config.get('irq_settings', {})} 82 83 if not re.match(r'^[A-Za-z0-9\-]+$', name): 84 self.log.info("Please use a name which contains only letters, numbers or dashes") 85 sys.exit(1) 86 87 def read_config(self, config_fields, config): 88 for config_field in config_fields: 89 value = config.get(config_field.name, config_field.default) 90 if config_field.required is True and value is None: 91 raise ValueError('%s config key is required' % config_field.name) 92 93 setattr(self, config_field.name, value) 94 95 @staticmethod 96 def get_uncommented_lines(lines): 97 return [line for line in lines if line and not line.startswith('#')] 98 99 def get_nic_name_by_ip(self, ip): 100 if not self._nics_json_obj: 101 nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"]) 102 self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj))) 103 for nic in self._nics_json_obj: 104 for addr in nic["addr_info"]: 105 if ip in addr["local"]: 106 return nic["ifname"] 107 108 @abstractmethod 109 def set_local_nic_info_helper(self): 110 pass 111 112 def set_local_nic_info(self, pci_info): 113 def extract_network_elements(json_obj): 114 nic_list = [] 115 if isinstance(json_obj, list): 116 for x in json_obj: 117 nic_list.extend(extract_network_elements(x)) 118 elif isinstance(json_obj, dict): 119 if "children" in json_obj: 120 nic_list.extend(extract_network_elements(json_obj["children"])) 121 if "class" in json_obj.keys() and "network" in json_obj["class"]: 122 nic_list.append(json_obj) 123 return nic_list 124 125 self.local_nic_info = extract_network_elements(pci_info) 126 127 def get_nic_numa_node(self, nic_name): 128 return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name])) 129 130 def get_numa_cpu_map(self): 131 numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"])) 132 numa_cpu_json_map = {} 133 134 for cpu in numa_cpu_json_obj["cpus"]: 135 cpu_num = int(cpu["cpu"]) 136 numa_node = int(cpu["node"]) 137 numa_cpu_json_map.setdefault(numa_node, []) 138 numa_cpu_json_map[numa_node].append(cpu_num) 139 140 return numa_cpu_json_map 141 142 # pylint: disable=R0201 143 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 144 return "" 145 146 def configure_system(self): 147 self.load_drivers() 148 self.configure_services() 149 self.configure_sysctl() 150 self.configure_arfs() 151 self.configure_tuned() 152 self.configure_cpu_governor() 153 self.configure_irq_affinity(**self.irq_settings) 154 self.configure_pause_frames() 155 156 RDMA_PROTOCOL_IWARP = 0 157 RDMA_PROTOCOL_ROCE = 1 158 RDMA_PROTOCOL_UNKNOWN = -1 159 160 def check_rdma_protocol(self): 161 try: 162 roce_ena = self.exec_cmd(["cat", "/sys/module/irdma/parameters/roce_ena"]) 163 roce_ena = roce_ena.strip() 164 if roce_ena == "0": 165 return self.RDMA_PROTOCOL_IWARP 166 else: 167 return self.RDMA_PROTOCOL_ROCE 168 except CalledProcessError as e: 169 self.log.error("ERROR: failed to check RDMA protocol!") 170 self.log.error("%s resulted in error: %s" % (e.cmd, e.output)) 171 return self.RDMA_PROTOCOL_UNKNOWN 172 173 def load_drivers(self): 174 self.log.info("Loading drivers") 175 self.exec_cmd(["sudo", "modprobe", "-a", 176 "nvme-%s" % self.transport, 177 "nvmet-%s" % self.transport]) 178 current_mode = self.check_rdma_protocol() 179 if current_mode == self.RDMA_PROTOCOL_UNKNOWN: 180 self.log.error("ERROR: failed to check RDMA protocol mode") 181 return 182 if self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_IWARP: 183 self.reload_driver("irdma", "roce_ena=1") 184 self.log.info("Loaded irdma driver with RoCE enabled") 185 elif self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_ROCE: 186 self.log.info("Leaving irdma driver with RoCE enabled") 187 else: 188 self.reload_driver("irdma", "roce_ena=0") 189 self.log.info("Loaded irdma driver with iWARP enabled") 190 191 def set_pause_frames(self, rx_state, tx_state): 192 for nic_ip in self.nic_ips: 193 nic_name = self.get_nic_name_by_ip(nic_ip) 194 self.exec_cmd(["sudo", "ethtool", "-A", nic_name, "rx", rx_state, "tx", tx_state]) 195 196 def configure_pause_frames(self): 197 if self.pause_frames is None: 198 self.log.info("Keeping NIC's default pause frames setting") 199 return 200 elif not self.pause_frames: 201 self.log.info("Turning off pause frames") 202 self.set_pause_frames("off", "off") 203 else: 204 self.log.info("Turning on pause frames") 205 self.set_pause_frames("on", "on") 206 207 def configure_arfs(self): 208 rps_flow_cnt = 512 209 if not self.enable_arfs: 210 rps_flow_cnt = 0 211 212 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 213 for nic_name in nic_names: 214 self.exec_cmd(["sudo", "ethtool", "-K", nic_name, "ntuple", "on"]) 215 self.log.info(f"Setting rps_flow_cnt={rps_flow_cnt} for {nic_name}") 216 queue_files = self.exec_cmd(["ls", f"/sys/class/net/{nic_name}/queues/"]).strip().split("\n") 217 queue_files = filter(lambda x: x.startswith("rx-"), queue_files) 218 219 for qf in queue_files: 220 self.exec_cmd(["sudo", "bash", "-c", f"echo {rps_flow_cnt} > /sys/class/net/{nic_name}/queues/{qf}/rps_flow_cnt"]) 221 set_value = self.exec_cmd(["cat", f"/sys/class/net/{nic_name}/queues/{qf}/rps_flow_cnt"]).strip() 222 self.log.info(f"Confirmed rps_flow_cnt set to {set_value} in /sys/class/net/{nic_name}/queues/{qf}") 223 224 self.log.info(f"Configuration of {nic_name} completed with confirmed rps_flow_cnt settings.") 225 226 self.log.info("ARFS configuration completed.") 227 228 def configure_adq(self): 229 self.adq_load_modules() 230 self.adq_configure_nic() 231 232 def adq_load_modules(self): 233 self.log.info("Modprobing ADQ-related Linux modules...") 234 adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"] 235 for module in adq_module_deps: 236 try: 237 self.exec_cmd(["sudo", "modprobe", module]) 238 self.log.info("%s loaded!" % module) 239 except CalledProcessError as e: 240 self.log.error("ERROR: failed to load module %s" % module) 241 self.log.error("%s resulted in error: %s" % (e.cmd, e.output)) 242 243 def adq_configure_tc(self): 244 self.log.info("Configuring ADQ Traffic classes and filters...") 245 246 num_queues_tc0 = 2 # 2 is minimum number of queues for TC0 247 num_queues_tc1 = self.num_cores 248 port_param = "dst_port" if isinstance(self, Target) else "src_port" 249 xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs") 250 251 for nic_ip in self.nic_ips: 252 nic_name = self.get_nic_name_by_ip(nic_ip) 253 nic_ports = [x[0] for x in self.subsystem_info_list if nic_ip in x[2]] 254 255 tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, 256 "root", "mqprio", "num_tc", "2", "map", "0", "1", 257 "queues", "%s@0" % num_queues_tc0, 258 "%s@%s" % (num_queues_tc1, num_queues_tc0), 259 "hw", "1", "mode", "channel"] 260 self.log.info(" ".join(tc_qdisc_map_cmd)) 261 self.exec_cmd(tc_qdisc_map_cmd) 262 263 time.sleep(5) 264 tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"] 265 self.log.info(" ".join(tc_qdisc_ingress_cmd)) 266 self.exec_cmd(tc_qdisc_ingress_cmd) 267 268 nic_bdf = os.path.basename(self.exec_cmd(["readlink", "-f", "/sys/class/net/%s/device" % nic_name]).strip()) 269 self.exec_cmd(["sudo", "devlink", "dev", "param", "set", "pci/%s" % nic_bdf, 270 "name", "tc1_inline_fd", "value", "true", "cmode", "runtime"]) 271 272 for port in nic_ports: 273 tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name, 274 "protocol", "ip", "ingress", "prio", "1", "flower", 275 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port, 276 "skip_sw", "hw_tc", "1"] 277 self.log.info(" ".join(tc_filter_cmd)) 278 self.exec_cmd(tc_filter_cmd) 279 280 # show tc configuration 281 self.log.info("Show tc configuration for %s NIC..." % nic_name) 282 tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name]) 283 tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"]) 284 self.log.info("%s" % tc_disk_out) 285 self.log.info("%s" % tc_filter_out) 286 287 # Ethtool coalesce settings must be applied after configuring traffic classes 288 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"]) 289 self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"]) 290 291 self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name) 292 xps_cmd = ["sudo", xps_script_path, nic_name] 293 self.log.info(xps_cmd) 294 self.exec_cmd(xps_cmd) 295 296 def reload_driver(self, driver, *modprobe_args): 297 298 try: 299 self.exec_cmd(["sudo", "rmmod", driver]) 300 self.exec_cmd(["sudo", "modprobe", driver, *modprobe_args]) 301 except CalledProcessError as e: 302 self.log.error("ERROR: failed to reload %s module!" % driver) 303 self.log.error("%s resulted in error: %s" % (e.cmd, e.output)) 304 305 def adq_configure_nic(self): 306 self.log.info("Configuring NIC port settings for ADQ testing...") 307 308 # Reload the driver first, to make sure any previous settings are re-set. 309 self.reload_driver("ice") 310 311 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 312 for nic in nic_names: 313 self.log.info(nic) 314 try: 315 self.exec_cmd(["sudo", "ethtool", "-K", nic, 316 "hw-tc-offload", "on"]) # Enable hardware TC offload 317 nic_bdf = self.exec_cmd(["bash", "-c", "source /sys/class/net/%s/device/uevent; echo $PCI_SLOT_NAME" % nic]) 318 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"]) # Disable LLDP 319 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-inspect-optimize", "off"]) 320 # Following are suggested in ADQ Configuration Guide to be enabled for large block sizes, 321 # but can be used with 4k block sizes as well 322 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop", "on"]) 323 self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop-cfg", "on"]) 324 except subprocess.CalledProcessError as e: 325 self.log.error("ERROR: failed to configure NIC port using ethtool!") 326 self.log.error("%s resulted in error: %s" % (e.cmd, e.output)) 327 self.log.info("Please update your NIC driver and firmware versions and try again.") 328 self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic])) 329 self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic])) 330 331 def configure_services(self): 332 self.log.info("Configuring active services...") 333 svc_config = configparser.ConfigParser(strict=False) 334 335 # Below list is valid only for RHEL / Fedora systems and might not 336 # contain valid names for other distributions. 337 svc_target_state = { 338 "firewalld": "inactive", 339 "irqbalance": "inactive", 340 "lldpad.service": "inactive", 341 "lldpad.socket": "inactive" 342 } 343 344 for service in svc_target_state: 345 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 346 out = "\n".join(["[%s]" % service, out]) 347 svc_config.read_string(out) 348 349 if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]: 350 continue 351 352 service_state = svc_config[service]["ActiveState"] 353 self.log.info("Current state of %s service is %s" % (service, service_state)) 354 self.svc_restore_dict.update({service: service_state}) 355 if service_state != "inactive": 356 self.log.info("Disabling %s. It will be restored after the test has finished." % service) 357 self.exec_cmd(["sudo", "systemctl", "stop", service]) 358 359 def configure_sysctl(self): 360 self.log.info("Tuning sysctl settings...") 361 362 busy_read = 0 363 busy_poll = 0 364 if self.enable_adq and self.mode == "spdk": 365 busy_read = 1 366 busy_poll = 1 367 368 sysctl_opts = { 369 "net.core.busy_poll": busy_poll, 370 "net.core.busy_read": busy_read, 371 "net.core.somaxconn": 4096, 372 "net.core.netdev_max_backlog": 8192, 373 "net.ipv4.tcp_max_syn_backlog": 16384, 374 "net.core.rmem_max": 268435456, 375 "net.core.wmem_max": 268435456, 376 "net.ipv4.tcp_mem": "268435456 268435456 268435456", 377 "net.ipv4.tcp_rmem": "8192 1048576 33554432", 378 "net.ipv4.tcp_wmem": "8192 1048576 33554432", 379 "net.ipv4.route.flush": 1, 380 "vm.overcommit_memory": 1, 381 "net.core.rps_sock_flow_entries": 0 382 } 383 384 if self.enable_adq: 385 sysctl_opts.update(self.adq_set_busy_read(1)) 386 387 if self.enable_arfs: 388 sysctl_opts.update({"net.core.rps_sock_flow_entries": 32768}) 389 390 for opt, value in sysctl_opts.items(): 391 self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()}) 392 self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 393 394 def configure_tuned(self): 395 if not self.tuned_profile: 396 self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.") 397 return 398 399 self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile) 400 service = "tuned" 401 tuned_config = configparser.ConfigParser(strict=False) 402 403 out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service]) 404 out = "\n".join(["[%s]" % service, out]) 405 tuned_config.read_string(out) 406 tuned_state = tuned_config[service]["ActiveState"] 407 self.svc_restore_dict.update({service: tuned_state}) 408 409 if tuned_state != "inactive": 410 profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip() 411 profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip() 412 413 self.tuned_restore_dict = { 414 "profile": profile, 415 "mode": profile_mode 416 } 417 418 self.exec_cmd(["sudo", "systemctl", "start", service]) 419 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile]) 420 self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"])) 421 422 def configure_cpu_governor(self): 423 self.log.info("Setting CPU governor to performance...") 424 425 # This assumes that there is the same CPU scaling governor on each CPU 426 self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip() 427 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"]) 428 429 def get_core_list_from_mask(self, core_mask): 430 # Generate list of individual cores from hex core mask 431 # (e.g. '0xffff') or list containing: 432 # - individual cores (e.g. '1, 2, 3') 433 # - core ranges (e.g. '0-3') 434 # - mix of both (e.g. '0, 1-4, 9, 11-13') 435 436 core_list = [] 437 if "0x" in core_mask: 438 core_mask_int = int(core_mask, 16) 439 for i in range(core_mask_int.bit_length()): 440 if (1 << i) & core_mask_int: 441 core_list.append(i) 442 return core_list 443 else: 444 # Core list can be provided in .json config with square brackets 445 # remove them first 446 core_mask = core_mask.replace("[", "") 447 core_mask = core_mask.replace("]", "") 448 449 for i in core_mask.split(","): 450 if "-" in i: 451 start, end = i.split("-") 452 core_range = range(int(start), int(end) + 1) 453 core_list.extend(core_range) 454 else: 455 core_list.append(int(i)) 456 return core_list 457 458 def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False): 459 self.log.info("Setting NIC irq affinity for NICs. Using %s mode" % mode) 460 461 if mode not in ["default", "bynode", "cpulist"]: 462 raise ValueError("%s irq affinity setting not supported" % mode) 463 464 if mode == "cpulist" and not cpulist: 465 raise ValueError("%s irq affinity setting set, but no cpulist provided" % mode) 466 467 affinity_script = "set_irq_affinity.sh" 468 if "default" not in mode: 469 affinity_script = "set_irq_affinity_cpulist.sh" 470 system_cpu_map = self.get_numa_cpu_map() 471 irq_script_path = os.path.join(self.irq_scripts_dir, affinity_script) 472 473 def cpu_list_to_string(cpulist): 474 return ",".join(map(lambda x: str(x), cpulist)) 475 476 nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips] 477 for nic_name in nic_names: 478 irq_cmd = ["sudo", irq_script_path] 479 480 # Use only CPU cores matching NIC NUMA node. 481 # Remove any CPU cores if they're on exclusion list. 482 if mode == "bynode": 483 irq_cpus = system_cpu_map[self.get_nic_numa_node(nic_name)] 484 if cpulist and exclude_cpulist: 485 disallowed_cpus = self.get_core_list_from_mask(cpulist) 486 irq_cpus = list(set(irq_cpus) - set(disallowed_cpus)) 487 if not irq_cpus: 488 raise Exception("No CPUs left to process IRQs after excluding CPUs!") 489 irq_cmd.append(cpu_list_to_string(irq_cpus)) 490 491 if mode == "cpulist": 492 irq_cpus = self.get_core_list_from_mask(cpulist) 493 if exclude_cpulist: 494 # Flatten system CPU list, we don't need NUMA awareness here 495 system_cpu_list = sorted({x for v in system_cpu_map.values() for x in v}) 496 irq_cpus = list(set(system_cpu_list) - set(irq_cpus)) 497 if not irq_cpus: 498 raise Exception("No CPUs left to process IRQs after excluding CPUs!") 499 irq_cmd.append(cpu_list_to_string(irq_cpus)) 500 501 irq_cmd.append(nic_name) 502 self.log.info(irq_cmd) 503 self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir) 504 505 def restore_services(self): 506 self.log.info("Restoring services...") 507 for service, state in self.svc_restore_dict.items(): 508 cmd = "stop" if state == "inactive" else "start" 509 self.exec_cmd(["sudo", "systemctl", cmd, service]) 510 511 def restore_sysctl(self): 512 self.log.info("Restoring sysctl settings...") 513 for opt, value in self.sysctl_restore_dict.items(): 514 self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip()) 515 516 def restore_tuned(self): 517 self.log.info("Restoring tuned-adm settings...") 518 519 if not self.tuned_restore_dict: 520 return 521 522 if self.tuned_restore_dict["mode"] == "auto": 523 self.exec_cmd(["sudo", "tuned-adm", "auto_profile"]) 524 self.log.info("Reverted tuned-adm to auto_profile.") 525 else: 526 self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]]) 527 self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"]) 528 529 def restore_governor(self): 530 self.log.info("Restoring CPU governor setting...") 531 if self.governor_restore: 532 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore]) 533 self.log.info("Reverted CPU governor to %s." % self.governor_restore) 534 535 def restore_settings(self): 536 self.restore_governor() 537 self.restore_tuned() 538 self.restore_services() 539 self.restore_sysctl() 540 if self.enable_adq: 541 self.reload_driver("ice") 542 543 544class Target(Server): 545 def __init__(self, name, general_config, target_config): 546 super().__init__(name, general_config, target_config) 547 548 # Defaults 549 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 550 self.subsystem_info_list = [] 551 self.initiator_info = [] 552 553 config_fields = [ 554 ConfigField(name='mode', required=True), 555 ConfigField(name='results_dir', required=True), 556 ConfigField(name='enable_pm', default=True), 557 ConfigField(name='enable_sar', default=True), 558 ConfigField(name='enable_pcm', default=True), 559 ConfigField(name='enable_dpdk_memory', default=True) 560 ] 561 self.read_config(config_fields, target_config) 562 563 self.null_block = target_config.get('null_block_devices', 0) 564 self.scheduler_name = target_config.get('scheduler_settings', 'static') 565 self.enable_zcopy = target_config.get('zcopy_settings', False) 566 self.enable_bw = target_config.get('enable_bandwidth', True) 567 self.nvme_blocklist = target_config.get('blocklist', []) 568 self.nvme_allowlist = target_config.get('allowlist', []) 569 570 # Blocklist takes precedence, remove common elements from allowlist 571 self.nvme_allowlist = list(set(self.nvme_allowlist) - set(self.nvme_blocklist)) 572 573 self.log.info("Items now on allowlist: %s" % self.nvme_allowlist) 574 self.log.info("Items now on blocklist: %s" % self.nvme_blocklist) 575 576 self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0])) 577 self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../")) 578 self.set_local_nic_info(self.set_local_nic_info_helper()) 579 580 if self.skip_spdk_install is False: 581 self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip") 582 583 self.configure_system() 584 if self.enable_adq: 585 self.configure_adq() 586 self.configure_irq_affinity() 587 self.sys_config() 588 589 def set_local_nic_info_helper(self): 590 return json.loads(self.exec_cmd(["lshw", "-json"])) 591 592 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 593 stderr_opt = None 594 if stderr_redirect: 595 stderr_opt = subprocess.STDOUT 596 if change_dir: 597 old_cwd = os.getcwd() 598 os.chdir(change_dir) 599 self.log.info("Changing directory to %s" % change_dir) 600 601 out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8") 602 603 if change_dir: 604 os.chdir(old_cwd) 605 self.log.info("Changing directory to %s" % old_cwd) 606 return out 607 608 def zip_spdk_sources(self, spdk_dir, dest_file): 609 self.log.info("Zipping SPDK source directory") 610 fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) 611 for root, _directories, files in os.walk(spdk_dir, followlinks=True): 612 for file in files: 613 fh.write(os.path.relpath(os.path.join(root, file))) 614 fh.close() 615 self.log.info("Done zipping") 616 617 @staticmethod 618 def _chunks(input_list, chunks_no): 619 div, rem = divmod(len(input_list), chunks_no) 620 for i in range(chunks_no): 621 si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem) 622 yield input_list[si:si + (div + 1 if i < rem else div)] 623 624 def spread_bdevs(self, req_disks): 625 # Spread available block devices indexes: 626 # - evenly across available initiator systems 627 # - evenly across available NIC interfaces for 628 # each initiator 629 # Not NUMA aware. 630 ip_bdev_map = [] 631 initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info)) 632 633 for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)): 634 self.initiator_info[i]["bdev_range"] = init_chunk 635 init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"]))) 636 for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list): 637 for c in nic_chunk: 638 ip_bdev_map.append((ip, c)) 639 return ip_bdev_map 640 641 def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time): 642 cpu_number = os.cpu_count() 643 sar_idle_sum = 0 644 sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt") 645 sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"])) 646 647 self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time) 648 time.sleep(ramp_time) 649 self.log.info("Starting SAR measurements") 650 651 out = self.exec_cmd(["sar", "-P", "ALL", "%s" % 1, "%s" % run_time]) 652 with open(os.path.join(results_dir, sar_output_file), "w") as fh: 653 for line in out.split("\n"): 654 if "Average" in line: 655 if "CPU" in line: 656 self.log.info("Summary CPU utilization from SAR:") 657 self.log.info(line) 658 elif "all" in line: 659 self.log.info(line) 660 else: 661 sar_idle_sum += float(line.split()[7]) 662 fh.write(out) 663 sar_cpu_usage = cpu_number * 100 - sar_idle_sum 664 665 with open(os.path.join(results_dir, sar_cpu_util_file), "w") as f: 666 f.write("%0.2f" % sar_cpu_usage) 667 668 def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time): 669 time.sleep(ramp_time) 670 self.log.info("Starting power measurements") 671 self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir, 672 "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix, 673 "-x", "-c", "%s" % run_time, "-t", "%s" % 1, "-r"]) 674 675 def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time): 676 time.sleep(ramp_time) 677 cmd = ["pcm", "1", "-i=%s" % run_time, 678 "-csv=%s/%s" % (results_dir, pcm_file_name)] 679 subprocess.run(cmd) 680 df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1]) 681 df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x)) 682 skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})] 683 skt_pcm_file_name = "_".join(["skt", pcm_file_name]) 684 skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False) 685 686 def measure_network_bandwidth(self, results_dir, bandwidth_file_name, ramp_time, run_time): 687 self.log.info("Waiting %d seconds for ramp-up to finish before measuring bandwidth stats" % ramp_time) 688 time.sleep(ramp_time) 689 self.log.info("INFO: starting network bandwidth measure") 690 self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name), 691 "-a", "1", "-t", "1000", "-c", "%s" % run_time]) 692 # TODO: Above command results in a .csv file containing measurements for all gathered samples. 693 # Improve this so that additional file containing measurements average is generated too. 694 # TODO: Monitor only these interfaces which are currently used to run the workload. 695 696 def measure_dpdk_memory(self, results_dir, dump_file_name, ramp_time): 697 self.log.info("INFO: waiting to generate DPDK memory usage") 698 time.sleep(ramp_time) 699 self.log.info("INFO: generating DPDK memory usage") 700 tmp_dump_file = rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)["filename"] 701 os.rename(tmp_dump_file, "%s/%s" % (results_dir, dump_file_name)) 702 703 def sys_config(self): 704 self.log.info("====Kernel release:====") 705 self.log.info(os.uname().release) 706 self.log.info("====Kernel command line:====") 707 with open('/proc/cmdline') as f: 708 cmdline = f.readlines() 709 self.log.info('\n'.join(self.get_uncommented_lines(cmdline))) 710 self.log.info("====sysctl conf:====") 711 with open('/etc/sysctl.conf') as f: 712 sysctl = f.readlines() 713 self.log.info('\n'.join(self.get_uncommented_lines(sysctl))) 714 self.log.info("====Cpu power info:====") 715 self.log.info(self.exec_cmd(["cpupower", "frequency-info"])) 716 self.log.info("====zcopy settings:====") 717 self.log.info("zcopy enabled: %s" % (self.enable_zcopy)) 718 self.log.info("====Scheduler settings:====") 719 self.log.info("SPDK scheduler: %s" % (self.scheduler_name)) 720 721 722class Initiator(Server): 723 def __init__(self, name, general_config, initiator_config): 724 super().__init__(name, general_config, initiator_config) 725 726 # required fields and defaults 727 config_fields = [ 728 ConfigField(name='mode', required=True), 729 ConfigField(name='ip', required=True), 730 ConfigField(name='target_nic_ips', required=True), 731 ConfigField(name='cpus_allowed', default=None), 732 ConfigField(name='cpus_allowed_policy', default='shared'), 733 ConfigField(name='spdk_dir', default='/tmp/spdk'), 734 ConfigField(name='fio_bin', default='/usr/src/fio/fio'), 735 ConfigField(name='nvmecli_bin', default='nvme'), 736 ConfigField(name='cpu_frequency', default=None), 737 ConfigField(name='allow_cpu_sharing', default=True) 738 ] 739 740 self.read_config(config_fields, initiator_config) 741 742 if os.getenv('SPDK_WORKSPACE'): 743 self.spdk_dir = os.getenv('SPDK_WORKSPACE') 744 745 self.subsystem_info_list = [] 746 747 self.ssh_connection = paramiko.SSHClient() 748 self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 749 self.ssh_connection.connect(self.ip, username=self.username, password=self.password) 750 self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir]) 751 self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir]) 752 self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"])) 753 754 if self.skip_spdk_install is False: 755 self.copy_spdk("/tmp/spdk.zip") 756 757 self.set_local_nic_info(self.set_local_nic_info_helper()) 758 self.set_cpu_frequency() 759 self.configure_system() 760 if self.enable_adq: 761 self.configure_adq() 762 self.sys_config() 763 764 def set_local_nic_info_helper(self): 765 return json.loads(self.exec_cmd(["lshw", "-json"])) 766 767 def stop(self): 768 self.restore_settings() 769 self.ssh_connection.close() 770 771 def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None): 772 if change_dir: 773 cmd = ["cd", change_dir, ";", *cmd] 774 775 # In case one of the command elements contains whitespace and is not 776 # already quoted, # (e.g. when calling sysctl) quote it again to prevent expansion 777 # when sending to remote system. 778 for i, c in enumerate(cmd): 779 if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")): 780 cmd[i] = '"%s"' % c 781 cmd = " ".join(cmd) 782 783 # Redirect stderr to stdout thanks using get_pty option if needed 784 _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect) 785 out = stdout.read().decode(encoding="utf-8") 786 787 # Check the return code 788 rc = stdout.channel.recv_exit_status() 789 if rc: 790 raise CalledProcessError(int(rc), cmd, out) 791 792 return out 793 794 def put_file(self, local, remote_dest): 795 ftp = self.ssh_connection.open_sftp() 796 ftp.put(local, remote_dest) 797 ftp.close() 798 799 def get_file(self, remote, local_dest): 800 ftp = self.ssh_connection.open_sftp() 801 ftp.get(remote, local_dest) 802 ftp.close() 803 804 def copy_spdk(self, local_spdk_zip): 805 self.log.info("Copying SPDK sources to initiator %s" % self.name) 806 self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip") 807 self.log.info("Copied sources zip from target") 808 self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir]) 809 self.log.info("Sources unpacked") 810 811 def copy_result_files(self, dest_dir): 812 self.log.info("Copying results") 813 814 if not os.path.exists(dest_dir): 815 os.mkdir(dest_dir) 816 817 # Get list of result files from initiator and copy them back to target 818 file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n") 819 820 for file in file_list: 821 self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file), 822 os.path.join(dest_dir, file)) 823 self.log.info("Done copying results") 824 825 def match_subsystems(self, target_subsystems): 826 subsystems = [subsystem for subsystem in target_subsystems if subsystem[2] in self.target_nic_ips] 827 if not subsystems: 828 raise Exception("No matching subsystems found on target side!") 829 subsystems.sort(key=lambda x: x[1]) 830 self.log.info("Found matching subsystems on target side:") 831 for s in subsystems: 832 self.log.info(s) 833 self.subsystem_info_list = subsystems 834 835 @abstractmethod 836 def init_connect(self): 837 pass 838 839 @abstractmethod 840 def init_disconnect(self): 841 pass 842 843 @abstractmethod 844 def gen_fio_filename_conf(self, *args, **kwargs): 845 # Logic implemented in SPDKInitiator and KernelInitiator classes 846 pass 847 848 def get_route_nic_numa(self, remote_nvme_ip): 849 local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip])) 850 local_nvme_nic = local_nvme_nic[0]["dev"] 851 return self.get_nic_numa_node(local_nvme_nic) 852 853 @staticmethod 854 def gen_fio_offset_section(offset_inc, num_jobs): 855 offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc 856 return "\n".join(["size=%s%%" % offset_inc, 857 "offset=0%", 858 "offset_increment=%s%%" % offset_inc]) 859 860 def gen_fio_numa_section(self, fio_filenames_list, num_jobs): 861 numa_stats = {} 862 allowed_cpus = [] 863 for nvme in fio_filenames_list: 864 nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme)) 865 numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1 866 867 # Use the most common NUMA node for this chunk to allocate memory and CPUs 868 section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0] 869 870 # Check if we have enough free CPUs to pop from the list before assigning them 871 if len(self.available_cpus[section_local_numa]) < num_jobs: 872 if self.allow_cpu_sharing: 873 self.log.info("Regenerating available CPU list %s" % section_local_numa) 874 # Remove still available CPUs from the regenerated list. We don't want to 875 # regenerate it with duplicates. 876 cpus_regen = set(self.get_numa_cpu_map()[section_local_numa]) - set(self.available_cpus[section_local_numa]) 877 self.available_cpus[section_local_numa].extend(cpus_regen) 878 self.log.info(self.log.info(self.available_cpus[section_local_numa])) 879 else: 880 self.log.error("No more free CPU cores to use from allowed_cpus list!") 881 raise IndexError 882 883 for _ in range(num_jobs): 884 try: 885 allowed_cpus.append(str(self.available_cpus[section_local_numa].pop(0))) 886 except IndexError: 887 self.log.error("No more free CPU cores to use from allowed_cpus list!") 888 raise 889 890 return "\n".join(["cpus_allowed=%s" % ",".join(allowed_cpus), 891 "numa_mem_policy=prefer:%s" % section_local_numa]) 892 893 def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, 894 num_jobs=None, ramp_time=0, run_time=10, rate_iops=0, 895 offset=False, offset_inc=0): 896 fio_conf_template = """ 897[global] 898ioengine={ioengine} 899{spdk_conf} 900thread=1 901group_reporting=1 902direct=1 903percentile_list=50:90:99:99.5:99.9:99.99:99.999 904 905norandommap=1 906rw={rw} 907rwmixread={rwmixread} 908bs={block_size} 909time_based=1 910ramp_time={ramp_time} 911runtime={run_time} 912rate_iops={rate_iops} 913""" 914 915 if self.cpus_allowed is not None: 916 self.log.info("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed) 917 cpus_num = 0 918 cpus = self.cpus_allowed.split(",") 919 for cpu in cpus: 920 if "-" in cpu: 921 a, b = cpu.split("-") 922 a = int(a) 923 b = int(b) 924 cpus_num += len(range(a, b)) 925 else: 926 cpus_num += 1 927 self.num_cores = cpus_num 928 threads = range(0, self.num_cores) 929 elif hasattr(self, 'num_cores'): 930 self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores) 931 threads = range(0, int(self.num_cores)) 932 else: 933 self.num_cores = len(self.subsystem_info_list) 934 threads = range(0, len(self.subsystem_info_list)) 935 936 filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs, 937 offset, offset_inc) 938 939 fio_config = fio_conf_template.format(ioengine=self.ioengine, spdk_conf=self.spdk_conf, 940 rw=rw, rwmixread=rwmixread, block_size=block_size, 941 ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops) 942 943 # TODO: hipri disabled for now, as it causes fio errors: 944 # io_u error on file /dev/nvme2n1: Operation not supported 945 # See comment in KernelInitiator class, init_connect() function 946 if "io_uring" in self.ioengine: 947 fio_config = fio_config + """ 948fixedbufs=1 949registerfiles=1 950#hipri=1 951""" 952 if num_jobs: 953 fio_config = fio_config + "numjobs=%s \n" % num_jobs 954 if self.cpus_allowed is not None: 955 fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed 956 fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy 957 fio_config = fio_config + filename_section 958 959 fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread) 960 if hasattr(self, "num_cores"): 961 fio_config_filename += "_%sCPU" % self.num_cores 962 fio_config_filename += ".fio" 963 964 self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir]) 965 self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)]) 966 self.log.info("Created FIO Config:") 967 self.log.info(fio_config) 968 969 return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename) 970 971 def set_cpu_frequency(self): 972 if self.cpu_frequency is not None: 973 try: 974 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True) 975 self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True) 976 self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"])) 977 except Exception: 978 self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!") 979 sys.exit(1) 980 else: 981 self.log.warning("WARNING: you have disabled intel_pstate and using default cpu governance.") 982 983 def run_fio(self, fio_config_file, run_num=1): 984 job_name, _ = os.path.splitext(fio_config_file) 985 self.log.info("Starting FIO run for job: %s" % job_name) 986 self.log.info("Using FIO: %s" % self.fio_bin) 987 988 output_filename = job_name + "_run_" + str(run_num) + "_" + self.name + ".json" 989 try: 990 output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json", 991 "--output=%s" % output_filename, "--eta=never"], True) 992 self.log.info(output) 993 self.log.info("FIO run finished. Results in: %s" % output_filename) 994 except subprocess.CalledProcessError as e: 995 self.log.error("ERROR: Fio process failed!") 996 self.log.error(e.stdout) 997 998 def sys_config(self): 999 self.log.info("====Kernel release:====") 1000 self.log.info(self.exec_cmd(["uname", "-r"])) 1001 self.log.info("====Kernel command line:====") 1002 cmdline = self.exec_cmd(["cat", "/proc/cmdline"]) 1003 self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines()))) 1004 self.log.info("====sysctl conf:====") 1005 sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"]) 1006 self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines()))) 1007 self.log.info("====Cpu power info:====") 1008 self.log.info(self.exec_cmd(["cpupower", "frequency-info"])) 1009 1010 1011class KernelTarget(Target): 1012 def __init__(self, name, general_config, target_config): 1013 super().__init__(name, general_config, target_config) 1014 # Defaults 1015 self.nvmet_bin = target_config.get('nvmet_bin', 'nvmetcli') 1016 1017 def load_drivers(self): 1018 self.log.info("Loading drivers") 1019 super().load_drivers() 1020 if self.null_block: 1021 self.exec_cmd(["sudo", "modprobe", "null_blk", "nr_devices=%s" % self.null_block]) 1022 1023 def configure_adq(self): 1024 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1025 1026 def adq_configure_tc(self): 1027 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1028 1029 def adq_set_busy_read(self, busy_read_val): 1030 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0") 1031 return {"net.core.busy_read": 0} 1032 1033 def stop(self): 1034 self.nvmet_command(self.nvmet_bin, "clear") 1035 self.restore_settings() 1036 1037 def get_nvme_device_bdf(self, nvme_dev_path): 1038 nvme_name = os.path.basename(nvme_dev_path) 1039 return self.exec_cmd(["cat", "/sys/block/%s/device/address" % nvme_name]).strip() 1040 1041 def get_nvme_devices(self): 1042 dev_list = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]).split("\n") 1043 nvme_list = [] 1044 for dev in dev_list: 1045 if "nvme" not in dev: 1046 continue 1047 if self.get_nvme_device_bdf(dev) in self.nvme_blocklist: 1048 continue 1049 if len(self.nvme_allowlist) == 0: 1050 nvme_list.append(dev) 1051 continue 1052 if self.get_nvme_device_bdf(dev) in self.nvme_allowlist: 1053 nvme_list.append(dev) 1054 return nvme_list 1055 1056 def nvmet_command(self, nvmet_bin, command): 1057 return self.exec_cmd([nvmet_bin, *(command.split(" "))]) 1058 1059 def kernel_tgt_gen_subsystem_conf(self, nvme_list): 1060 1061 nvmet_cfg = { 1062 "ports": [], 1063 "hosts": [], 1064 "subsystems": [], 1065 } 1066 1067 for ip, bdev_num in self.spread_bdevs(len(nvme_list)): 1068 port = str(4420 + bdev_num) 1069 nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num 1070 serial = "SPDK00%s" % bdev_num 1071 bdev_name = nvme_list[bdev_num] 1072 1073 nvmet_cfg["subsystems"].append({ 1074 "allowed_hosts": [], 1075 "attr": { 1076 "allow_any_host": "1", 1077 "serial": serial, 1078 "version": "1.3" 1079 }, 1080 "namespaces": [ 1081 { 1082 "device": { 1083 "path": bdev_name, 1084 "uuid": "%s" % uuid.uuid4() 1085 }, 1086 "enable": 1, 1087 "nsid": port 1088 } 1089 ], 1090 "nqn": nqn 1091 }) 1092 1093 nvmet_cfg["ports"].append({ 1094 "addr": { 1095 "adrfam": "ipv4", 1096 "traddr": ip, 1097 "trsvcid": port, 1098 "trtype": self.transport 1099 }, 1100 "portid": bdev_num, 1101 "referrals": [], 1102 "subsystems": [nqn] 1103 }) 1104 1105 self.subsystem_info_list.append((port, nqn, ip)) 1106 self.subsys_no = len(self.subsystem_info_list) 1107 1108 with open("kernel.conf", "w") as fh: 1109 fh.write(json.dumps(nvmet_cfg, indent=2)) 1110 1111 def tgt_start(self): 1112 self.log.info("Configuring kernel NVMeOF Target") 1113 1114 if self.null_block: 1115 self.log.info("Configuring with null block device.") 1116 nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)] 1117 else: 1118 self.log.info("Configuring with NVMe drives.") 1119 nvme_list = self.get_nvme_devices() 1120 1121 self.kernel_tgt_gen_subsystem_conf(nvme_list) 1122 self.subsys_no = len(nvme_list) 1123 1124 self.nvmet_command(self.nvmet_bin, "clear") 1125 self.nvmet_command(self.nvmet_bin, "restore kernel.conf") 1126 1127 if self.enable_adq: 1128 self.adq_configure_tc() 1129 1130 self.log.info("Done configuring kernel NVMeOF Target") 1131 1132 1133class SPDKTarget(Target): 1134 def __init__(self, name, general_config, target_config): 1135 # IRQ affinity on SPDK Target side takes Target's core mask into consideration. 1136 # Method setting IRQ affinity is run as part of parent classes init, 1137 # so we need to have self.core_mask set before changing IRQ affinity. 1138 self.core_mask = target_config["core_mask"] 1139 self.num_cores = len(self.get_core_list_from_mask(self.core_mask)) 1140 1141 super().__init__(name, general_config, target_config) 1142 1143 # Format: property, default value 1144 config_fields = [ 1145 ConfigField(name='dif_insert_strip', default=False), 1146 ConfigField(name='null_block_dif_type', default=0), 1147 ConfigField(name='num_shared_buffers', default=4096), 1148 ConfigField(name='max_queue_depth', default=128), 1149 ConfigField(name='bpf_scripts', default=[]), 1150 ConfigField(name='scheduler_core_limit', default=None), 1151 ConfigField(name='dsa_settings', default=False), 1152 ConfigField(name='iobuf_small_pool_count', default=32767), 1153 ConfigField(name='iobuf_large_pool_count', default=16383), 1154 ConfigField(name='num_cqe', default=4096), 1155 ConfigField(name='sock_impl', default='posix') 1156 ] 1157 1158 self.read_config(config_fields, target_config) 1159 1160 self.bpf_proc = None 1161 self.enable_dsa = False 1162 1163 self.log.info("====DSA settings:====") 1164 self.log.info("DSA enabled: %s" % (self.enable_dsa)) 1165 1166 def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False): 1167 if mode not in ["default", "bynode", "cpulist", 1168 "shared", "split", "split-bynode"]: 1169 self.log.error("%s irq affinity setting not supported" % mode) 1170 raise Exception 1171 1172 # Create core list from SPDK's mask and change it to string. 1173 # This is the type configure_irq_affinity expects for cpulist parameter. 1174 spdk_tgt_core_list = self.get_core_list_from_mask(self.core_mask) 1175 spdk_tgt_core_list = ",".join(map(lambda x: str(x), spdk_tgt_core_list)) 1176 spdk_tgt_core_list = "[" + spdk_tgt_core_list + "]" 1177 1178 if mode == "shared": 1179 super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list) 1180 elif mode == "split": 1181 super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list, exclude_cpulist=True) 1182 elif mode == "split-bynode": 1183 super().configure_irq_affinity(mode="bynode", cpulist=spdk_tgt_core_list, exclude_cpulist=True) 1184 else: 1185 super().configure_irq_affinity(mode=mode, cpulist=cpulist, exclude_cpulist=exclude_cpulist) 1186 1187 def adq_set_busy_read(self, busy_read_val): 1188 return {"net.core.busy_read": busy_read_val} 1189 1190 def get_nvme_devices_count(self): 1191 return len(self.get_nvme_devices()) 1192 1193 def get_nvme_devices(self): 1194 bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")])) 1195 bdev_bdfs = [] 1196 for bdev in bdev_subsys_json_obj["config"]: 1197 bdev_traddr = bdev["params"]["traddr"] 1198 if bdev_traddr in self.nvme_blocklist: 1199 continue 1200 if len(self.nvme_allowlist) == 0: 1201 bdev_bdfs.append(bdev_traddr) 1202 if bdev_traddr in self.nvme_allowlist: 1203 bdev_bdfs.append(bdev_traddr) 1204 return bdev_bdfs 1205 1206 def spdk_tgt_configure(self): 1207 self.log.info("Configuring SPDK NVMeOF target via RPC") 1208 1209 # Create transport layer 1210 nvmf_transport_params = { 1211 "client": self.client, 1212 "trtype": self.transport, 1213 "num_shared_buffers": self.num_shared_buffers, 1214 "max_queue_depth": self.max_queue_depth, 1215 "dif_insert_or_strip": self.dif_insert_strip, 1216 "sock_priority": self.adq_priority, 1217 "num_cqe": self.num_cqe 1218 } 1219 1220 if self.enable_adq: 1221 nvmf_transport_params["acceptor_poll_rate"] = 10000 1222 1223 rpc.nvmf.nvmf_create_transport(**nvmf_transport_params) 1224 self.log.info("SPDK NVMeOF transport layer:") 1225 rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client)) 1226 1227 if self.null_block: 1228 self.spdk_tgt_add_nullblock(self.null_block) 1229 self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block) 1230 else: 1231 self.spdk_tgt_add_nvme_conf() 1232 self.spdk_tgt_add_subsystem_conf(self.nic_ips) 1233 1234 if self.enable_adq: 1235 self.adq_configure_tc() 1236 1237 self.log.info("Done configuring SPDK NVMeOF Target") 1238 1239 def spdk_tgt_add_nullblock(self, null_block_count): 1240 md_size = 0 1241 block_size = 4096 1242 if self.null_block_dif_type != 0: 1243 md_size = 128 1244 1245 self.log.info("Adding null block bdevices to config via RPC") 1246 for i in range(null_block_count): 1247 self.log.info("Setting bdev protection to :%s" % self.null_block_dif_type) 1248 rpc.bdev.bdev_null_create(self.client, 102400, block_size, "Nvme{}n1".format(i), 1249 dif_type=self.null_block_dif_type, md_size=md_size) 1250 self.log.info("SPDK Bdevs configuration:") 1251 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1252 1253 def spdk_tgt_add_nvme_conf(self, req_num_disks=None): 1254 self.log.info("Adding NVMe bdevs to config via RPC") 1255 1256 bdfs = self.get_nvme_devices() 1257 bdfs = [b.replace(":", ".") for b in bdfs] 1258 1259 if req_num_disks: 1260 if req_num_disks > len(bdfs): 1261 self.log.error("ERROR: Requested number of disks is more than available %s" % len(bdfs)) 1262 sys.exit(1) 1263 else: 1264 bdfs = bdfs[0:req_num_disks] 1265 1266 for i, bdf in enumerate(bdfs): 1267 rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf) 1268 1269 self.log.info("SPDK Bdevs configuration:") 1270 rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client)) 1271 1272 def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None): 1273 self.log.info("Adding subsystems to config") 1274 if not req_num_disks: 1275 req_num_disks = self.get_nvme_devices_count() 1276 1277 for ip, bdev_num in self.spread_bdevs(req_num_disks): 1278 port = str(4420 + bdev_num) 1279 nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num 1280 serial = "SPDK00%s" % bdev_num 1281 bdev_name = "Nvme%sn1" % bdev_num 1282 1283 rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial, 1284 allow_any_host=True, max_namespaces=8) 1285 rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name) 1286 for nqn_name in [nqn, "discovery"]: 1287 rpc.nvmf.nvmf_subsystem_add_listener(self.client, 1288 nqn=nqn_name, 1289 trtype=self.transport, 1290 traddr=ip, 1291 trsvcid=port, 1292 adrfam="ipv4") 1293 self.subsystem_info_list.append((port, nqn, ip)) 1294 self.subsys_no = len(self.subsystem_info_list) 1295 1296 self.log.info("SPDK NVMeOF subsystem configuration:") 1297 rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client)) 1298 1299 def bpf_start(self): 1300 self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts) 1301 bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh") 1302 bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts] 1303 results_path = os.path.join(self.results_dir, "bpf_traces.txt") 1304 1305 with open(self.pid, "r") as fh: 1306 nvmf_pid = str(fh.readline()) 1307 1308 cmd = [bpf_script, nvmf_pid, *bpf_traces] 1309 self.log.info(cmd) 1310 self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path}) 1311 1312 def tgt_start(self): 1313 if self.null_block: 1314 self.subsys_no = 1 1315 else: 1316 self.subsys_no = self.get_nvme_devices_count() 1317 self.log.info("Starting SPDK NVMeOF Target process") 1318 nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt") 1319 proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask]) 1320 self.pid = os.path.join(self.spdk_dir, "nvmf.pid") 1321 1322 with open(self.pid, "w") as fh: 1323 fh.write(str(proc.pid)) 1324 self.nvmf_proc = proc 1325 self.log.info("SPDK NVMeOF Target PID=%s" % self.pid) 1326 self.log.info("Waiting for spdk to initialize...") 1327 while True: 1328 if os.path.exists("/var/tmp/spdk.sock"): 1329 break 1330 time.sleep(1) 1331 self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock") 1332 1333 rpc.sock.sock_set_default_impl(self.client, impl_name=self.sock_impl) 1334 rpc.iobuf.iobuf_set_options(self.client, 1335 small_pool_count=self.iobuf_small_pool_count, 1336 large_pool_count=self.iobuf_large_pool_count, 1337 small_bufsize=None, 1338 large_bufsize=None) 1339 1340 if self.enable_zcopy: 1341 rpc.sock.sock_impl_set_options(self.client, impl_name=self.sock_impl, 1342 enable_zerocopy_send_server=True) 1343 self.log.info("Target socket options:") 1344 rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name=self.sock_impl)) 1345 1346 if self.enable_adq: 1347 rpc.sock.sock_impl_set_options(self.client, impl_name=self.sock_impl, enable_placement_id=1) 1348 rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None, 1349 nvme_adminq_poll_period_us=100000, retry_count=4) 1350 1351 if self.enable_dsa: 1352 rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None) 1353 self.log.info("Target DSA accel module enabled") 1354 1355 rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit) 1356 rpc.framework_start_init(self.client) 1357 1358 if self.bpf_scripts: 1359 self.bpf_start() 1360 1361 self.spdk_tgt_configure() 1362 1363 def stop(self): 1364 if self.bpf_proc: 1365 self.log.info("Stopping BPF Trace script") 1366 self.bpf_proc.terminate() 1367 self.bpf_proc.wait() 1368 1369 if hasattr(self, "nvmf_proc"): 1370 try: 1371 self.nvmf_proc.terminate() 1372 self.nvmf_proc.wait(timeout=30) 1373 except Exception as e: 1374 self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.") 1375 self.log.info(e) 1376 self.nvmf_proc.kill() 1377 self.nvmf_proc.communicate() 1378 # Try to clean up RPC socket files if they were not removed 1379 # because of using 'kill' 1380 try: 1381 os.remove("/var/tmp/spdk.sock") 1382 os.remove("/var/tmp/spdk.sock.lock") 1383 except FileNotFoundError: 1384 pass 1385 self.restore_settings() 1386 1387 1388class KernelInitiator(Initiator): 1389 def __init__(self, name, general_config, initiator_config): 1390 super().__init__(name, general_config, initiator_config) 1391 1392 # Defaults 1393 self.extra_params = initiator_config.get('extra_params', '') 1394 1395 self.ioengine = "libaio" 1396 self.spdk_conf = "" 1397 1398 if "num_cores" in initiator_config: 1399 self.num_cores = initiator_config["num_cores"] 1400 1401 if "kernel_engine" in initiator_config: 1402 self.ioengine = initiator_config["kernel_engine"] 1403 if "io_uring" in self.ioengine: 1404 self.extra_params += ' --nr-poll-queues=8' 1405 1406 def configure_adq(self): 1407 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1408 1409 def adq_configure_tc(self): 1410 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.") 1411 1412 def adq_set_busy_read(self, busy_read_val): 1413 self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0") 1414 return {"net.core.busy_read": 0} 1415 1416 def get_connected_nvme_list(self): 1417 json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"])) 1418 nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"] 1419 if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]] 1420 return nvme_list 1421 1422 def init_connect(self): 1423 self.log.info("Below connection attempts may result in error messages, this is expected!") 1424 for subsystem in self.subsystem_info_list: 1425 self.log.info("Trying to connect %s %s %s" % subsystem) 1426 self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport, 1427 "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params]) 1428 time.sleep(2) 1429 1430 if "io_uring" in self.ioengine: 1431 self.log.info("Setting block layer settings for io_uring.") 1432 1433 # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because 1434 # apparently it's not possible for connected subsystems. 1435 # Results in "error: Invalid argument" 1436 block_sysfs_settings = { 1437 "iostats": "0", 1438 "rq_affinity": "0", 1439 "nomerges": "2" 1440 } 1441 1442 for disk in self.get_connected_nvme_list(): 1443 sysfs = os.path.join("/sys/block", disk, "queue") 1444 for k, v in block_sysfs_settings.items(): 1445 sysfs_opt_path = os.path.join(sysfs, k) 1446 try: 1447 self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True) 1448 except CalledProcessError as e: 1449 self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v)) 1450 finally: 1451 _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)]) 1452 self.log.info("%s=%s" % (sysfs_opt_path, _)) 1453 1454 def init_disconnect(self): 1455 for subsystem in self.subsystem_info_list: 1456 self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]]) 1457 time.sleep(1) 1458 1459 def get_nvme_subsystem_numa(self, dev_name): 1460 # Remove two last characters to get controller name instead of subsystem name 1461 nvme_ctrl = os.path.basename(dev_name)[:-2] 1462 remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})', 1463 self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl])) 1464 return self.get_route_nic_numa(remote_nvme_ip.group(0)) 1465 1466 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0): 1467 self.available_cpus = self.get_numa_cpu_map() 1468 if len(threads) >= len(subsystems): 1469 threads = range(0, len(subsystems)) 1470 1471 # Generate connected nvme devices names and sort them by used NIC numa node 1472 # to allow better grouping when splitting into fio sections. 1473 nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()] 1474 nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list] 1475 nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))] 1476 1477 filename_section = "" 1478 nvme_per_split = int(len(nvme_list) / len(threads)) 1479 remainder = len(nvme_list) % len(threads) 1480 iterator = iter(nvme_list) 1481 result = [] 1482 for i in range(len(threads)): 1483 result.append([]) 1484 for _ in range(nvme_per_split): 1485 result[i].append(next(iterator)) 1486 if remainder: 1487 result[i].append(next(iterator)) 1488 remainder -= 1 1489 for i, r in enumerate(result): 1490 header = "[filename%s]" % i 1491 disks = "\n".join(["filename=%s" % x for x in r]) 1492 job_section_qd = round((io_depth * len(r)) / num_jobs) 1493 if job_section_qd == 0: 1494 job_section_qd = 1 1495 iodepth = "iodepth=%s" % job_section_qd 1496 1497 offset_section = "" 1498 if offset: 1499 offset_section = self.gen_fio_offset_section(offset_inc, num_jobs) 1500 1501 numa_opts = self.gen_fio_numa_section(r, num_jobs) 1502 1503 filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""]) 1504 1505 return filename_section 1506 1507 1508class SPDKInitiator(Initiator): 1509 def __init__(self, name, general_config, initiator_config): 1510 super().__init__(name, general_config, initiator_config) 1511 1512 if self.skip_spdk_install is False: 1513 self.install_spdk() 1514 1515 # Optional fields 1516 self.enable_data_digest = initiator_config.get('enable_data_digest', False) 1517 self.small_pool_count = initiator_config.get('small_pool_count', 32768) 1518 self.large_pool_count = initiator_config.get('large_pool_count', 16384) 1519 self.sock_impl = initiator_config.get('sock_impl', 'posix') 1520 1521 if "num_cores" in initiator_config: 1522 self.num_cores = initiator_config["num_cores"] 1523 1524 self.ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir 1525 self.spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir 1526 1527 def adq_set_busy_read(self, busy_read_val): 1528 return {"net.core.busy_read": busy_read_val} 1529 1530 def install_spdk(self): 1531 self.log.info("Using fio binary %s" % self.fio_bin) 1532 self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"]) 1533 self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"]) 1534 self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", 1535 "--with-fio=%s" % os.path.dirname(self.fio_bin), 1536 "--enable-lto", "--disable-unit-tests"]) 1537 self.exec_cmd(["make", "-C", self.spdk_dir, "clean"]) 1538 self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"]) 1539 1540 self.log.info("SPDK built") 1541 self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir]) 1542 1543 def init_connect(self): 1544 # Not a real "connect" like when doing "nvme connect" because SPDK's fio 1545 # bdev plugin initiates connection just before starting IO traffic. 1546 # This is just to have a "init_connect" equivalent of the same function 1547 # from KernelInitiator class. 1548 # Just prepare bdev.conf JSON file for later use and consider it 1549 # "making a connection". 1550 bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list) 1551 self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir]) 1552 1553 def init_disconnect(self): 1554 # SPDK Initiator does not need to explicity disconnect as this gets done 1555 # after fio bdev plugin finishes IO. 1556 return 1557 1558 def gen_spdk_bdev_conf(self, remote_subsystem_list): 1559 spdk_cfg_section = { 1560 "subsystems": [ 1561 { 1562 "subsystem": "bdev", 1563 "config": [] 1564 }, 1565 { 1566 "subsystem": "iobuf", 1567 "config": [ 1568 { 1569 "method": "iobuf_set_options", 1570 "params": { 1571 "small_pool_count": self.small_pool_count, 1572 "large_pool_count": self.large_pool_count 1573 } 1574 } 1575 ] 1576 }, 1577 { 1578 "subsystem": "sock", 1579 "config": [ 1580 { 1581 "method": "sock_set_default_impl", 1582 "params": { 1583 "impl_name": self.sock_impl 1584 } 1585 } 1586 ] 1587 } 1588 ] 1589 } 1590 1591 for i, subsys in enumerate(remote_subsystem_list): 1592 sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys) 1593 nvme_ctrl = { 1594 "method": "bdev_nvme_attach_controller", 1595 "params": { 1596 "name": "Nvme{}".format(i), 1597 "trtype": self.transport, 1598 "traddr": sub_addr, 1599 "trsvcid": sub_port, 1600 "subnqn": sub_nqn, 1601 "adrfam": "IPv4" 1602 } 1603 } 1604 1605 if self.enable_adq: 1606 nvme_ctrl["params"].update({"priority": "1"}) 1607 1608 if self.enable_data_digest: 1609 nvme_ctrl["params"].update({"ddgst": self.enable_data_digest}) 1610 1611 spdk_cfg_section["subsystems"][0]["config"].append(nvme_ctrl) 1612 1613 return json.dumps(spdk_cfg_section, indent=2) 1614 1615 def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0): 1616 self.available_cpus = self.get_numa_cpu_map() 1617 filename_section = "" 1618 if len(threads) >= len(subsystems): 1619 threads = range(0, len(subsystems)) 1620 1621 # Generate expected NVMe Bdev names and sort them by used NIC numa node 1622 # to allow better grouping when splitting into fio sections. 1623 filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))] 1624 filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames] 1625 filenames = [x for _, x in sorted(zip(filename_numas, filenames))] 1626 1627 nvme_per_split = int(len(subsystems) / len(threads)) 1628 remainder = len(subsystems) % len(threads) 1629 iterator = iter(filenames) 1630 result = [] 1631 for i in range(len(threads)): 1632 result.append([]) 1633 for _ in range(nvme_per_split): 1634 result[i].append(next(iterator)) 1635 if remainder: 1636 result[i].append(next(iterator)) 1637 remainder -= 1 1638 for i, r in enumerate(result): 1639 header = "[filename%s]" % i 1640 disks = "\n".join(["filename=%s" % x for x in r]) 1641 job_section_qd = round((io_depth * len(r)) / num_jobs) 1642 if job_section_qd == 0: 1643 job_section_qd = 1 1644 iodepth = "iodepth=%s" % job_section_qd 1645 1646 offset_section = "" 1647 if offset: 1648 offset_section = self.gen_fio_offset_section(offset_inc, num_jobs) 1649 1650 numa_opts = self.gen_fio_numa_section(r, num_jobs) 1651 1652 filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""]) 1653 1654 return filename_section 1655 1656 def get_nvme_subsystem_numa(self, bdev_name): 1657 bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir])) 1658 bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"] 1659 1660 # Remove two last characters to get controller name instead of subsystem name 1661 nvme_ctrl = bdev_name[:-2] 1662 for bdev in bdev_conf_json_obj: 1663 if bdev["method"] == "bdev_nvme_attach_controller" and bdev["params"]["name"] == nvme_ctrl: 1664 return self.get_route_nic_numa(bdev["params"]["traddr"]) 1665 return None 1666 1667 1668def initiators_match_subsystems(initiators, target_obj): 1669 for i in initiators: 1670 i.match_subsystems(target_obj.subsystem_info_list) 1671 if i.enable_adq: 1672 i.adq_configure_tc() 1673 1674 1675def run_single_fio_test(args, 1676 initiators, 1677 configs, 1678 target_obj, 1679 block_size, 1680 io_depth, 1681 rw, 1682 fio_rw_mix_read, 1683 fio_run_num, 1684 fio_ramp_time, 1685 fio_run_time): 1686 1687 for run_no in range(1, fio_run_num+1): 1688 threads = [] 1689 power_daemon = None 1690 measurements_prefix = "%s_%s_%s_m_%s_run_%s" % (block_size, io_depth, rw, fio_rw_mix_read, run_no) 1691 1692 for i, cfg in zip(initiators, configs): 1693 t = threading.Thread(target=i.run_fio, args=(cfg, run_no)) 1694 threads.append(t) 1695 if target_obj.enable_sar: 1696 sar_file_prefix = measurements_prefix + "_sar" 1697 t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix, fio_ramp_time, fio_run_time)) 1698 threads.append(t) 1699 1700 if target_obj.enable_pcm: 1701 pcm_fnames = ["%s_%s.csv" % (measurements_prefix, x) for x in ["pcm_cpu"]] 1702 pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, 1703 args=(args.results, pcm_fnames[0], fio_ramp_time, fio_run_time)) 1704 threads.append(pcm_cpu_t) 1705 1706 if target_obj.enable_bw: 1707 bandwidth_file_name = measurements_prefix + "_bandwidth.csv" 1708 t = threading.Thread(target=target_obj.measure_network_bandwidth, 1709 args=(args.results, bandwidth_file_name, fio_ramp_time, fio_run_time)) 1710 threads.append(t) 1711 1712 if target_obj.enable_dpdk_memory: 1713 dpdk_mem_file_name = measurements_prefix + "_dpdk_mem.txt" 1714 t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results, dpdk_mem_file_name, fio_ramp_time)) 1715 threads.append(t) 1716 1717 if target_obj.enable_pm: 1718 power_daemon = threading.Thread(target=target_obj.measure_power, 1719 args=(args.results, measurements_prefix, script_full_dir, 1720 fio_ramp_time, fio_run_time)) 1721 threads.append(power_daemon) 1722 1723 for t in threads: 1724 t.start() 1725 for t in threads: 1726 t.join() 1727 1728 1729def run_fio_tests(args, initiators, target_obj, 1730 fio_workloads, fio_rw_mix_read, 1731 fio_run_num, fio_ramp_time, fio_run_time, 1732 fio_rate_iops, fio_offset, fio_offset_inc): 1733 1734 for block_size, io_depth, rw in fio_workloads: 1735 configs = [] 1736 for i in initiators: 1737 i.init_connect() 1738 cfg = i.gen_fio_config(rw, 1739 fio_rw_mix_read, 1740 block_size, 1741 io_depth, 1742 target_obj.subsys_no, 1743 fio_num_jobs, 1744 fio_ramp_time, 1745 fio_run_time, 1746 fio_rate_iops, 1747 fio_offset, 1748 fio_offset_inc) 1749 configs.append(cfg) 1750 1751 run_single_fio_test(args, 1752 initiators, 1753 configs, 1754 target_obj, 1755 block_size, 1756 io_depth, 1757 rw, 1758 fio_rw_mix_read, 1759 fio_run_num, 1760 fio_ramp_time, 1761 fio_run_time) 1762 1763 for i in initiators: 1764 i.init_disconnect() 1765 i.copy_result_files(args.results) 1766 1767 try: 1768 parse_results(args.results, args.csv_filename) 1769 except Exception as err: 1770 logging.error("There was an error with parsing the results") 1771 raise err 1772 1773 1774if __name__ == "__main__": 1775 exit_code = 0 1776 1777 script_full_dir = os.path.dirname(os.path.realpath(__file__)) 1778 default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json")) 1779 1780 parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) 1781 parser.add_argument('-c', '--config', type=str, default=default_config_file_path, 1782 help='Configuration file.') 1783 parser.add_argument('-r', '--results', type=str, default='/tmp/results', 1784 help='Results directory.') 1785 parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv', 1786 help='CSV results filename.') 1787 parser.add_argument('-f', '--force', default=False, action='store_true', 1788 dest='force', help="""Force script to continue and try to use all 1789 available NVMe devices during test. 1790 WARNING: Might result in data loss on used NVMe drives""") 1791 1792 args = parser.parse_args() 1793 1794 logging.basicConfig(level=logging.INFO, 1795 format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s') 1796 1797 logging.info("Using config file: %s" % args.config) 1798 with open(args.config, "r") as config: 1799 data = json.load(config) 1800 1801 initiators = [] 1802 general_config = data["general"] 1803 target_config = data["target"] 1804 initiator_configs = [data[x] for x in data.keys() if "initiator" in x] 1805 1806 if "null_block_devices" not in data["target"] and \ 1807 (args.force is False and 1808 "allowlist" not in data["target"] and 1809 "blocklist" not in data["target"]): 1810 # TODO: Also check if allowlist or blocklist are not empty. 1811 logging.warning("""WARNING: This script requires allowlist and blocklist to be defined. 1812 You can choose to use all available NVMe drives on your system, which may potentially 1813 lead to data loss. If you wish to proceed with all attached NVMes, use "-f" option.""") 1814 exit(1) 1815 1816 for k, v in data.items(): 1817 if "target" in k: 1818 v.update({"results_dir": args.results}) 1819 if data[k]["mode"] == "spdk": 1820 target_obj = SPDKTarget(k, data["general"], v) 1821 elif data[k]["mode"] == "kernel": 1822 target_obj = KernelTarget(k, data["general"], v) 1823 elif "initiator" in k: 1824 if data[k]["mode"] == "spdk": 1825 init_obj = SPDKInitiator(k, data["general"], v) 1826 elif data[k]["mode"] == "kernel": 1827 init_obj = KernelInitiator(k, data["general"], v) 1828 initiators.append(init_obj) 1829 elif "fio" in k: 1830 fio_workloads = itertools.product(data[k]["bs"], 1831 data[k]["qd"], 1832 data[k]["rw"]) 1833 1834 fio_run_time = data[k]["run_time"] 1835 fio_ramp_time = data[k]["ramp_time"] 1836 fio_rw_mix_read = data[k]["rwmixread"] 1837 fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None 1838 fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None 1839 1840 fio_rate_iops = 0 1841 if "rate_iops" in data[k]: 1842 fio_rate_iops = data[k]["rate_iops"] 1843 1844 fio_offset = False 1845 if "offset" in data[k]: 1846 fio_offset = data[k]["offset"] 1847 fio_offset_inc = 0 1848 if "offset_inc" in data[k]: 1849 fio_offset_inc = data[k]["offset_inc"] 1850 else: 1851 continue 1852 1853 try: 1854 os.mkdir(args.results) 1855 except FileExistsError: 1856 pass 1857 1858 for i in initiators: 1859 target_obj.initiator_info.append( 1860 {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips} 1861 ) 1862 1863 try: 1864 target_obj.tgt_start() 1865 initiators_match_subsystems(initiators, target_obj) 1866 1867 # Poor mans threading 1868 # Run FIO tests 1869 run_fio_tests(args, initiators, target_obj, fio_workloads, fio_rw_mix_read, 1870 fio_run_num, fio_ramp_time, fio_run_time, fio_rate_iops, 1871 fio_offset, fio_offset_inc) 1872 1873 except Exception as e: 1874 logging.error("Exception occurred while running FIO tests") 1875 logging.error(e) 1876 exit_code = 1 1877 1878 finally: 1879 for i in initiators: 1880 try: 1881 i.stop() 1882 except Exception as err: 1883 pass 1884 target_obj.stop() 1885 sys.exit(exit_code) 1886