#!/usr/bin/env python3
#  SPDX-License-Identifier: BSD-3-Clause
#  Copyright (C) 2018 Intel Corporation
#  All rights reserved.
#

import os
import re
import sys
import argparse
import json
import logging
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid

import paramiko
import pandas as pd
from common import *
from subprocess import CalledProcessError

sys.path.append(os.path.dirname(__file__) + '/../../../python')

import spdk.rpc as rpc  # noqa
import spdk.rpc.client as rpc_client  # noqa


class Server:
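    # Base class for performance test hosts. Wraps the system configuration
    # steps (drivers, services, sysctl, tuned, CPU governor, IRQ affinity)
    # shared by target and initiator machines, plus NIC/NUMA discovery helpers.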
    def __init__(self, name, general_config, server_config):
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        self.skip_spdk_install = general_config.get('skip_spdk_install', False)
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]
        self.irdma_roce_enable = False

        self.log = logging.getLogger(self.name)

        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        self.local_nic_info = []
        self._nics_json_obj = {}
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        self.enable_adq = False
        self.adq_priority = None
        self.enable_arfs = server_config.get("enable_arfs", False)
        self.irq_settings = {"mode": "default"}

        if "adq_enable" in server_config and server_config["adq_enable"]:
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1
        if "irq_settings" in server_config:
            self.irq_settings.update(server_config["irq_settings"])
        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]
        if "irdma_roce_enable" in general_config:
            self.irdma_roce_enable = general_config["irdma_roce_enable"]

        if not re.match(r'^[A-Za-z0-9\-]+$', name):
            self.log.error("Server name can contain only letters, numbers and dashes")
            sys.exit(1)

    @staticmethod
    def get_uncommented_lines(lines):
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
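        # Resolve an interface name from one of its assigned IP addresses.
        # The "ip -j address show" output is queried once and cached; note
        # that the lookup below is a substring match on addr["local"].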
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                if ip in addr["local"]:
                    return nic["ifname"]

    def set_local_nic_info_helper(self):
        pass

    def set_local_nic_info(self, pci_info):
        def extract_network_elements(json_obj):
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def get_nic_numa_node(self, nic_name):
        return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name]))

    def get_numa_cpu_map(self):
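        # Build a {numa_node: [cpu, cpu, ...]} map from lscpu JSON output,
        # e.g. {0: [0, 1, 2, 3], 1: [4, 5, 6, 7]} on a 2-socket machine.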
        numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"]))
        numa_cpu_json_map = {}

        for cpu in numa_cpu_json_obj["cpus"]:
            cpu_num = int(cpu["cpu"])
            numa_node = int(cpu["node"])
            numa_cpu_json_map.setdefault(numa_node, [])
            numa_cpu_json_map[numa_node].append(cpu_num)

        return numa_cpu_json_map

    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        return ""

    def configure_system(self):
        self.load_drivers()
        self.configure_services()
        self.configure_sysctl()
        self.configure_arfs()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity(**self.irq_settings)

    RDMA_PROTOCOL_IWARP = 0
    RDMA_PROTOCOL_ROCE = 1
    RDMA_PROTOCOL_UNKNOWN = -1

    def check_rdma_protocol(self):
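        # The irdma driver exposes its protocol selection via the roce_ena
        # module parameter: 0 selects iWARP, non-zero selects RoCE.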
        try:
            roce_ena = self.exec_cmd(["cat", "/sys/module/irdma/parameters/roce_ena"])
            roce_ena = roce_ena.strip()
            if roce_ena == "0":
                return self.RDMA_PROTOCOL_IWARP
            else:
                return self.RDMA_PROTOCOL_ROCE
        except CalledProcessError as e:
            self.log.error("ERROR: failed to check RDMA protocol!")
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
            return self.RDMA_PROTOCOL_UNKNOWN

    def load_drivers(self):
        self.log.info("Loading drivers")
        self.exec_cmd(["sudo", "modprobe", "-a",
                       "nvme-%s" % self.transport,
                       "nvmet-%s" % self.transport])
        current_mode = self.check_rdma_protocol()
        if current_mode == self.RDMA_PROTOCOL_UNKNOWN:
            self.log.error("ERROR: failed to check RDMA protocol mode")
            return
        if self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_IWARP:
            self.reload_driver("irdma", "roce_ena=1")
            self.log.info("Loaded irdma driver with RoCE enabled")
        elif self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_ROCE:
            self.log.info("Leaving irdma driver with RoCE enabled")
        else:
            self.reload_driver("irdma", "roce_ena=0")
            self.log.info("Loaded irdma driver with iWARP enabled")

    def configure_arfs(self):
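        # Accelerated RFS needs ntuple filters enabled on the NIC and a
        # non-zero rps_flow_cnt on every RX queue; writing 0 disables it.
        # The matching net.core.rps_sock_flow_entries sysctl is handled
        # in configure_sysctl().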
        rps_flow_cnt = 512
        if not self.enable_arfs:
            rps_flow_cnt = 0

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            self.exec_cmd(["sudo", "ethtool", "-K", nic_name, "ntuple", "on"])
            self.log.info(f"Setting rps_flow_cnt for {nic_name}")
            queue_files = self.exec_cmd(["ls", f"/sys/class/net/{nic_name}/queues/"]).strip().split("\n")
            queue_files = filter(lambda x: x.startswith("rx-"), queue_files)

            for qf in queue_files:
                self.exec_cmd(["sudo", "bash", "-c", f"echo {rps_flow_cnt} > /sys/class/net/{nic_name}/queues/{qf}/rps_flow_cnt"])

    def configure_adq(self):
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        self.log.info("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log.info("%s loaded!" % module)
            except CalledProcessError as e:
                self.log.error("ERROR: failed to load module %s" % module)
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        self.log.info("Configuring ADQ Traffic classes and filters...")

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            nic_ports = [x[0] for x in self.subsystem_info_list if nic_ip in x[2]]

            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log.info(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log.info(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            nic_bdf = os.path.basename(self.exec_cmd(["readlink", "-f", "/sys/class/net/%s/device" % nic_name]).strip())
            self.exec_cmd(["sudo", "devlink", "dev", "param", "set", "pci/%s" % nic_bdf,
                           "name", "tc1_inline_fd", "value", "true", "cmode", "runtime"])

            for port in nic_ports:
                tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                                 "protocol", "ip", "ingress", "prio", "1", "flower",
                                 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                                 "skip_sw", "hw_tc", "1"]
                self.log.info(" ".join(tc_filter_cmd))
                self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log.info("Show tc configuration for %s NIC..." % nic_name)
            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log.info("%s" % tc_disk_out)
            self.log.info("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log.info(xps_cmd)
            self.exec_cmd(xps_cmd)

    def reload_driver(self, driver, *modprobe_args):
        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver, *modprobe_args])
        except CalledProcessError as e:
            self.log.error("ERROR: failed to reload %s module!" % driver)
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_nic(self):
        self.log.info("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                nic_bdf = self.exec_cmd(["bash", "-c", "source /sys/class/net/%s/device/uevent; echo $PCI_SLOT_NAME" % nic])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-inspect-optimize", "off"])
                # The following flags are suggested in the ADQ Configuration Guide
                # for large block sizes, but can be used with 4k block sizes as well
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop", "on"])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop-cfg", "on"])
            except subprocess.CalledProcessError as e:
                self.log.error("ERROR: failed to configure NIC port using ethtool!")
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
                self.log.info("Please update your NIC driver and firmware versions and try again.")
            self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        self.log.info("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log.info("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log.info("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        self.log.info("Tuning sysctl settings...")

        busy_read = 0
        busy_poll = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1
            busy_poll = 1

        sysctl_opts = {
            "net.core.busy_poll": busy_poll,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
            "net.core.rps_sock_flow_entries": 0
        }

        if self.enable_adq:
            sysctl_opts.update(self.adq_set_busy_read(1))

        if self.enable_arfs:
            sysctl_opts.update({"net.core.rps_sock_flow_entries": 32768})

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        if not self.tuned_profile:
            self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        self.log.info("Setting CPU governor to performance...")

        # This assumes that the same CPU scaling governor is set on every CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def get_core_list_from_mask(self, core_mask):
        # Generate list of individual cores from hex core mask
        # (e.g. '0xffff') or list containing:
        # - individual cores (e.g. '1, 2, 3')
        # - core ranges (e.g. '0-3')
        # - mix of both (e.g. '0, 1-4, 9, 11-13')

        core_list = []
        if "0x" in core_mask:
            core_mask_int = int(core_mask, 16)
            for i in range(core_mask_int.bit_length()):
                if (1 << i) & core_mask_int:
                    core_list.append(i)
            return core_list
        else:
            # Core list can be provided in .json config with square brackets
            # remove them first
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")

            for i in core_mask.split(","):
                if "-" in i:
                    start, end = i.split("-")
                    core_range = range(int(start), int(end) + 1)
                    core_list.extend(core_range)
                else:
                    core_list.append(int(i))
            return core_list

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
        self.log.info("Setting IRQ affinity for NICs, using %s mode" % mode)

        if mode not in ["default", "bynode", "cpulist"]:
            raise ValueError("%s irq affinity setting not supported" % mode)

        if mode == "cpulist" and not cpulist:
            raise ValueError("%s irq affinity setting set, but no cpulist provided" % mode)

        affinity_script = "set_irq_affinity.sh"
        if "default" not in mode:
            affinity_script = "set_irq_affinity_cpulist.sh"
            system_cpu_map = self.get_numa_cpu_map()
        irq_script_path = os.path.join(self.irq_scripts_dir, affinity_script)

        def cpu_list_to_string(cpulist):
            return ",".join(map(str, cpulist))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            irq_cmd = ["sudo", irq_script_path]

            # Use only CPU cores matching NIC NUMA node.
            # Remove any CPU cores if they're on exclusion list.
            if mode == "bynode":
                irq_cpus = system_cpu_map[self.get_nic_numa_node(nic_name)]
                if cpulist and exclude_cpulist:
                    disallowed_cpus = self.get_core_list_from_mask(cpulist)
                    irq_cpus = list(set(irq_cpus) - set(disallowed_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            if mode == "cpulist":
                irq_cpus = self.get_core_list_from_mask(cpulist)
                if exclude_cpulist:
                    # Flatten system CPU list, we don't need NUMA awareness here
                    system_cpu_list = sorted({x for v in system_cpu_map.values() for x in v})
                    irq_cpus = list(set(system_cpu_list) - set(irq_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            irq_cmd.append(nic_name)
            self.log.info(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        self.log.info("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        self.log.info("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        self.log.info("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log.info("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        self.log.info("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log.info("Reverted CPU governor to %s." % self.governor_restore)

    def restore_settings(self):
        self.restore_governor()
        self.restore_tuned()
        self.restore_services()
        self.restore_sysctl()
        if self.enable_adq:
            self.reload_driver("ice")


class Target(Server):
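    # Common target-side logic: spreading bdevs across initiators, measurement
    # helpers (SAR, PCM, bwm-ng, power, DPDK memory) and packaging of the SPDK
    # sources. Subclassed by KernelTarget and SPDKTarget.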
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Defaults
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []
        self.initiator_info = []
        self.nvme_allowlist = []
        self.nvme_blocklist = []

        # Target-side measurement options
        self.enable_pm = True
        self.enable_sar = True
        self.enable_pcm = True
        self.enable_bw = True
        self.enable_dpdk_memory = True

        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "scheduler_settings" in target_config:
            self.scheduler_name = target_config["scheduler_settings"]
        if "zcopy_settings" in target_config:
            self.enable_zcopy = target_config["zcopy_settings"]
        if "results_dir" in target_config:
            self.results_dir = target_config["results_dir"]
        if "blocklist" in target_config:
            self.nvme_blocklist = target_config["blocklist"]
        if "allowlist" in target_config:
            self.nvme_allowlist = target_config["allowlist"]
            # Blocklist takes precedence, remove common elements from allowlist
            self.nvme_allowlist = list(set(self.nvme_allowlist) - set(self.nvme_blocklist))
        if "enable_pm" in target_config:
            self.enable_pm = target_config["enable_pm"]
        if "enable_sar" in target_config:
            self.enable_sar = target_config["enable_sar"]
        if "enable_pcm" in target_config:
            self.enable_pcm = target_config["enable_pcm"]
        if "enable_bandwidth" in target_config:
            self.enable_bw = target_config["enable_bandwidth"]
        if "enable_dpdk_memory" in target_config:
            self.enable_dpdk_memory = target_config["enable_dpdk_memory"]

        self.log.info("Items now on allowlist: %s" % self.nvme_allowlist)
        self.log.info("Items now on blocklist: %s" % self.nvme_blocklist)

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        if not self.skip_spdk_install:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
            self.configure_irq_affinity()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log.info("Changing directory to %s" % change_dir)

        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")

        if change_dir:
            os.chdir(old_cwd)
            self.log.info("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        self.log.info("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, _directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log.info("Done zipping")

    @staticmethod
    def _chunks(input_list, chunks_no):
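        # Split input_list into chunks_no contiguous chunks of near-equal
        # size; the first (len % chunks_no) chunks get one extra element.
        # e.g. _chunks(list(range(10)), 3) -> [0, 1, 2, 3], [4, 5, 6], [7, 8, 9]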
        div, rem = divmod(len(input_list), chunks_no)
        for i in range(chunks_no):
            si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem)
            yield input_list[si:si + (div + 1 if i < rem else div)]

    def spread_bdevs(self, req_disks):
        # Spread available block device indexes:
        # - evenly across available initiator systems
        # - evenly across available NIC interfaces for
        #   each initiator
        # Not NUMA aware.
        ip_bdev_map = []
        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))

        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
            self.initiator_info[i]["bdev_range"] = init_chunk
            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
                for c in nic_chunk:
                    ip_bdev_map.append((ip, c))
        return ip_bdev_map

    def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time):
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))

        self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Starting SAR measurements")

        out = self.exec_cmd(["sar", "-P", "ALL", "1", "%s" % run_time])
        with open(sar_output_file, "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log.info("Summary CPU utilization from SAR:")
                        self.log.info(line)
                    elif "all" in line:
                        self.log.info(line)
                    else:
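                        # The 8th whitespace-separated column of sar's
                        # per-CPU "Average:" rows is assumed to be %idle.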
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum

        with open(sar_cpu_util_file, "w") as f:
            f.write("%0.2f" % sar_cpu_usage)

    def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time):
        time.sleep(ramp_time)
        self.log.info("Starting power measurements")
        self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir,
                       "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix,
                       "-x", "-c", "%s" % run_time, "-t", "1", "-r"])

    def measure_pcm_memory(self, results_dir, pcm_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        cmd = ["pcm-memory", "1", "-csv=%s/%s" % (results_dir, pcm_file_name)]
        pcm_memory = subprocess.Popen(cmd)
        time.sleep(run_time)
        pcm_memory.terminate()

    def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        cmd = ["pcm", "1", "-i=%s" % run_time,
               "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
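        # Keep only the socket-level UPI utilization columns from the CSV
        # (assumes Intel PCM's two-row CSV header with UPI0..UPI2 columns).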
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_pcm_power(self, results_dir, pcm_power_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        out = self.exec_cmd(["pcm-power", "1", "-i=%s" % run_time])
        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
            fh.write(out)
        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
        #       Improve this so that additional file containing measurements average is generated too.

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name, ramp_time, run_time):
        self.log.info("Waiting %d seconds for ramp-up to finish before measuring bandwidth stats" % ramp_time)
        time.sleep(ramp_time)
678        self.log.info("INFO: starting network bandwidth measure")
679        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
680                       "-a", "1", "-t", "1000", "-c", "%s" % run_time])
681        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
682        #       Improve this so that additional file containing measurements average is generated too.
683        # TODO: Monitor only these interfaces which are currently used to run the workload.
684
685    def measure_dpdk_memory(self, results_dir, dump_file_name, ramp_time):
686        self.log.info("INFO: waiting to generate DPDK memory usage")
687        time.sleep(ramp_time)
688        self.log.info("INFO: generating DPDK memory usage")
689        tmp_dump_file = rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)["filename"]
690        os.rename(tmp_dump_file, "%s/%s" % (results_dir, dump_file_name))
691
692    def sys_config(self):
693        self.log.info("====Kernel release:====")
694        self.log.info(os.uname().release)
695        self.log.info("====Kernel command line:====")
696        with open('/proc/cmdline') as f:
697            cmdline = f.readlines()
698            self.log.info('\n'.join(self.get_uncommented_lines(cmdline)))
699        self.log.info("====sysctl conf:====")
700        with open('/etc/sysctl.conf') as f:
701            sysctl = f.readlines()
702            self.log.info('\n'.join(self.get_uncommented_lines(sysctl)))
703        self.log.info("====Cpu power info:====")
704        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))
705        self.log.info("====zcopy settings:====")
706        self.log.info("zcopy enabled: %s" % (self.enable_zcopy))
707        self.log.info("====Scheduler settings:====")
708        self.log.info("SPDK scheduler: %s" % (self.scheduler_name))
709
710
711class Initiator(Server):
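    # A remote initiator host driven over SSH (paramiko). Generates fio
    # configuration files, runs fio against the exported subsystems and
    # copies the result files back. Subclassed by SPDKInitiator and
    # KernelInitiator.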
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Required fields
        self.ip = initiator_config["ip"]
        self.target_nic_ips = initiator_config["target_nic_ips"]

        # Defaults
        self.cpus_allowed = None
        self.cpus_allowed_policy = "shared"
        self.spdk_dir = "/tmp/spdk"
        self.fio_bin = "/usr/src/fio/fio"
        self.nvmecli_bin = "nvme"
        self.cpu_frequency = None
        self.subsystem_info_list = []
        self.allow_cpu_sharing = True

        if "spdk_dir" in initiator_config:
            self.spdk_dir = initiator_config["spdk_dir"]
        if "fio_bin" in initiator_config:
            self.fio_bin = initiator_config["fio_bin"]
        if "nvmecli_bin" in initiator_config:
            self.nvmecli_bin = initiator_config["nvmecli_bin"]
        if "cpus_allowed" in initiator_config:
            self.cpus_allowed = initiator_config["cpus_allowed"]
        if "cpus_allowed_policy" in initiator_config:
            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
        if "cpu_frequency" in initiator_config:
            self.cpu_frequency = initiator_config["cpu_frequency"]
        if "allow_cpu_sharing" in initiator_config:
            self.allow_cpu_sharing = initiator_config["allow_cpu_sharing"]
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if not self.skip_spdk_install:
            self.copy_spdk("/tmp/spdk.zip")

        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        self.restore_settings()
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # If a command element contains whitespace and is not already quoted
        # (e.g. when calling sysctl), quote it again to prevent expansion
        # when it is sent to the remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout using the get_pty option if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        self.log.info("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log.info("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log.info("Sources unpacked")

    def copy_result_files(self, dest_dir):
        self.log.info("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log.info("Done copying results")

    def match_subsystems(self, target_subsystems):
        subsystems = [subsystem for subsystem in target_subsystems if subsystem[2] in self.target_nic_ips]
        subsystems.sort(key=lambda x: x[1])
        self.log.info("Found matching subsystems on target side:")
        for s in subsystems:
            self.log.info(s)
        self.subsystem_info_list = subsystems

    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def get_route_nic_numa(self, remote_nvme_ip):
        local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip]))
        local_nvme_nic = local_nvme_nic[0]["dev"]
        return self.get_nic_numa_node(local_nvme_nic)

    @staticmethod
    def gen_fio_offset_section(offset_inc, num_jobs):
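        # Give each fio job its own non-overlapping slice of the device,
        # e.g. num_jobs=4 with offset_inc=0 yields size=25% and job offsets
        # at 0%, 25%, 50% and 75%.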
        offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc
        return "\n".join(["size=%s%%" % offset_inc,
                          "offset=0%",
                          "offset_increment=%s%%" % offset_inc])

    def gen_fio_numa_section(self, fio_filenames_list, num_jobs):
        numa_stats = {}
        allowed_cpus = []
        for nvme in fio_filenames_list:
            nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme))
            numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1

        # Use the most common NUMA node for this chunk to allocate memory and CPUs
        section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0]

        # Check if we have enough free CPUs to pop from the list before assigning them
        if len(self.available_cpus[section_local_numa]) < num_jobs:
            if self.allow_cpu_sharing:
                self.log.info("Regenerating available CPU list %s" % section_local_numa)
                # Remove still available CPUs from the regenerated list. We don't want to
                # regenerate it with duplicates.
                cpus_regen = set(self.get_numa_cpu_map()[section_local_numa]) - set(self.available_cpus[section_local_numa])
                self.available_cpus[section_local_numa].extend(cpus_regen)
                self.log.info(self.available_cpus[section_local_numa])
            else:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise IndexError

        for _ in range(num_jobs):
            try:
                allowed_cpus.append(str(self.available_cpus[section_local_numa].pop(0)))
            except IndexError:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise

        return "\n".join(["cpus_allowed=%s" % ",".join(allowed_cpus),
                          "numa_mem_policy=prefer:%s" % section_local_numa])

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no,
                       num_jobs=None, ramp_time=0, run_time=10, rate_iops=0,
                       offset=False, offset_inc=0):
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""

        if self.cpus_allowed is not None:
            self.log.info("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    cpus_num += len(range(a, b + 1))
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            self.num_cores = len(self.subsystem_info_list)
            threads = range(0, len(self.subsystem_info_list))

        filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs,
                                                      offset, offset_inc)

        fio_config = fio_conf_template.format(ioengine=self.ioengine, spdk_conf=self.spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, init_connect() function
        if "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log.info("Created FIO Config:")
        self.log.info(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit()
        else:
            self.log.warning("WARNING: you have disabled intel_pstate and are using the default CPU governor.")

    def run_fio(self, fio_config_file, run_num=1):
        job_name, _ = os.path.splitext(fio_config_file)
        self.log.info("Starting FIO run for job: %s" % job_name)
        self.log.info("Using FIO: %s" % self.fio_bin)

        output_filename = job_name + "_run_" + str(run_num) + "_" + self.name + ".json"
        try:
            output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename, "--eta=never"], True)
            self.log.info(output)
            self.log.info("FIO run finished. Results in: %s" % output_filename)
        except subprocess.CalledProcessError as e:
            self.log.error("ERROR: Fio process failed!")
            self.log.error(e.stdout)

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(self.exec_cmd(["uname", "-r"]))
        self.log.info("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log.info("====sysctl conf:====")
        sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"])
        self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))


class KernelTarget(Target):
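    # NVMe-oF target backed by the Linux kernel target (nvmet), configured
    # with nvmetcli from a generated kernel.conf file.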
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = "nvmetcli"

        if "nvmet_bin" in target_config:
            self.nvmet_bin = target_config["nvmet_bin"]

    def load_drivers(self):
        self.log.info("Loading drivers")
        super().load_drivers()
        if self.null_block:
            self.exec_cmd(["sudo", "modprobe", "null_blk", "nr_devices=%s" % self.null_block])

    def configure_adq(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_configure_tc(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_set_busy_read(self, busy_read_val):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
        return {"net.core.busy_read": 0}

    def stop(self):
        self.nvmet_command(self.nvmet_bin, "clear")
        self.restore_settings()

    def get_nvme_device_bdf(self, nvme_dev_path):
        nvme_name = os.path.basename(nvme_dev_path)
        return self.exec_cmd(["cat", "/sys/block/%s/device/address" % nvme_name]).strip()

    def get_nvme_devices(self):
        dev_list = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]).split("\n")
        nvme_list = []
        for dev in dev_list:
            if "nvme" not in dev:
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                nvme_list.append(dev)
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_allowlist:
                nvme_list.append(dev)
        return nvme_list

    def nvmet_command(self, nvmet_bin, command):
        return self.exec_cmd([nvmet_bin, *(command.split(" "))])

    def kernel_tgt_gen_subsystem_conf(self, nvme_list):

        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        for ip, bdev_num in self.spread_bdevs(len(nvme_list)):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = nvme_list[bdev_num]

            nvmet_cfg["subsystems"].append({
                "allowed_hosts": [],
                "attr": {
                    "allow_any_host": "1",
                    "serial": serial,
                    "version": "1.3"
                },
                "namespaces": [
                    {
                        "device": {
                            "path": bdev_name,
                            "uuid": "%s" % uuid.uuid4()
                        },
                        "enable": 1,
                        "nsid": port
                    }
                ],
                "nqn": nqn
            })

            nvmet_cfg["ports"].append({
                "addr": {
                    "adrfam": "ipv4",
                    "traddr": ip,
                    "trsvcid": port,
                    "trtype": self.transport
                },
                "portid": bdev_num,
                "referrals": [],
                "subsystems": [nqn]
            })

            self.subsystem_info_list.append((port, nqn, ip))
        self.subsys_no = len(self.subsystem_info_list)

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        self.log.info("Configuring kernel NVMeOF Target")

        if self.null_block:
            self.log.info("Configuring with null block device.")
            nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
        else:
            self.log.info("Configuring with NVMe drives.")
            nvme_list = self.get_nvme_devices()

        self.kernel_tgt_gen_subsystem_conf(nvme_list)
        self.subsys_no = len(nvme_list)

        self.nvmet_command(self.nvmet_bin, "clear")
        self.nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log.info("Done configuring kernel NVMeOF Target")


class SPDKTarget(Target):
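    # NVMe-oF target running the SPDK nvmf_tgt application, configured at
    # runtime through SPDK RPC calls.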
    def __init__(self, name, general_config, target_config):
        # Required fields
        self.core_mask = target_config["core_mask"]
        self.num_cores = len(self.get_core_list_from_mask(self.core_mask))

        super().__init__(name, general_config, target_config)

        # Defaults
        self.dif_insert_strip = False
        self.null_block_dif_type = 0
        self.num_shared_buffers = 4096
        self.max_queue_depth = 128
        self.bpf_proc = None
        self.bpf_scripts = []
        self.enable_dsa = False
        self.scheduler_core_limit = None
        self.iobuf_small_pool_count = 32767
        self.iobuf_large_pool_count = 16383
        self.num_cqe = 4096

        if "num_shared_buffers" in target_config:
            self.num_shared_buffers = target_config["num_shared_buffers"]
        if "max_queue_depth" in target_config:
            self.max_queue_depth = target_config["max_queue_depth"]
        if "null_block_dif_type" in target_config:
            self.null_block_dif_type = target_config["null_block_dif_type"]
        if "dif_insert_strip" in target_config:
            self.dif_insert_strip = target_config["dif_insert_strip"]
        if "bpf_scripts" in target_config:
            self.bpf_scripts = target_config["bpf_scripts"]
        if "dsa_settings" in target_config:
            self.enable_dsa = target_config["dsa_settings"]
        if "scheduler_core_limit" in target_config:
            self.scheduler_core_limit = target_config["scheduler_core_limit"]
        if "iobuf_small_pool_count" in target_config:
            self.iobuf_small_pool_count = target_config["iobuf_small_pool_count"]
        if "iobuf_large_pool_count" in target_config:
            self.iobuf_large_pool_count = target_config["iobuf_large_pool_count"]
        if "num_cqe" in target_config:
            self.num_cqe = target_config["num_cqe"]

        self.log.info("====DSA settings:====")
        self.log.info("DSA enabled: %s" % (self.enable_dsa))

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
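        # In addition to the base modes, SPDKTarget supports:
        #   shared       - NIC IRQs pinned to the same cores SPDK runs on
        #   split        - NIC IRQs pinned to cores not used by SPDK
        #   split-bynode - like split, but restricted to the NIC's NUMA node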
        if mode not in ["default", "bynode", "cpulist",
                        "shared", "split", "split-bynode"]:
            raise ValueError("%s irq affinity setting not supported" % mode)
1174
1175        # Create core list from SPDK's mask and change it to string.
1176        # This is the type configure_irq_affinity expects for cpulist parameter.
1177        spdk_tgt_core_list = self.get_core_list_from_mask(self.core_mask)
1178        spdk_tgt_core_list = ",".join(map(lambda x: str(x), spdk_tgt_core_list))
1179        spdk_tgt_core_list = "[" + spdk_tgt_core_list + "]"
1180
1181        if mode == "shared":
1182            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list)
1183        elif mode == "split":
1184            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
1185        elif mode == "split-bynode":
1186            super().configure_irq_affinity(mode="bynode", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
1187        else:
1188            super().configure_irq_affinity(mode=mode, cpulist=cpulist, exclude_cpulist=exclude_cpulist)
1189
1190    def adq_set_busy_read(self, busy_read_val):
1191        return {"net.core.busy_read": busy_read_val}
1192
1193    def get_nvme_devices_count(self):
1194        return len(self.get_nvme_devices())
1195
1196    def get_nvme_devices(self):
1197        bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")]))
1198        bdev_bdfs = []
1199        for bdev in bdev_subsys_json_obj["config"]:
1200            bdev_traddr = bdev["params"]["traddr"]
1201            if bdev_traddr in self.nvme_blocklist:
1202                continue
1203            if len(self.nvme_allowlist) == 0:
1204                bdev_bdfs.append(bdev_traddr)
1205            if bdev_traddr in self.nvme_allowlist:
1206                bdev_bdfs.append(bdev_traddr)
1207        return bdev_bdfs
1208
1209    def spdk_tgt_configure(self):
1210        self.log.info("Configuring SPDK NVMeOF target via RPC")
1211
1212        # Create transport layer
1213        nvmf_transport_params = {
1214            "client": self.client,
1215            "trtype": self.transport,
1216            "num_shared_buffers": self.num_shared_buffers,
1217            "max_queue_depth": self.max_queue_depth,
1218            "dif_insert_or_strip": self.dif_insert_strip,
1219            "sock_priority": self.adq_priority,
1220            "num_cqe": self.num_cqe
1221        }
1222
1223        if self.enable_adq:
1224            nvmf_transport_params["acceptor_poll_rate"] = 10000
1225
1226        rpc.nvmf.nvmf_create_transport(**nvmf_transport_params)
1227        self.log.info("SPDK NVMeOF transport layer:")
1228        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))
1229
1230        if self.null_block:
1231            self.spdk_tgt_add_nullblock(self.null_block)
1232            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
1233        else:
1234            self.spdk_tgt_add_nvme_conf()
1235            self.spdk_tgt_add_subsystem_conf(self.nic_ips)
1236
1237        if self.enable_adq:
1238            self.adq_configure_tc()
1239
1240        self.log.info("Done configuring SPDK NVMeOF Target")
1241
1242    def spdk_tgt_add_nullblock(self, null_block_count):
1243        md_size = 0
1244        block_size = 4096
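        # With DIF enabled the null bdevs use an extended-LBA format of
        # 4096 + 128 bytes; the 128-byte metadata region carries the protection info.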
1245        if self.null_block_dif_type != 0:
1246            md_size = 128
1247
1248        self.log.info("Adding null block bdevices to config via RPC")
1249        for i in range(null_block_count):
1250            self.log.info("Setting bdev protection to :%s" % self.null_block_dif_type)
1251            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
1252                                      dif_type=self.null_block_dif_type, md_size=md_size)
1253        self.log.info("SPDK Bdevs configuration:")
1254        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1255
1256    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
1257        self.log.info("Adding NVMe bdevs to config via RPC")
1258
1259        bdfs = self.get_nvme_devices()
1260        bdfs = [b.replace(":", ".") for b in bdfs]
1261
1262        if req_num_disks:
1263            if req_num_disks > len(bdfs):
1264                self.log.error("ERROR: Requested number of disks is more than available %s" % len(bdfs))
1265                sys.exit(1)
1266            else:
1267                bdfs = bdfs[0:req_num_disks]
1268
1269        for i, bdf in enumerate(bdfs):
1270            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)
1271
1272        self.log.info("SPDK Bdevs configuration:")
1273        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1274
1275    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
1276        self.log.info("Adding subsystems to config")
1277        if not req_num_disks:
1278            req_num_disks = self.get_nvme_devices_count()
1279
1280        for ip, bdev_num in self.spread_bdevs(req_num_disks):
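            # One subsystem per bdev, each with a unique service port
            # (4420 + bdev index); spread_bdevs() pairs each bdev with one
            # of the target's NIC IPs.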
1281            port = str(4420 + bdev_num)
1282            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
1283            serial = "SPDK00%s" % bdev_num
1284            bdev_name = "Nvme%sn1" % bdev_num
1285
1286            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
1287                                           allow_any_host=True, max_namespaces=8)
1288            rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
1289            for nqn_name in [nqn, "discovery"]:
1290                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
1291                                                     nqn=nqn_name,
1292                                                     trtype=self.transport,
1293                                                     traddr=ip,
1294                                                     trsvcid=port,
1295                                                     adrfam="ipv4")
1296            self.subsystem_info_list.append((port, nqn, ip))
1297        self.subsys_no = len(self.subsystem_info_list)
1298
1299        self.log.info("SPDK NVMeOF subsystem configuration:")
1300        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))
1301
1302    def bpf_start(self):
1303        self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts)
1304        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
1305        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
1306        results_path = os.path.join(self.results_dir, "bpf_traces.txt")
1307
1308        with open(self.pid, "r") as fh:
1309            nvmf_pid = fh.readline().strip()  # drop the trailing newline before using the PID as an argument
1310
1311        cmd = [bpf_script, nvmf_pid, *bpf_traces]
1312        self.log.info(cmd)
1313        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})
1314
1315    def tgt_start(self):
1316        if self.null_block:
1317            self.subsys_no = 1
1318        else:
1319            self.subsys_no = self.get_nvme_devices_count()
1320        self.log.info("Starting SPDK NVMeOF Target process")
1321        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
1322        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
1323        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")
1324
1325        with open(self.pid, "w") as fh:
1326            fh.write(str(proc.pid))
1327        self.nvmf_proc = proc
1328        self.log.info("SPDK NVMeOF Target PID=%s" % self.pid)
1329        self.log.info("Waiting for spdk to initialize...")
1330        while True:
1331            if os.path.exists("/var/tmp/spdk.sock"):
1332                break
1333            time.sleep(1)
1334        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")
1335
1336        rpc.sock.sock_set_default_impl(self.client, impl_name="posix")
1337        rpc.iobuf.iobuf_set_options(self.client,
1338                                    small_pool_count=self.iobuf_small_pool_count,
1339                                    large_pool_count=self.iobuf_large_pool_count,
1340                                    small_bufsize=None,
1341                                    large_bufsize=None)
1342
1343        if self.enable_zcopy:
1344            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
1345                                           enable_zerocopy_send_server=True)
1346            self.log.info("Target socket options:")
1347            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))
1348
1349        if self.enable_adq:
1350            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
1351            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
1352                                           nvme_adminq_poll_period_us=100000, retry_count=4)
1353
1354        if self.enable_dsa:
1355            rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None)
1356            self.log.info("Target DSA accel module enabled")
1357
1358        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit)
1359        rpc.framework_start_init(self.client)
1360
1361        if self.bpf_scripts:
1362            self.bpf_start()
1363
1364        self.spdk_tgt_configure()
1365
1366    def stop(self):
1367        if self.bpf_proc:
1368            self.log.info("Stopping BPF Trace script")
1369            self.bpf_proc.terminate()
1370            self.bpf_proc.wait()
1371
1372        if hasattr(self, "nvmf_proc"):
1373            try:
1374                self.nvmf_proc.terminate()
1375                self.nvmf_proc.wait(timeout=30)
1376            except Exception as e:
1377                self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.")
1378                self.log.info(e)
1379                self.nvmf_proc.kill()
1380                self.nvmf_proc.communicate()
1381                # Try to clean up RPC socket files if they were not removed
1382                # because of using 'kill'
1383                try:
1384                    os.remove("/var/tmp/spdk.sock")
1385                    os.remove("/var/tmp/spdk.sock.lock")
1386                except FileNotFoundError:
1387                    pass
1388        self.restore_settings()
1389
1390
1391class KernelInitiator(Initiator):
1392    def __init__(self, name, general_config, initiator_config):
1393        super().__init__(name, general_config, initiator_config)
1394
1395        # Defaults
1396        self.extra_params = ""
1397        self.ioengine = "libaio"
1398        self.spdk_conf = ""
1399
1400        if "num_cores" in initiator_config:
1401            self.num_cores = initiator_config["num_cores"]
1402
1403        if "extra_params" in initiator_config:
1404            self.extra_params = initiator_config["extra_params"]
1405
1406        if "kernel_engine" in initiator_config:
1407            self.ioengine = initiator_config["kernel_engine"]
1408            if "io_uring" in self.ioengine:
1409                self.extra_params += ' --nr-poll-queues=8'
1410
1411    def configure_adq(self):
1412        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
1413
1414    def adq_configure_tc(self):
1415        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
1416
1417    def adq_set_busy_read(self, busy_read_val):
1418        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
1419        return {"net.core.busy_read": 0}
1420
1421    def get_connected_nvme_list(self):
1422        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
1423        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
1424                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
1425        return nvme_list
1426
1427    def init_connect(self):
1428        self.log.info("Below connection attempts may result in error messages, this is expected!")
1429        for subsystem in self.subsystem_info_list:
1430            self.log.info("Trying to connect %s %s %s" % subsystem)
1431            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
1432                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
1433            time.sleep(2)
1434
1435        if "io_uring" in self.ioengine:
1436            self.log.info("Setting block layer settings for io_uring.")
1437
1438            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
1439            #       apparently it's not possible for connected subsystems.
1440            #       Results in "error: Invalid argument"
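            # iostats=0 disables per-queue IO statistics accounting,
            # rq_affinity=0 turns off completion CPU steering, and nomerges=2
            # disables all request merging; all three reduce per-IO overhead.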
1441            block_sysfs_settings = {
1442                "iostats": "0",
1443                "rq_affinity": "0",
1444                "nomerges": "2"
1445            }
1446
1447            for disk in self.get_connected_nvme_list():
1448                sysfs = os.path.join("/sys/block", disk, "queue")
1449                for k, v in block_sysfs_settings.items():
1450                    sysfs_opt_path = os.path.join(sysfs, k)
1451                    try:
1452                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
1453                    except CalledProcessError as e:
1454                        self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
1455                    finally:
1456                        sysfs_value = self.exec_cmd(["sudo", "cat", sysfs_opt_path])
1457                        self.log.info("%s=%s" % (sysfs_opt_path, sysfs_value))
1458
1459    def init_disconnect(self):
1460        for subsystem in self.subsystem_info_list:
1461            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
1462            time.sleep(1)
1463
1464    def get_nvme_subsystem_numa(self, dev_name):
1465        # Strip the trailing namespace suffix (e.g. "n1") to get the controller name from the namespace device name
1466        nvme_ctrl = os.path.basename(dev_name)[:-2]
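        # For fabrics controllers /sys/class/nvme/<ctrl>/address contains e.g.
        # "traddr=192.168.0.10,trsvcid=4420" (illustrative); extract the IPv4 address.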
1467        remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
1468                                   self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl]))
1469        return self.get_route_nic_numa(remote_nvme_ip.group(0))
1470
1471    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
1472        self.available_cpus = self.get_numa_cpu_map()
1473        if len(threads) >= len(subsystems):
1474            threads = range(0, len(subsystems))
1475
1476        # Generate connected nvme device names and sort them by the NUMA node
1477        # of the NIC used, to allow better grouping when splitting into fio sections.
1478        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
1479        nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list]
1480        nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))]
1481
1482        filename_section = ""
1483        nvme_per_split = int(len(nvme_list) / len(threads))
1484        remainder = len(nvme_list) % len(threads)
1485        iterator = iter(nvme_list)
1486        result = []
1487        for i in range(len(threads)):
1488            result.append([])
1489            for _ in range(nvme_per_split):
1490                result[i].append(next(iterator))
1491            if remainder:  # hand out remainder devices one per section, as in SPDKInitiator
1492                result[i].append(next(iterator))
1493                remainder -= 1
1494        for i, r in enumerate(result):
1495            header = "[filename%s]" % i
1496            disks = "\n".join(["filename=%s" % x for x in r])
1497            job_section_qd = round((io_depth * len(r)) / num_jobs)
1498            if job_section_qd == 0:
1499                job_section_qd = 1
1500            iodepth = "iodepth=%s" % job_section_qd
1501
1502            offset_section = ""
1503            if offset:
1504                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)
1505
1506            numa_opts = self.gen_fio_numa_section(r, num_jobs)
1507
1508            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])
1509
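        # Example of one generated job section (illustrative values):
        #   [filename0]
        #   filename=/dev/nvme0n1
        #   filename=/dev/nvme1n1
        #   iodepth=128
        # plus NUMA and optional offset settings from the helpers above.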
1510        return filename_section
1511
1512
1513class SPDKInitiator(Initiator):
1514    def __init__(self, name, general_config, initiator_config):
1515        super().__init__(name, general_config, initiator_config)
1516
1517        if not self.skip_spdk_install:
1518            self.install_spdk()
1519
1520        # Optional fields
1521        self.enable_data_digest = False
1522        if "enable_data_digest" in initiator_config:
1523            self.enable_data_digest = initiator_config["enable_data_digest"]
1524        if "num_cores" in initiator_config:
1525            self.num_cores = initiator_config["num_cores"]
1526
1527        self.ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
1528        self.spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
1529
1530    def adq_set_busy_read(self, busy_read_val):
1531        return {"net.core.busy_read": busy_read_val}
1532
1533    def install_spdk(self):
1534        self.log.info("Using fio binary %s" % self.fio_bin)
1535        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
1536        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
1537        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma",
1538                       "--with-fio=%s" % os.path.dirname(self.fio_bin),
1539                       "--enable-lto", "--disable-unit-tests"])
1540        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
1541        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])
1542
1543        self.log.info("SPDK built")
1544        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])
1545
1546    def init_connect(self):
1547        # Not a real "connect" like when doing "nvme connect" because SPDK's fio
1548        # bdev plugin initiates connection just before starting IO traffic.
1549        # This is just to have a "init_connect" equivalent of the same function
1550        # from KernelInitiator class.
1551        # Just prepare bdev.conf JSON file for later use and consider it
1552        # "making a connection".
1553        bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
1554        self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
1555
1556    def init_disconnect(self):
1557        # The SPDK initiator does not need to explicitly disconnect, as this is
1558        # done after the fio bdev plugin finishes IO.
1559        pass
1560
1561    def gen_spdk_bdev_conf(self, remote_subsystem_list):
1562        bdev_cfg_section = {
1563            "subsystems": [
1564                {
1565                    "subsystem": "bdev",
1566                    "config": []
1567                }
1568            ]
1569        }
1570
1571        for i, subsys in enumerate(remote_subsystem_list):
1572            sub_port, sub_nqn, sub_addr = map(str, subsys)
1573            nvme_ctrl = {
1574                "method": "bdev_nvme_attach_controller",
1575                "params": {
1576                    "name": "Nvme{}".format(i),
1577                    "trtype": self.transport,
1578                    "traddr": sub_addr,
1579                    "trsvcid": sub_port,
1580                    "subnqn": sub_nqn,
1581                    "adrfam": "IPv4"
1582                }
1583            }
1584
1585            if self.enable_adq:
1586                nvme_ctrl["params"].update({"priority": "1"})
1587
1588            if self.enable_data_digest:
1589                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})
1590
1591            bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)
1592
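        # Illustrative result for a single TCP subsystem:
        # {"subsystems": [{"subsystem": "bdev", "config": [
        #     {"method": "bdev_nvme_attach_controller",
        #      "params": {"name": "Nvme0", "trtype": "tcp", "traddr": "10.0.0.1",
        #                 "trsvcid": "4420", "subnqn": "nqn.2018-09.io.spdk:cnode0",
        #                 "adrfam": "IPv4"}}]}]}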
1593        return json.dumps(bdev_cfg_section, indent=2)
1594
1595    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
1596        self.available_cpus = self.get_numa_cpu_map()
1597        filename_section = ""
1598        if len(threads) >= len(subsystems):
1599            threads = range(0, len(subsystems))
1600
1601        # Generate expected NVMe bdev names and sort them by the NUMA node
1602        # of the NIC used, to allow better grouping when splitting into fio sections.
1603        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
1604        filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames]
1605        filenames = [x for _, x in sorted(zip(filename_numas, filenames))]
1606
1607        nvme_per_split = int(len(subsystems) / len(threads))
1608        remainder = len(subsystems) % len(threads)
1609        iterator = iter(filenames)
1610        result = []
1611        for i in range(len(threads)):
1612            result.append([])
1613            for _ in range(nvme_per_split):
1614                result[i].append(next(iterator))
1615            if remainder:
1616                result[i].append(next(iterator))
1617                remainder -= 1
1618        for i, r in enumerate(result):
1619            header = "[filename%s]" % i
1620            disks = "\n".join(["filename=%s" % x for x in r])
1621            job_section_qd = round((io_depth * len(r)) / num_jobs)
1622            if job_section_qd == 0:
1623                job_section_qd = 1
1624            iodepth = "iodepth=%s" % job_section_qd
1625
1626            offset_section = ""
1627            if offset:
1628                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)
1629
1630            numa_opts = self.gen_fio_numa_section(r, num_jobs)
1631
1632            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])
1633
1634        return filename_section
1635
1636    def get_nvme_subsystem_numa(self, bdev_name):
1637        bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir]))
1638        bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"]
1639
1640        # Strip the trailing namespace suffix (e.g. "n1") to get the controller name from the bdev name
1641        nvme_ctrl = bdev_name[:-2]
1642        remote_nvme_ip = list(filter(lambda x: x["params"]["name"] == "%s" % nvme_ctrl, bdev_conf_json_obj))[0]["params"]["traddr"]
1643        return self.get_route_nic_numa(remote_nvme_ip)
1644
1645
1646if __name__ == "__main__":
1647    script_full_dir = os.path.dirname(os.path.realpath(__file__))
1648    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))
1649
1650    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
1651    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
1652                        help='Configuration file.')
1653    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
1654                        help='Results directory.')
1655    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
1656                        help='CSV results filename.')
1657    parser.add_argument('-f', '--force', default=False, action='store_true',
1658                        dest='force', help="""Force script to continue and try to use all
1659                        available NVMe devices during test.
1660                        WARNING: Might result in data loss on used NVMe drives""")
1661
1662    args = parser.parse_args()
1663
1664    logging.basicConfig(level=logging.INFO,
1665                        format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s')
1666
1667    logging.info("Using config file: %s" % args.config)
1668    with open(args.config, "r") as config:
1669        data = json.load(config)
1670
1671    initiators = []
1672    fio_cases = []
1673
1674    general_config = data["general"]
1675    target_config = data["target"]
1676    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]
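    # Expected top-level config.json layout (illustrative):
    #   {"general": {...}, "target": {...}, "initiator1": {...}, "fio": {...}}
    # Any key containing "initiator" defines an initiator section; the "fio"
    # section holds the workload matrix consumed below.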
1677
1678    if "null_block_devices" not in data["target"] and \
1679        (args.force is False and
1680            "allowlist" not in data["target"] and
1681            "blocklist" not in data["target"]):
1682        # TODO: Also check if allowlist or blocklist are not empty.
1683        logging.warning("""WARNING: This script requires allowlist and blocklist to be defined.
1684        You can choose to use all available NVMe drives on your system, which may potentially
1685        lead to data loss. If you wish to proceed with all attached NVMes, use "-f" option.""")
1686        exit(1)
1687
1688    for k, v in data.items():
1689        if "target" in k:
1690            v.update({"results_dir": args.results})
1691            if data[k]["mode"] == "spdk":
1692                target_obj = SPDKTarget(k, data["general"], v)
1693            elif data[k]["mode"] == "kernel":
1694                target_obj = KernelTarget(k, data["general"], v)
1695        elif "initiator" in k:
1696            if data[k]["mode"] == "spdk":
1697                init_obj = SPDKInitiator(k, data["general"], v)
1698            elif data[k]["mode"] == "kernel":
1699                init_obj = KernelInitiator(k, data["general"], v)
1700            initiators.append(init_obj)
1701        elif "fio" in k:
1702            fio_workloads = itertools.product(data[k]["bs"],
1703                                              data[k]["qd"],
1704                                              data[k]["rw"])
1705
1706            fio_run_time = data[k]["run_time"]
1707            fio_ramp_time = data[k]["ramp_time"]
1708            fio_rw_mix_read = data[k]["rwmixread"]
1709            fio_run_num = data[k].get("run_num", 1)  # default to a single run; used as range(1, fio_run_num+1) below
1710            fio_num_jobs = data[k].get("num_jobs")
1711
1712            fio_rate_iops = 0
1713            if "rate_iops" in data[k]:
1714                fio_rate_iops = data[k]["rate_iops"]
1715
1716            fio_offset = False
1717            if "offset" in data[k]:
1718                fio_offset = data[k]["offset"]
1719            fio_offset_inc = 0
1720            if "offset_inc" in data[k]:
1721                fio_offset_inc = data[k]["offset_inc"]
1722        else:
1723            continue
1724
1725    try:
1726        os.mkdir(args.results)
1727    except FileExistsError:
1728        pass
1729
1730    for i in initiators:
1731        target_obj.initiator_info.append(
1732            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
1733        )
1734
1735    # TODO: This try block is definitely too large. Need to break it up into separate
1736    # logical blocks to reduce its size.
1737    try:
1738        target_obj.tgt_start()
1739
1740        for i in initiators:
1741            i.match_subsystems(target_obj.subsystem_info_list)
1742            if i.enable_adq:
1743                i.adq_configure_tc()
1744
1745        # Poor man's threading
1746        # Run FIO tests
1747        for block_size, io_depth, rw in fio_workloads:
1748            configs = []
1749            for i in initiators:
1750                i.init_connect()
1751                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
1752                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops,
1753                                       fio_offset, fio_offset_inc)
1754                configs.append(cfg)
1755
1756            for run_no in range(1, fio_run_num+1):
1757                threads = []
1758                power_daemon = None
1759                measurements_prefix = "%s_%s_%s_m_%s_run_%s" % (block_size, io_depth, rw, fio_rw_mix_read, run_no)
1760
1761                for i, cfg in zip(initiators, configs):
1762                    t = threading.Thread(target=i.run_fio, args=(cfg, run_no))
1763                    threads.append(t)
1764                if target_obj.enable_sar:
1765                    sar_file_prefix = measurements_prefix + "_sar"
1766                    t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix, fio_ramp_time, fio_run_time))
1767                    threads.append(t)
1768
1769                if target_obj.enable_pcm:
1770                    pcm_fnames = ["%s_%s.csv" % (measurements_prefix, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]
1771
1772                    pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm,
1773                                                 args=(args.results, pcm_fnames[0], fio_ramp_time, fio_run_time))
1774                    pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory,
1775                                                 args=(args.results, pcm_fnames[1], fio_ramp_time, fio_run_time))
1776                    pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power,
1777                                                 args=(args.results, pcm_fnames[2], fio_ramp_time, fio_run_time))
1778
1779                    threads.append(pcm_cpu_t)
1780                    threads.append(pcm_mem_t)
1781                    threads.append(pcm_pow_t)
1782
1783                if target_obj.enable_bw:
1784                    bandwidth_file_name = measurements_prefix + "_bandwidth.csv"
1785                    t = threading.Thread(target=target_obj.measure_network_bandwidth,
1786                                         args=(args.results, bandwidth_file_name, fio_ramp_time, fio_run_time))
1787                    threads.append(t)
1788
1789                if target_obj.enable_dpdk_memory:
1790                    dpdk_mem_file_name = measurements_prefix + "_dpdk_mem.txt"
1791                    t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results, dpdk_mem_file_name, fio_ramp_time))
1792                    threads.append(t)
1793
1794                if target_obj.enable_pm:
1795                    power_daemon = threading.Thread(target=target_obj.measure_power,
1796                                                    args=(args.results, measurements_prefix, script_full_dir,
1797                                                          fio_ramp_time, fio_run_time))
1798                    threads.append(power_daemon)
1799
1800                for t in threads:
1801                    t.start()
1802                for t in threads:
1803                    t.join()
1804
1805            for i in initiators:
1806                i.init_disconnect()
1807                i.copy_result_files(args.results)
1808        try:
1809            parse_results(args.results, args.csv_filename)
1810        except Exception as err:
1811            logging.error("There was an error with parsing the results")
1812            logging.error(err)
1813    finally:
1814        for i in initiators:
1815            try:
1816                i.stop()
1817            except Exception as err:
1818                logging.error("Failed to stop initiator %s: %s" % (i.name, err))
1819        target_obj.stop()
1820