#!/usr/bin/env python3
#  SPDX-License-Identifier: BSD-3-Clause
#  Copyright (C) 2018 Intel Corporation
#  All rights reserved.
#

import os
import re
import sys
import argparse
import json
import logging
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid

import paramiko
import pandas as pd
from common import *
from subprocess import CalledProcessError
from abc import abstractmethod, ABC

sys.path.append(os.path.dirname(__file__) + '/../../../python')

import spdk.rpc as rpc  # noqa
import spdk.rpc.client as rpc_client  # noqa


class Server(ABC):
    def __init__(self, name, general_config, server_config):
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        self.skip_spdk_install = general_config.get('skip_spdk_install', False)
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]
        self.irdma_roce_enable = False

        self.log = logging.getLogger(self.name)

        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        self.local_nic_info = []
        self._nics_json_obj = {}
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        self.enable_adq = False
        self.adq_priority = None
        self.enable_arfs = server_config.get("enable_arfs", False)
        self.irq_settings = {"mode": "default"}

        if "adq_enable" in server_config and server_config["adq_enable"]:
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1
        if "irq_settings" in server_config:
            self.irq_settings.update(server_config["irq_settings"])
        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]
        if "irdma_roce_enable" in general_config:
            self.irdma_roce_enable = general_config["irdma_roce_enable"]

        if not re.match(r'^[A-Za-z0-9\-]+$', name):
            self.log.error("Please use a name which contains only letters, numbers or dashes")
            sys.exit(1)

    @staticmethod
    def get_uncommented_lines(lines):
        return [line for line in lines if line and not line.startswith('#')]
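    # Illustrative example (hypothetical input), to show what gets filtered out:
    #   get_uncommented_lines(["net.core.somaxconn = 4096", "# a comment", ""])
    #   returns ["net.core.somaxconn = 4096"]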

    def get_nic_name_by_ip(self, ip):
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                if ip == addr["local"]:
                    return nic["ifname"]

    @abstractmethod
    def set_local_nic_info_helper(self):
        pass

    def set_local_nic_info(self, pci_info):
        def extract_network_elements(json_obj):
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "network" in json_obj.get("class", ""):
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def get_nic_numa_node(self, nic_name):
        return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name]))

    def get_numa_cpu_map(self):
        numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"]))
        numa_cpu_json_map = {}

        for cpu in numa_cpu_json_obj["cpus"]:
            cpu_num = int(cpu["cpu"])
            numa_node = int(cpu["node"])
            numa_cpu_json_map.setdefault(numa_node, [])
            numa_cpu_json_map[numa_node].append(cpu_num)

        return numa_cpu_json_map
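    # Example of the returned shape on a hypothetical two-node host:
    #   {0: [0, 1, 2, 3], 1: [4, 5, 6, 7]}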

    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        return ""

    def configure_system(self):
        self.load_drivers()
        self.configure_services()
        self.configure_sysctl()
        self.configure_arfs()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity(**self.irq_settings)

    RDMA_PROTOCOL_IWARP = 0
    RDMA_PROTOCOL_ROCE = 1
    RDMA_PROTOCOL_UNKNOWN = -1

    def check_rdma_protocol(self):
        try:
            roce_ena = self.exec_cmd(["cat", "/sys/module/irdma/parameters/roce_ena"])
            roce_ena = roce_ena.strip()
            if roce_ena == "0":
                return self.RDMA_PROTOCOL_IWARP
            else:
                return self.RDMA_PROTOCOL_ROCE
        except CalledProcessError as e:
            self.log.error("ERROR: failed to check RDMA protocol!")
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
            return self.RDMA_PROTOCOL_UNKNOWN

    def load_drivers(self):
        self.log.info("Loading drivers")
        self.exec_cmd(["sudo", "modprobe", "-a",
                       "nvme-%s" % self.transport,
                       "nvmet-%s" % self.transport])
        current_mode = self.check_rdma_protocol()
        if current_mode == self.RDMA_PROTOCOL_UNKNOWN:
            self.log.error("ERROR: failed to check RDMA protocol mode")
            return
        if self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_IWARP:
            self.reload_driver("irdma", "roce_ena=1")
            self.log.info("Loaded irdma driver with RoCE enabled")
        elif self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_ROCE:
            self.log.info("Leaving irdma driver with RoCE enabled")
        else:
            self.reload_driver("irdma", "roce_ena=0")
            self.log.info("Loaded irdma driver with iWARP enabled")

    def configure_arfs(self):
        rps_flow_cnt = 512
        if not self.enable_arfs:
            rps_flow_cnt = 0

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            self.exec_cmd(["sudo", "ethtool", "-K", nic_name, "ntuple", "on"])
            self.log.info(f"Setting rps_flow_cnt for {nic_name}")
            queue_files = self.exec_cmd(["ls", f"/sys/class/net/{nic_name}/queues/"]).strip().split("\n")
            queue_files = filter(lambda x: x.startswith("rx-"), queue_files)

            for qf in queue_files:
                self.exec_cmd(["sudo", "bash", "-c", f"echo {rps_flow_cnt} > /sys/class/net/{nic_name}/queues/{qf}/rps_flow_cnt"])

    def configure_adq(self):
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        self.log.info("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log.info("%s loaded!" % module)
            except CalledProcessError as e:
                self.log.error("ERROR: failed to load module %s" % module)
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        self.log.info("Configuring ADQ Traffic classes and filters...")

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            nic_ports = [x[0] for x in self.subsystem_info_list if nic_ip in x[2]]

            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
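            # For illustration, with num_queues_tc0=2 and num_queues_tc1=8
            # (hypothetical values) the assembled command is:
            #   tc qdisc add dev <nic> root mqprio num_tc 2 map 0 1 \
            #       queues 2@0 8@2 hw 1 mode channel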
            self.log.info(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log.info(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            nic_bdf = os.path.basename(self.exec_cmd(["readlink", "-f", "/sys/class/net/%s/device" % nic_name]).strip())
            self.exec_cmd(["sudo", "devlink", "dev", "param", "set", "pci/%s" % nic_bdf,
                           "name", "tc1_inline_fd", "value", "true", "cmode", "runtime"])

            for port in nic_ports:
                tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                                 "protocol", "ip", "ingress", "prio", "1", "flower",
                                 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                                 "skip_sw", "hw_tc", "1"]
                self.log.info(" ".join(tc_filter_cmd))
                self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log.info("Show tc configuration for %s NIC..." % nic_name)
            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log.info("%s" % tc_disk_out)
            self.log.info("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log.info(xps_cmd)
            self.exec_cmd(xps_cmd)

    def reload_driver(self, driver, *modprobe_args):
        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver, *modprobe_args])
        except CalledProcessError as e:
            self.log.error("ERROR: failed to reload %s module!" % driver)
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_nic(self):
        self.log.info("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                nic_bdf = self.exec_cmd(["bash", "-c", "source /sys/class/net/%s/device/uevent; echo $PCI_SLOT_NAME" % nic])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-inspect-optimize", "off"])
                # Following are suggested in ADQ Configuration Guide to be enabled for large block sizes,
                # but can be used with 4k block sizes as well
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop", "on"])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop-cfg", "on"])
            except subprocess.CalledProcessError as e:
                self.log.error("ERROR: failed to configure NIC port using ethtool!")
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
                self.log.info("Please update your NIC driver and firmware versions and try again.")
            self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        self.log.info("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log.info("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log.info("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        self.log.info("Tuning sysctl settings...")

        busy_read = 0
        busy_poll = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1
            busy_poll = 1

        sysctl_opts = {
            "net.core.busy_poll": busy_poll,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
            "net.core.rps_sock_flow_entries": 0
        }

        if self.enable_adq:
            sysctl_opts.update(self.adq_set_busy_read(1))

        if self.enable_arfs:
            sysctl_opts.update({"net.core.rps_sock_flow_entries": 32768})

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        if not self.tuned_profile:
            self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip())

    def configure_cpu_governor(self):
        self.log.info("Setting CPU governor to performance...")

        # This assumes that all CPUs use the same scaling governor
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def get_core_list_from_mask(self, core_mask):
        # Generate list of individual cores from hex core mask
        # (e.g. '0xffff') or list containing:
        # - individual cores (e.g. '1, 2, 3')
        # - core ranges (e.g. '0-3')
        # - mix of both (e.g. '0, 1-4, 9, 11-13')
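        # Examples (hypothetical masks):
        #   '0xff'        -> [0, 1, 2, 3, 4, 5, 6, 7]
        #   '[0, 2-4, 9]' -> [0, 2, 3, 4, 9]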

        core_list = []
        if "0x" in core_mask:
            core_mask_int = int(core_mask, 16)
            for i in range(core_mask_int.bit_length()):
                if (1 << i) & core_mask_int:
                    core_list.append(i)
            return core_list
        else:
            # Core list can be provided in .json config with square brackets
            # remove them first
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")

            for i in core_mask.split(","):
                if "-" in i:
                    start, end = i.split("-")
                    core_range = range(int(start), int(end) + 1)
                    core_list.extend(core_range)
                else:
                    core_list.append(int(i))
            return core_list

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
        self.log.info("Setting IRQ affinity for NICs. Using %s mode" % mode)

        if mode not in ["default", "bynode", "cpulist"]:
            raise ValueError("%s irq affinity setting not supported" % mode)

        if mode == "cpulist" and not cpulist:
            raise ValueError("%s irq affinity setting set, but no cpulist provided" % mode)

        affinity_script = "set_irq_affinity.sh"
        if mode != "default":
            affinity_script = "set_irq_affinity_cpulist.sh"
            system_cpu_map = self.get_numa_cpu_map()
        irq_script_path = os.path.join(self.irq_scripts_dir, affinity_script)

        def cpu_list_to_string(cpulist):
            return ",".join(map(str, cpulist))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            irq_cmd = ["sudo", irq_script_path]

            # Use only CPU cores matching NIC NUMA node.
            # Remove any CPU cores if they're on exclusion list.
            if mode == "bynode":
                irq_cpus = system_cpu_map[self.get_nic_numa_node(nic_name)]
                if cpulist and exclude_cpulist:
                    disallowed_cpus = self.get_core_list_from_mask(cpulist)
                    irq_cpus = list(set(irq_cpus) - set(disallowed_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            if mode == "cpulist":
                irq_cpus = self.get_core_list_from_mask(cpulist)
                if exclude_cpulist:
                    # Flatten system CPU list, we don't need NUMA awareness here
                    system_cpu_list = sorted({x for v in system_cpu_map.values() for x in v})
                    irq_cpus = list(set(system_cpu_list) - set(irq_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            irq_cmd.append(nic_name)
            self.log.info(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        self.log.info("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        self.log.info("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        self.log.info("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log.info("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        self.log.info("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log.info("Reverted CPU governor to %s." % self.governor_restore)

    def restore_settings(self):
        self.restore_governor()
        self.restore_tuned()
        self.restore_services()
        self.restore_sysctl()
        if self.enable_adq:
            self.reload_driver("ice")


class Target(Server):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Defaults
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []
        self.initiator_info = []
        self.nvme_allowlist = []
        self.nvme_blocklist = []

        # Target-side measurement options
        self.enable_pm = True
        self.enable_sar = True
        self.enable_pcm = True
        self.enable_bw = True
        self.enable_dpdk_memory = True

        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "scheduler_settings" in target_config:
            self.scheduler_name = target_config["scheduler_settings"]
        if "zcopy_settings" in target_config:
            self.enable_zcopy = target_config["zcopy_settings"]
        if "results_dir" in target_config:
            self.results_dir = target_config["results_dir"]
        if "blocklist" in target_config:
            self.nvme_blocklist = target_config["blocklist"]
        if "allowlist" in target_config:
            self.nvme_allowlist = target_config["allowlist"]
            # Blocklist takes precedence, remove common elements from allowlist
            self.nvme_allowlist = list(set(self.nvme_allowlist) - set(self.nvme_blocklist))
        if "enable_pm" in target_config:
            self.enable_pm = target_config["enable_pm"]
        if "enable_sar" in target_config:
            self.enable_sar = target_config["enable_sar"]
        if "enable_pcm" in target_config:
            self.enable_pcm = target_config["enable_pcm"]
        if "enable_bandwidth" in target_config:
            self.enable_bw = target_config["enable_bandwidth"]
        if "enable_dpdk_memory" in target_config:
            self.enable_dpdk_memory = target_config["enable_dpdk_memory"]

        self.log.info("Items now on allowlist: %s" % self.nvme_allowlist)
        self.log.info("Items now on blocklist: %s" % self.nvme_blocklist)

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        if self.skip_spdk_install is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
            self.configure_irq_affinity()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log.info("Changing directory to %s" % change_dir)

        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")

        if change_dir:
            os.chdir(old_cwd)
            self.log.info("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        self.log.info("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, _directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log.info("Done zipping")

    @staticmethod
    def _chunks(input_list, chunks_no):
        div, rem = divmod(len(input_list), chunks_no)
        for i in range(chunks_no):
            si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem)
            yield input_list[si:si + (div + 1 if i < rem else div)]
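    # For example, _chunks(list(range(10)), 3) yields [0, 1, 2, 3], [4, 5, 6]
    # and [7, 8, 9]; the remainder is spread over the leading chunks.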

    def spread_bdevs(self, req_disks):
        # Spread available block device indexes:
        # - evenly across available initiator systems
        # - evenly across available NIC interfaces for
        #   each initiator
        # Not NUMA aware.
        ip_bdev_map = []
        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))

        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
            self.initiator_info[i]["bdev_range"] = init_chunk
            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
                for c in nic_chunk:
                    ip_bdev_map.append((ip, c))
        return ip_bdev_map
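    # Sketch of the result, assuming 4 bdevs and two initiators with one
    # target NIC IP each (hypothetical addresses):
    #   [("10.0.0.1", 0), ("10.0.0.1", 1), ("10.0.0.2", 2), ("10.0.0.2", 3)]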

    def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time):
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))

        self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Starting SAR measurements")

        out = self.exec_cmd(["sar", "-P", "ALL", "1", "%s" % run_time])
        with open(sar_output_file, "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log.info("Summary CPU utilization from SAR:")
                        self.log.info(line)
                    elif "all" in line:
                        self.log.info(line)
                    else:
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum

        with open(sar_cpu_util_file, "w") as f:
            f.write("%0.2f" % sar_cpu_usage)

    def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time):
        time.sleep(ramp_time)
        self.log.info("Starting power measurements")
        self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir,
                       "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix,
                       "-x", "-c", "%s" % run_time, "-t", "1", "-r"])

    def measure_pcm_memory(self, results_dir, pcm_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        cmd = ["pcm-memory", "1", "-csv=%s/%s" % (results_dir, pcm_file_name)]
        pcm_memory = subprocess.Popen(cmd)
        time.sleep(run_time)
        pcm_memory.terminate()

    def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        cmd = ["pcm", "1", "-i=%s" % run_time,
               "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_pcm_power(self, results_dir, pcm_power_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        out = self.exec_cmd(["pcm-power", "1", "-i=%s" % run_time])
        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
            fh.write(out)
        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
        #       Improve this so that additional file containing measurements average is generated too.

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name, ramp_time, run_time):
        self.log.info("Waiting %d seconds for ramp-up to finish before measuring bandwidth stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("INFO: starting network bandwidth measurement")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", "%s" % run_time])
        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
        #       Improve this so that additional file containing measurements average is generated too.
        # TODO: Monitor only these interfaces which are currently used to run the workload.

    def measure_dpdk_memory(self, results_dir, dump_file_name, ramp_time):
        self.log.info("INFO: waiting to generate DPDK memory usage")
        time.sleep(ramp_time)
        self.log.info("INFO: generating DPDK memory usage")
        tmp_dump_file = rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)["filename"]
        os.rename(tmp_dump_file, "%s/%s" % (results_dir, dump_file_name))

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(os.uname().release)
        self.log.info("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log.info("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log.info("====zcopy settings:====")
        self.log.info("zcopy enabled: %s" % (self.enable_zcopy))
        self.log.info("====Scheduler settings:====")
        self.log.info("SPDK scheduler: %s" % (self.scheduler_name))


class Initiator(Server):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Required fields
        self.ip = initiator_config["ip"]
        self.target_nic_ips = initiator_config["target_nic_ips"]

        # Defaults
        self.cpus_allowed = None
        self.cpus_allowed_policy = "shared"
        self.spdk_dir = "/tmp/spdk"
        self.fio_bin = "/usr/src/fio/fio"
        self.nvmecli_bin = "nvme"
        self.cpu_frequency = None
        self.subsystem_info_list = []
        self.allow_cpu_sharing = True

        if "spdk_dir" in initiator_config:
            self.spdk_dir = initiator_config["spdk_dir"]
        if "fio_bin" in initiator_config:
            self.fio_bin = initiator_config["fio_bin"]
        if "nvmecli_bin" in initiator_config:
            self.nvmecli_bin = initiator_config["nvmecli_bin"]
        if "cpus_allowed" in initiator_config:
            self.cpus_allowed = initiator_config["cpus_allowed"]
        if "cpus_allowed_policy" in initiator_config:
            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
        if "cpu_frequency" in initiator_config:
            self.cpu_frequency = initiator_config["cpu_frequency"]
        if "allow_cpu_sharing" in initiator_config:
            self.allow_cpu_sharing = initiator_config["allow_cpu_sharing"]
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if self.skip_spdk_install is False:
            self.copy_spdk("/tmp/spdk.zip")

        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        self.restore_settings()
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it again to prevent
        # expansion when sending it to the remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)
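        # For example, ["sudo", "sysctl", "-w", "vm.overcommit_memory = 1"]
        # (hypothetical arguments) is sent to the remote shell as:
        #   sudo sysctl -w "vm.overcommit_memory = 1"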

        # Redirect stderr to stdout by using the get_pty option if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        self.log.info("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log.info("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log.info("Sources unpacked")

    def copy_result_files(self, dest_dir):
        self.log.info("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log.info("Done copying results")

    def match_subsystems(self, target_subsystems):
        subsystems = [subsystem for subsystem in target_subsystems if subsystem[2] in self.target_nic_ips]
        subsystems.sort(key=lambda x: x[1])
        self.log.info("Found matching subsystems on target side:")
        for s in subsystems:
            self.log.info(s)
        self.subsystem_info_list = subsystems

    @abstractmethod
    def init_connect(self):
        pass

    @abstractmethod
    def init_disconnect(self):
        pass

    @abstractmethod
    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def get_route_nic_numa(self, remote_nvme_ip):
        local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip]))
        local_nvme_nic = local_nvme_nic[0]["dev"]
        return self.get_nic_numa_node(local_nvme_nic)

    @staticmethod
    def gen_fio_offset_section(offset_inc, num_jobs):
        offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc
        return "\n".join(["size=%s%%" % offset_inc,
                          "offset=0%",
                          "offset_increment=%s%%" % offset_inc])
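    # With offset_inc=0 and num_jobs=4 (example values) this yields:
    #   size=25%
    #   offset=0%
    #   offset_increment=25%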

    def gen_fio_numa_section(self, fio_filenames_list, num_jobs):
        numa_stats = {}
        allowed_cpus = []
        for nvme in fio_filenames_list:
            nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme))
            numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1

        # Use the most common NUMA node for this chunk to allocate memory and CPUs
        section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0]

        # Check if we have enough free CPUs to pop from the list before assigning them
        if len(self.available_cpus[section_local_numa]) < num_jobs:
            if self.allow_cpu_sharing:
                self.log.info("Regenerating available CPU list %s" % section_local_numa)
                # Remove still available CPUs from the regenerated list. We don't want to
                # regenerate it with duplicates.
                cpus_regen = set(self.get_numa_cpu_map()[section_local_numa]) - set(self.available_cpus[section_local_numa])
                self.available_cpus[section_local_numa].extend(cpus_regen)
                self.log.info(self.available_cpus[section_local_numa])
            else:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise IndexError

        for _ in range(num_jobs):
            try:
                allowed_cpus.append(str(self.available_cpus[section_local_numa].pop(0)))
            except IndexError:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise

        return "\n".join(["cpus_allowed=%s" % ",".join(allowed_cpus),
                          "numa_mem_policy=prefer:%s" % section_local_numa])
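    # Example return value, assuming the chunk's devices mostly sit on NUMA
    # node 0 and num_jobs=2 (hypothetical CPU numbers):
    #   cpus_allowed=0,1
    #   numa_mem_policy=prefer:0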

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no,
                       num_jobs=None, ramp_time=0, run_time=10, rate_iops=0,
                       offset=False, offset_inc=0):
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""

        if self.cpus_allowed is not None:
            self.log.info("Limiting FIO workload execution to specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    cpus_num += len(range(a, b + 1))
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            self.num_cores = len(self.subsystem_info_list)
            threads = range(0, len(self.subsystem_info_list))

        filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs,
                                                      offset, offset_inc)

        fio_config = fio_conf_template.format(ioengine=self.ioengine, spdk_conf=self.spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, init_connect() function
        if "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log.info("Created FIO Config:")
        self.log.info(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit()
        else:
            self.log.warning("WARNING: you have disabled intel_pstate and are using the default CPU governor.")

    def run_fio(self, fio_config_file, run_num=1):
        job_name, _ = os.path.splitext(fio_config_file)
        self.log.info("Starting FIO run for job: %s" % job_name)
        self.log.info("Using FIO: %s" % self.fio_bin)

        output_filename = job_name + "_run_" + str(run_num) + "_" + self.name + ".json"
        try:
            output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename, "--eta=never"], True)
            self.log.info(output)
            self.log.info("FIO run finished. Results in: %s" % output_filename)
        except subprocess.CalledProcessError as e:
            self.log.error("ERROR: Fio process failed!")
            self.log.error(e.stdout)

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(self.exec_cmd(["uname", "-r"]))
        self.log.info("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log.info("====sysctl conf:====")
        sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"])
        self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))


class KernelTarget(Target):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = "nvmetcli"

        if "nvmet_bin" in target_config:
            self.nvmet_bin = target_config["nvmet_bin"]

    def load_drivers(self):
        self.log.info("Loading drivers")
        super().load_drivers()
        if self.null_block:
            self.exec_cmd(["sudo", "modprobe", "null_blk", "nr_devices=%s" % self.null_block])

    def configure_adq(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_configure_tc(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_set_busy_read(self, busy_read_val):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
        return {"net.core.busy_read": 0}

    def stop(self):
        self.nvmet_command(self.nvmet_bin, "clear")
        self.restore_settings()

    def get_nvme_device_bdf(self, nvme_dev_path):
        nvme_name = os.path.basename(nvme_dev_path)
        return self.exec_cmd(["cat", "/sys/block/%s/device/address" % nvme_name]).strip()

    def get_nvme_devices(self):
        dev_list = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]).split("\n")
        nvme_list = []
        for dev in dev_list:
            if "nvme" not in dev:
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                nvme_list.append(dev)
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_allowlist:
                nvme_list.append(dev)
        return nvme_list

    def nvmet_command(self, nvmet_bin, command):
        return self.exec_cmd([nvmet_bin, *(command.split(" "))])

    def kernel_tgt_gen_subsystem_conf(self, nvme_list):
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        for ip, bdev_num in self.spread_bdevs(len(nvme_list)):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = nvme_list[bdev_num]

            nvmet_cfg["subsystems"].append({
                "allowed_hosts": [],
                "attr": {
                    "allow_any_host": "1",
                    "serial": serial,
                    "version": "1.3"
                },
                "namespaces": [
                    {
                        "device": {
                            "path": bdev_name,
                            "uuid": "%s" % uuid.uuid4()
                        },
                        "enable": 1,
                        "nsid": port
                    }
                ],
                "nqn": nqn
            })

            nvmet_cfg["ports"].append({
                "addr": {
                    "adrfam": "ipv4",
                    "traddr": ip,
                    "trsvcid": port,
                    "trtype": self.transport
                },
                "portid": bdev_num,
                "referrals": [],
                "subsystems": [nqn]
            })

            self.subsystem_info_list.append((port, nqn, ip))
        self.subsys_no = len(self.subsystem_info_list)

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        self.log.info("Configuring kernel NVMeOF Target")

        if self.null_block:
            self.log.info("Configuring with null block device.")
            nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
        else:
            self.log.info("Configuring with NVMe drives.")
            nvme_list = self.get_nvme_devices()

        self.kernel_tgt_gen_subsystem_conf(nvme_list)
        self.subsys_no = len(nvme_list)

        self.nvmet_command(self.nvmet_bin, "clear")
        self.nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log.info("Done configuring kernel NVMeOF Target")


class SPDKTarget(Target):
    def __init__(self, name, general_config, target_config):
        # Required fields
        self.core_mask = target_config["core_mask"]
        self.num_cores = len(self.get_core_list_from_mask(self.core_mask))

        super().__init__(name, general_config, target_config)

        # Defaults
        self.dif_insert_strip = False
        self.null_block_dif_type = 0
        self.num_shared_buffers = 4096
        self.max_queue_depth = 128
        self.bpf_proc = None
        self.bpf_scripts = []
        self.enable_dsa = False
        self.scheduler_core_limit = None
        self.iobuf_small_pool_count = 32767
        self.iobuf_large_pool_count = 16383
        self.num_cqe = 4096

        if "num_shared_buffers" in target_config:
            self.num_shared_buffers = target_config["num_shared_buffers"]
        if "max_queue_depth" in target_config:
            self.max_queue_depth = target_config["max_queue_depth"]
        if "null_block_dif_type" in target_config:
            self.null_block_dif_type = target_config["null_block_dif_type"]
        if "dif_insert_strip" in target_config:
            self.dif_insert_strip = target_config["dif_insert_strip"]
        if "bpf_scripts" in target_config:
            self.bpf_scripts = target_config["bpf_scripts"]
        if "dsa_settings" in target_config:
            self.enable_dsa = target_config["dsa_settings"]
        if "scheduler_core_limit" in target_config:
            self.scheduler_core_limit = target_config["scheduler_core_limit"]
        if "iobuf_small_pool_count" in target_config:
            self.iobuf_small_pool_count = target_config["iobuf_small_pool_count"]
        if "iobuf_large_pool_count" in target_config:
            self.iobuf_large_pool_count = target_config["iobuf_large_pool_count"]
        if "num_cqe" in target_config:
            self.num_cqe = target_config["num_cqe"]

        self.log.info("====DSA settings:====")
        self.log.info("DSA enabled: %s" % (self.enable_dsa))

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
        if mode not in ["default", "bynode", "cpulist",
                        "shared", "split", "split-bynode"]:
            self.log.error("%s irq affinity setting not supported" % mode)
            raise Exception

        # Create core list from SPDK's mask and change it to string.
        # This is the type configure_irq_affinity expects for cpulist parameter.
        spdk_tgt_core_list = self.get_core_list_from_mask(self.core_mask)
        spdk_tgt_core_list = ",".join(map(str, spdk_tgt_core_list))
        spdk_tgt_core_list = "[" + spdk_tgt_core_list + "]"
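        # e.g. a core_mask of "0xF" becomes the string "[0,1,2,3]" here, which
        # get_core_list_from_mask() can parse again in the parent class.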
1191
1192        if mode == "shared":
1193            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list)
1194        elif mode == "split":
1195            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
1196        elif mode == "split-bynode":
1197            super().configure_irq_affinity(mode="bynode", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
1198        else:
1199            super().configure_irq_affinity(mode=mode, cpulist=cpulist, exclude_cpulist=exclude_cpulist)
1200
1201    def adq_set_busy_read(self, busy_read_val):
1202        return {"net.core.busy_read": busy_read_val}
1203
1204    def get_nvme_devices_count(self):
1205        return len(self.get_nvme_devices())
1206
1207    def get_nvme_devices(self):
1208        bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")]))
1209        bdev_bdfs = []
1210        for bdev in bdev_subsys_json_obj["config"]:
1211            bdev_traddr = bdev["params"]["traddr"]
1212            if bdev_traddr in self.nvme_blocklist:
1213                continue
1214            if len(self.nvme_allowlist) == 0:
1215                bdev_bdfs.append(bdev_traddr)
1216            if bdev_traddr in self.nvme_allowlist:
1217                bdev_bdfs.append(bdev_traddr)
1218        return bdev_bdfs
1219
1220    def spdk_tgt_configure(self):
1221        self.log.info("Configuring SPDK NVMeOF target via RPC")
1222
1223        # Create transport layer
1224        nvmf_transport_params = {
1225            "client": self.client,
1226            "trtype": self.transport,
1227            "num_shared_buffers": self.num_shared_buffers,
1228            "max_queue_depth": self.max_queue_depth,
1229            "dif_insert_or_strip": self.dif_insert_strip,
1230            "sock_priority": self.adq_priority,
1231            "num_cqe": self.num_cqe
1232        }
1233
1234        if self.enable_adq:
1235            nvmf_transport_params["acceptor_poll_rate"] = 10000
1236
1237        rpc.nvmf.nvmf_create_transport(**nvmf_transport_params)
1238        self.log.info("SPDK NVMeOF transport layer:")
1239        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))
1240
1241        if self.null_block:
1242            self.spdk_tgt_add_nullblock(self.null_block)
1243            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
1244        else:
1245            self.spdk_tgt_add_nvme_conf()
1246            self.spdk_tgt_add_subsystem_conf(self.nic_ips)
1247
1248        if self.enable_adq:
1249            self.adq_configure_tc()
1250
1251        self.log.info("Done configuring SPDK NVMeOF Target")
1252
1253    def spdk_tgt_add_nullblock(self, null_block_count):
1254        md_size = 0
1255        block_size = 4096
1256        if self.null_block_dif_type != 0:
1257            md_size = 128
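        # A null bdev has no backing storage, so metadata is emulated by widening the
        # block: with dif_type != 0 each block becomes 4096 + 128 = 4224 bytes.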
1258
1259        self.log.info("Adding null block bdevices to config via RPC")
        self.log.info("Setting bdev protection to: %s" % self.null_block_dif_type)
        for i in range(null_block_count):
            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
                                      dif_type=self.null_block_dif_type, md_size=md_size)
1264        self.log.info("SPDK Bdevs configuration:")
1265        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1266
1267    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
1268        self.log.info("Adding NVMe bdevs to config via RPC")
1269
1270        bdfs = self.get_nvme_devices()
1271        bdfs = [b.replace(":", ".") for b in bdfs]
1272
1273        if req_num_disks:
1274            if req_num_disks > len(bdfs):
                self.log.error("ERROR: Requested number of disks exceeds the number available (%s)" % len(bdfs))
1276                sys.exit(1)
1277            else:
1278                bdfs = bdfs[0:req_num_disks]
1279
1280        for i, bdf in enumerate(bdfs):
1281            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)
1282
1283        self.log.info("SPDK Bdevs configuration:")
1284        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1285
1286    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
1287        self.log.info("Adding subsystems to config")
1288        if not req_num_disks:
1289            req_num_disks = self.get_nvme_devices_count()
1290
1291        for ip, bdev_num in self.spread_bdevs(req_num_disks):
1292            port = str(4420 + bdev_num)
1293            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
1294            serial = "SPDK00%s" % bdev_num
1295            bdev_name = "Nvme%sn1" % bdev_num
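            # Naming scheme example: bdev_num=2 -> port "4422",
            # nqn "nqn.2018-09.io.spdk:cnode2", serial "SPDK002", bdev "Nvme2n1".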
1296
1297            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
1298                                           allow_any_host=True, max_namespaces=8)
1299            rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
1300            for nqn_name in [nqn, "discovery"]:
1301                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
1302                                                     nqn=nqn_name,
1303                                                     trtype=self.transport,
1304                                                     traddr=ip,
1305                                                     trsvcid=port,
1306                                                     adrfam="ipv4")
1307            self.subsystem_info_list.append((port, nqn, ip))
1308        self.subsys_no = len(self.subsystem_info_list)
1309
1310        self.log.info("SPDK NVMeOF subsystem configuration:")
1311        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))
1312
1313    def bpf_start(self):
1314        self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts)
1315        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
1316        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
1317        results_path = os.path.join(self.results_dir, "bpf_traces.txt")
1318
1319        with open(self.pid, "r") as fh:
            nvmf_pid = fh.readline().strip()
1321
1322        cmd = [bpf_script, nvmf_pid, *bpf_traces]
1323        self.log.info(cmd)
1324        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})
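        # The launched command is roughly (paths illustrative):
        #   scripts/bpftrace.sh <nvmf_tgt pid> scripts/bpf/<trace>.bt ...
        # with BPF_OUTFILE steering bpftrace output into bpf_traces.txt under results_dir.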
1325
1326    def tgt_start(self):
1327        if self.null_block:
1328            self.subsys_no = 1
1329        else:
1330            self.subsys_no = self.get_nvme_devices_count()
1331        self.log.info("Starting SPDK NVMeOF Target process")
1332        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
1333        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
1334        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")
1335
1336        with open(self.pid, "w") as fh:
1337            fh.write(str(proc.pid))
1338        self.nvmf_proc = proc
        self.log.info("SPDK NVMeOF Target PID=%s" % proc.pid)
1340        self.log.info("Waiting for spdk to initialize...")
        while not os.path.exists("/var/tmp/spdk.sock"):
            time.sleep(1)
1345        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")
1346
1347        rpc.sock.sock_set_default_impl(self.client, impl_name="posix")
1348        rpc.iobuf.iobuf_set_options(self.client,
1349                                    small_pool_count=self.iobuf_small_pool_count,
1350                                    large_pool_count=self.iobuf_large_pool_count,
1351                                    small_bufsize=None,
1352                                    large_bufsize=None)
1353
1354        if self.enable_zcopy:
1355            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
1356                                           enable_zerocopy_send_server=True)
1357            self.log.info("Target socket options:")
1358            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))
1359
1360        if self.enable_adq:
1361            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
1362            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
1363                                           nvme_adminq_poll_period_us=100000, retry_count=4)
1364
1365        if self.enable_dsa:
1366            rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None)
1367            self.log.info("Target DSA accel module enabled")
1368
1369        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit)
1370        rpc.framework_start_init(self.client)
1371
1372        if self.bpf_scripts:
1373            self.bpf_start()
1374
1375        self.spdk_tgt_configure()
1376
1377    def stop(self):
1378        if self.bpf_proc:
1379            self.log.info("Stopping BPF Trace script")
1380            self.bpf_proc.terminate()
1381            self.bpf_proc.wait()
1382
1383        if hasattr(self, "nvmf_proc"):
1384            try:
1385                self.nvmf_proc.terminate()
1386                self.nvmf_proc.wait(timeout=30)
1387            except Exception as e:
1388                self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.")
1389                self.log.info(e)
1390                self.nvmf_proc.kill()
1391                self.nvmf_proc.communicate()
1392                # Try to clean up RPC socket files if they were not removed
1393                # because of using 'kill'
1394                try:
1395                    os.remove("/var/tmp/spdk.sock")
1396                    os.remove("/var/tmp/spdk.sock.lock")
1397                except FileNotFoundError:
1398                    pass
1399        self.restore_settings()
1400
1401
1402class KernelInitiator(Initiator):
1403    def __init__(self, name, general_config, initiator_config):
1404        super().__init__(name, general_config, initiator_config)
1405
1406        # Defaults
1407        self.extra_params = ""
1408        self.ioengine = "libaio"
1409        self.spdk_conf = ""
1410
1411        if "num_cores" in initiator_config:
1412            self.num_cores = initiator_config["num_cores"]
1413
1414        if "extra_params" in initiator_config:
1415            self.extra_params = initiator_config["extra_params"]
1416
1417        if "kernel_engine" in initiator_config:
1418            self.ioengine = initiator_config["kernel_engine"]
1419            if "io_uring" in self.ioengine:
1420                self.extra_params += ' --nr-poll-queues=8'
1421
1422    def configure_adq(self):
1423        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
1424
1425    def adq_configure_tc(self):
1426        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
1427
1428    def adq_set_busy_read(self, busy_read_val):
1429        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
1430        return {"net.core.busy_read": 0}
1431
1432    def get_connected_nvme_list(self):
1433        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
1434        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
1435                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
1436        return nvme_list
1437
1438    def init_connect(self):
        self.log.info("The connection attempts below may produce error messages; this is expected!")
1440        for subsystem in self.subsystem_info_list:
1441            self.log.info("Trying to connect %s %s %s" % subsystem)
1442            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
1443                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
1444            time.sleep(2)
1445
1446        if "io_uring" in self.ioengine:
1447            self.log.info("Setting block layer settings for io_uring.")
1448
1449            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
1450            #       apparently it's not possible for connected subsystems.
1451            #       Results in "error: Invalid argument"
1452            block_sysfs_settings = {
1453                "iostats": "0",
1454                "rq_affinity": "0",
1455                "nomerges": "2"
1456            }
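            # For a connected /dev/nvme0n1 this writes (device name assumed):
            #   /sys/block/nvme0n1/queue/iostats     <- 0  (skip per-IO accounting)
            #   /sys/block/nvme0n1/queue/rq_affinity <- 0  (don't steer completions to the submitting CPU)
            #   /sys/block/nvme0n1/queue/nomerges    <- 2  (disable request merging)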
1457
1458            for disk in self.get_connected_nvme_list():
1459                sysfs = os.path.join("/sys/block", disk, "queue")
1460                for k, v in block_sysfs_settings.items():
1461                    sysfs_opt_path = os.path.join(sysfs, k)
1462                    try:
1463                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
1464                    except CalledProcessError as e:
1465                        self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
1466                    finally:
                    finally:
                        current_value = self.exec_cmd(["sudo", "cat", sysfs_opt_path])
                        self.log.info("%s=%s" % (sysfs_opt_path, current_value))
1469
1470    def init_disconnect(self):
1471        for subsystem in self.subsystem_info_list:
1472            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
1473            time.sleep(1)
1474
1475    def get_nvme_subsystem_numa(self, dev_name):
        # Remove the last two characters to get the controller name instead of the subsystem name
1477        nvme_ctrl = os.path.basename(dev_name)[:-2]
1478        remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
1479                                   self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl]))
1480        return self.get_route_nic_numa(remote_nvme_ip.group(0))
1481
1482    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
1483        self.available_cpus = self.get_numa_cpu_map()
1484        if len(threads) >= len(subsystems):
1485            threads = range(0, len(subsystems))
1486
1487        # Generate connected nvme devices names and sort them by used NIC numa node
1488        # to allow better grouping when splitting into fio sections.
1489        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
1490        nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list]
1491        nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))]
1492
1493        filename_section = ""
        nvme_per_split = len(nvme_list) // len(threads)
        remainder = len(nvme_list) % len(threads)
1496        iterator = iter(nvme_list)
1497        result = []
1498        for i in range(len(threads)):
1499            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
1505        for i, r in enumerate(result):
1506            header = "[filename%s]" % i
1507            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = max(round((io_depth * len(r)) / num_jobs), 1)
1511            iodepth = "iodepth=%s" % job_section_qd
1512
1513            offset_section = ""
1514            if offset:
1515                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)
1516
1517            numa_opts = self.gen_fio_numa_section(r, num_jobs)
1518
1519            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])
1520
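        # One assembled group section looks like (names and values illustrative):
        #   [filename0]
        #   filename=/dev/nvme0n1
        #   filename=/dev/nvme1n1
        #   iodepth=256
        # followed by whatever numa/offset options the helpers above produced.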
1521        return filename_section
1522
1523
1524class SPDKInitiator(Initiator):
1525    def __init__(self, name, general_config, initiator_config):
1526        super().__init__(name, general_config, initiator_config)
1527
        if not self.skip_spdk_install:
1529            self.install_spdk()
1530
1531        # Optional fields
1532        self.enable_data_digest = False
1533        if "enable_data_digest" in initiator_config:
1534            self.enable_data_digest = initiator_config["enable_data_digest"]
1535        if "num_cores" in initiator_config:
1536            self.num_cores = initiator_config["num_cores"]
1537
1538        self.ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
1539        self.spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
1540
1541    def adq_set_busy_read(self, busy_read_val):
1542        return {"net.core.busy_read": busy_read_val}
1543
1544    def install_spdk(self):
1545        self.log.info("Using fio binary %s" % self.fio_bin)
1546        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
1547        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
1548        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma",
1549                       "--with-fio=%s" % os.path.dirname(self.fio_bin),
1550                       "--enable-lto", "--disable-unit-tests"])
1551        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
1552        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])
1553
1554        self.log.info("SPDK built")
1555        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])
1556
1557    def init_connect(self):
1558        # Not a real "connect" like when doing "nvme connect" because SPDK's fio
1559        # bdev plugin initiates connection just before starting IO traffic.
1560        # This is just to have a "init_connect" equivalent of the same function
1561        # from KernelInitiator class.
1562        # Just prepare bdev.conf JSON file for later use and consider it
1563        # "making a connection".
1564        bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
1565        self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
1566
1567    def init_disconnect(self):
        # SPDK initiator does not need to explicitly disconnect, as this is done
        # automatically after the fio bdev plugin finishes IO.
1570        return
1571
1572    def gen_spdk_bdev_conf(self, remote_subsystem_list):
1573        bdev_cfg_section = {
1574            "subsystems": [
1575                {
1576                    "subsystem": "bdev",
1577                    "config": []
1578                }
1579            ]
1580        }
1581
1582        for i, subsys in enumerate(remote_subsystem_list):
            sub_port, sub_nqn, sub_addr = map(str, subsys)
1584            nvme_ctrl = {
1585                "method": "bdev_nvme_attach_controller",
1586                "params": {
1587                    "name": "Nvme{}".format(i),
1588                    "trtype": self.transport,
1589                    "traddr": sub_addr,
1590                    "trsvcid": sub_port,
1591                    "subnqn": sub_nqn,
1592                    "adrfam": "IPv4"
1593                }
1594            }
1595
1596            if self.enable_adq:
1597                nvme_ctrl["params"].update({"priority": "1"})
1598
1599            if self.enable_data_digest:
1600                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})
1601
1602            bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)
1603
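        # Sketch of the resulting bdev.conf for one subsystem (address values are
        # illustrative assumptions):
        # {"subsystems": [{"subsystem": "bdev", "config": [
        #     {"method": "bdev_nvme_attach_controller",
        #      "params": {"name": "Nvme0", "trtype": "rdma", "traddr": "10.0.0.1",
        #                 "trsvcid": "4420", "subnqn": "nqn.2018-09.io.spdk:cnode0",
        #                 "adrfam": "IPv4"}}]}]}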
1604        return json.dumps(bdev_cfg_section, indent=2)
1605
1606    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
1607        self.available_cpus = self.get_numa_cpu_map()
1608        filename_section = ""
1609        if len(threads) >= len(subsystems):
1610            threads = range(0, len(subsystems))
1611
1612        # Generate expected NVMe Bdev names and sort them by used NIC numa node
1613        # to allow better grouping when splitting into fio sections.
1614        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
1615        filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames]
1616        filenames = [x for _, x in sorted(zip(filename_numas, filenames))]
1617
        nvme_per_split = len(subsystems) // len(threads)
        remainder = len(subsystems) % len(threads)
1620        iterator = iter(filenames)
1621        result = []
1622        for i in range(len(threads)):
1623            result.append([])
1624            for _ in range(nvme_per_split):
1625                result[i].append(next(iterator))
1626            if remainder:
1627                result[i].append(next(iterator))
1628                remainder -= 1
1629        for i, r in enumerate(result):
1630            header = "[filename%s]" % i
1631            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = max(round((io_depth * len(r)) / num_jobs), 1)
1635            iodepth = "iodepth=%s" % job_section_qd
1636
1637            offset_section = ""
1638            if offset:
1639                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)
1640
1641            numa_opts = self.gen_fio_numa_section(r, num_jobs)
1642
1643            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])
1644
1645        return filename_section
1646
1647    def get_nvme_subsystem_numa(self, bdev_name):
1648        bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir]))
1649        bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"]
1650
        # Remove the last two characters to get the controller name instead of the subsystem name
1652        nvme_ctrl = bdev_name[:-2]
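        # E.g. bdev "Nvme3n1" -> controller "Nvme3"; its traddr from bdev.conf is the
        # target IP, which get_route_nic_numa() maps to the local NIC's NUMA node.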
        remote_nvme_ip = next(cfg["params"]["traddr"] for cfg in bdev_conf_json_obj
                              if cfg["params"]["name"] == nvme_ctrl)
1654        return self.get_route_nic_numa(remote_nvme_ip)
1655
1656
1657if __name__ == "__main__":
1658    script_full_dir = os.path.dirname(os.path.realpath(__file__))
1659    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))
1660
1661    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
1662    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
1663                        help='Configuration file.')
1664    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
1665                        help='Results directory.')
1666    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
1667                        help='CSV results filename.')
1668    parser.add_argument('-f', '--force', default=False, action='store_true',
1669                        dest='force', help="""Force script to continue and try to use all
1670                        available NVMe devices during test.
1671                        WARNING: Might result in data loss on used NVMe drives""")
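    # Example invocation (paths illustrative):
    #   ./run_nvmf.py -c ./config.json -r /tmp/results -s run1.csv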
1672
1673    args = parser.parse_args()
1674
1675    logging.basicConfig(level=logging.INFO,
1676                        format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s')
1677
1678    logging.info("Using config file: %s" % args.config)
1679    with open(args.config, "r") as config:
1680        data = json.load(config)
1681
1682    initiators = []
1683    fio_cases = []
1684
1685    general_config = data["general"]
1686    target_config = data["target"]
1687    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]
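    # Minimal config.json sketch matching the lookups below; values are illustrative
    # assumptions, not a complete schema:
    # {
    #     "general": {"username": "root", "password": "***", "transport": "rdma"},
    #     "target": {"mode": "spdk", "nic_ips": ["10.0.0.1"], "core_mask": "0xF0"},
    #     "initiator1": {"mode": "spdk", "nic_ips": ["10.0.0.2"]},
    #     "fio": {"bs": ["4k"], "qd": [128], "rw": ["randread"], "rwmixread": 100,
    #             "run_time": 300, "ramp_time": 60, "run_num": 3}
    # }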
1688
    if ("null_block_devices" not in data["target"]
            and not args.force
            and "allowlist" not in data["target"]
            and "blocklist" not in data["target"]):
1693        # TODO: Also check if allowlist or blocklist are not empty.
        logging.warning("""WARNING: This script requires an allowlist or a blocklist to be defined.
        Without one it may use all available NVMe drives on your system, which can
        lead to data loss. If you wish to proceed with all attached NVMe drives, use the "-f" option.""")
        sys.exit(1)
1698
1699    for k, v in data.items():
1700        if "target" in k:
1701            v.update({"results_dir": args.results})
1702            if data[k]["mode"] == "spdk":
1703                target_obj = SPDKTarget(k, data["general"], v)
1704            elif data[k]["mode"] == "kernel":
1705                target_obj = KernelTarget(k, data["general"], v)
1706        elif "initiator" in k:
1707            if data[k]["mode"] == "spdk":
1708                init_obj = SPDKInitiator(k, data["general"], v)
1709            elif data[k]["mode"] == "kernel":
1710                init_obj = KernelInitiator(k, data["general"], v)
1711            initiators.append(init_obj)
1712        elif "fio" in k:
1713            fio_workloads = itertools.product(data[k]["bs"],
1714                                              data[k]["qd"],
1715                                              data[k]["rw"])
1716
1717            fio_run_time = data[k]["run_time"]
1718            fio_ramp_time = data[k]["ramp_time"]
1719            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k].get("run_num", 1)
            fio_num_jobs = data[k].get("num_jobs")
1722
1723            fio_rate_iops = 0
1724            if "rate_iops" in data[k]:
1725                fio_rate_iops = data[k]["rate_iops"]
1726
1727            fio_offset = False
1728            if "offset" in data[k]:
1729                fio_offset = data[k]["offset"]
1730            fio_offset_inc = 0
1731            if "offset_inc" in data[k]:
1732                fio_offset_inc = data[k]["offset_inc"]
1733        else:
1734            continue
1735
1736    try:
1737        os.mkdir(args.results)
1738    except FileExistsError:
1739        pass
1740
1741    for i in initiators:
1742        target_obj.initiator_info.append(
1743            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
1744        )
1745
1746    # TODO: This try block is definietly too large. Need to break this up into separate
1747    # logical blocks to reduce size.
1748    try:
1749        target_obj.tgt_start()
1750
1751        for i in initiators:
1752            i.match_subsystems(target_obj.subsystem_info_list)
1753            if i.enable_adq:
1754                i.adq_configure_tc()
1755
        # Poor man's threading
1757        # Run FIO tests
1758        for block_size, io_depth, rw in fio_workloads:
1759            configs = []
1760            for i in initiators:
1761                i.init_connect()
1762                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
1763                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops,
1764                                       fio_offset, fio_offset_inc)
1765                configs.append(cfg)
1766
1767            for run_no in range(1, fio_run_num+1):
1768                threads = []
1769                power_daemon = None
1770                measurements_prefix = "%s_%s_%s_m_%s_run_%s" % (block_size, io_depth, rw, fio_rw_mix_read, run_no)
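                # e.g. "4k_128_randread_m_100_run_1" for bs=4k, qd=128, rw=randread,
                # rwmixread=100 on the first run; all measurement files share this prefix.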
1771
1772                for i, cfg in zip(initiators, configs):
1773                    t = threading.Thread(target=i.run_fio, args=(cfg, run_no))
1774                    threads.append(t)
1775                if target_obj.enable_sar:
1776                    sar_file_prefix = measurements_prefix + "_sar"
1777                    t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix, fio_ramp_time, fio_run_time))
1778                    threads.append(t)
1779
1780                if target_obj.enable_pcm:
1781                    pcm_fnames = ["%s_%s.csv" % (measurements_prefix, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]
1782
1783                    pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm,
1784                                                 args=(args.results, pcm_fnames[0], fio_ramp_time, fio_run_time))
1785                    pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory,
1786                                                 args=(args.results, pcm_fnames[1], fio_ramp_time, fio_run_time))
1787                    pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power,
1788                                                 args=(args.results, pcm_fnames[2], fio_ramp_time, fio_run_time))
1789
1790                    threads.append(pcm_cpu_t)
1791                    threads.append(pcm_mem_t)
1792                    threads.append(pcm_pow_t)
1793
1794                if target_obj.enable_bw:
1795                    bandwidth_file_name = measurements_prefix + "_bandwidth.csv"
1796                    t = threading.Thread(target=target_obj.measure_network_bandwidth,
1797                                         args=(args.results, bandwidth_file_name, fio_ramp_time, fio_run_time))
1798                    threads.append(t)
1799
1800                if target_obj.enable_dpdk_memory:
1801                    dpdk_mem_file_name = measurements_prefix + "_dpdk_mem.txt"
1802                    t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results, dpdk_mem_file_name, fio_ramp_time))
1803                    threads.append(t)
1804
1805                if target_obj.enable_pm:
1806                    power_daemon = threading.Thread(target=target_obj.measure_power,
1807                                                    args=(args.results, measurements_prefix, script_full_dir,
1808                                                          fio_ramp_time, fio_run_time))
1809                    threads.append(power_daemon)
1810
1811                for t in threads:
1812                    t.start()
1813                for t in threads:
1814                    t.join()
1815
1816            for i in initiators:
1817                i.init_disconnect()
1818                i.copy_result_files(args.results)
1819        try:
1820            parse_results(args.results, args.csv_filename)
1821        except Exception as err:
1822            logging.error("There was an error with parsing the results")
1823            logging.error(err)
1824    finally:
1825        for i in initiators:
1826            try:
1827                i.stop()
            except Exception as err:
                logging.error("Failed to stop initiator %s: %s" % (i.name, err))
1830        target_obj.stop()
1831