#!/usr/bin/env python3
#  SPDX-License-Identifier: BSD-3-Clause
#  Copyright (C) 2018 Intel Corporation
#  All rights reserved.
#

import os
import re
import sys
import argparse
import json
import logging
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid

import paramiko
import pandas as pd
from common import *
from subprocess import CalledProcessError

sys.path.append(os.path.dirname(__file__) + '/../../../python')

import spdk.rpc as rpc  # noqa
import spdk.rpc.client as rpc_client  # noqa


class Server:
    def __init__(self, name, general_config, server_config):
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]

        self.log = logging.getLogger(self.name)

        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        self.local_nic_info = []
        self._nics_json_obj = {}
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        self.enable_adq = False
        self.adq_priority = None
        self.irq_settings = {"mode": "default"}

        if "adq_enable" in server_config and server_config["adq_enable"]:
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1
        if "irq_settings" in server_config:
            self.irq_settings.update(server_config["irq_settings"])
        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]

        if not re.match(r'^[A-Za-z0-9\-]+$', name):
            self.log.error("Please use a name which contains only letters, numbers or dashes")
            sys.exit(1)

    @staticmethod
    def get_uncommented_lines(lines):
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
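        # Resolve an interface name from one of its assigned IPs using the JSON
        # output of "ip -j address show"; interfaces with no addr_info entries
        # are filtered out up front. Returns None if no interface owns the IP.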
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                if ip == addr["local"]:  # exact match, not substring
                    return nic["ifname"]

    def set_local_nic_info_helper(self):
        pass

    def set_local_nic_info(self, pci_info):
        def extract_network_elements(json_obj):
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def get_nic_numa_node(self, nic_name):
        return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name]))

    def get_numa_cpu_map(self):
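        # Build a {numa_node: [cpu, ...]} map from "lscpu -b -e=NODE,CPU -J",
        # e.g. {0: [0, 2, 4], 1: [1, 3, 5]} on a two-node system.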
        numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"]))
        numa_cpu_json_map = {}

        for cpu in numa_cpu_json_obj["cpus"]:
            cpu_num = int(cpu["cpu"])
            numa_node = int(cpu["node"])
            numa_cpu_json_map.setdefault(numa_node, [])
            numa_cpu_json_map[numa_node].append(cpu_num)

        return numa_cpu_json_map

    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        return ""

    def configure_system(self):
        self.load_drivers()
        self.configure_services()
        self.configure_sysctl()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity(**self.irq_settings)

    def load_drivers(self):
        self.log.info("Loading drivers")
        self.exec_cmd(["sudo", "modprobe", "-a",
                       "nvme-%s" % self.transport,
                       "nvmet-%s" % self.transport])

    def configure_adq(self):
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        self.log.info("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log.info("%s loaded!" % module)
            except CalledProcessError as e:
                self.log.error("ERROR: failed to load module %s" % module)
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        self.log.info("Configuring ADQ Traffic classes and filters...")

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            nic_ports = [x[0] for x in self.subsystem_info_list if nic_ip in x[2]]

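            # Two traffic classes: TC0 (priority 0) gets num_queues_tc0 queues
            # starting at offset 0, TC1 (priority 1) gets num_queues_tc1 queues
            # starting right after them; "hw 1 mode channel" offloads the qdisc
            # to the NIC as ADQ channels.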
            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log.info(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log.info(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            nic_bdf = os.path.basename(self.exec_cmd(["readlink", "-f", "/sys/class/net/%s/device" % nic_name]).strip())
            self.exec_cmd(["sudo", "devlink", "dev", "param", "set", "pci/%s" % nic_bdf,
                           "name", "tc1_inline_fd", "value", "true", "cmode", "runtime"])

            for port in nic_ports:
                tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                                 "protocol", "ip", "ingress", "prio", "1", "flower",
                                 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                                 "skip_sw", "hw_tc", "1"]
                self.log.info(" ".join(tc_filter_cmd))
                self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log.info("Show tc configuration for %s NIC..." % nic_name)
            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log.info("%s" % tc_disk_out)
            self.log.info("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log.info(xps_cmd)
            self.exec_cmd(xps_cmd)

    def reload_driver(self, driver):
        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver])
        except CalledProcessError as e:
            self.log.error("ERROR: failed to reload %s module!" % driver)
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_nic(self):
        self.log.info("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                nic_bdf = self.exec_cmd(["bash", "-c", "source /sys/class/net/%s/device/uevent; echo $PCI_SLOT_NAME" % nic])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-inspect-optimize", "off"])
                # Following are suggested in ADQ Configuration Guide to be enabled for large block sizes,
                # but can be used with 4k block sizes as well
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop", "on"])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop-cfg", "on"])
            except subprocess.CalledProcessError as e:
                self.log.error("ERROR: failed to configure NIC port using ethtool!")
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
                self.log.info("Please update your NIC driver and firmware versions and try again.")
            self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        self.log.info("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log.info("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log.info("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        self.log.info("Tuning sysctl settings...")

        busy_read = 0
        busy_poll = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1
            busy_poll = 1

        sysctl_opts = {
            "net.core.busy_poll": busy_poll,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
        }

        if self.enable_adq:
            sysctl_opts.update(self.adq_set_busy_read(1))

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        if not self.tuned_profile:
            self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        self.log.info("Setting CPU governor to performance...")

        # This assumes that the same CPU scaling governor is set on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def get_core_list_from_mask(self, core_mask):
        # Generate list of individual cores from hex core mask
        # (e.g. '0xffff') or list containing:
        # - individual cores (e.g. '1, 2, 3')
        # - core ranges (e.g. '0-3')
        # - mix of both (e.g. '0, 1-4, 9, 11-13')
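        # Example: '0x5' -> [0, 2]; '[0, 1-3, 8]' -> [0, 1, 2, 3, 8]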

        core_list = []
        if "0x" in core_mask:
            core_mask_int = int(core_mask, 16)
            for i in range(core_mask_int.bit_length()):
                if (1 << i) & core_mask_int:
                    core_list.append(i)
            return core_list
        else:
            # Core list can be provided in .json config with square brackets
            # remove them first
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")

            for i in core_mask.split(","):
                if "-" in i:
                    start, end = i.split("-")
                    core_range = range(int(start), int(end) + 1)
                    core_list.extend(core_range)
                else:
                    core_list.append(int(i))
            return core_list

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
        self.log.info("Setting IRQ affinity for NICs. Using %s mode" % mode)

        if mode not in ["default", "bynode", "cpulist"]:
            raise ValueError("%s irq affinity setting not supported" % mode)

        if mode == "cpulist" and not cpulist:
            raise ValueError("%s irq affinity setting set, but no cpulist provided" % mode)

        affinity_script = "set_irq_affinity.sh"
        if "default" not in mode:
            affinity_script = "set_irq_affinity_cpulist.sh"
            system_cpu_map = self.get_numa_cpu_map()
        irq_script_path = os.path.join(self.irq_scripts_dir, affinity_script)

        def cpu_list_to_string(cpulist):
            return ",".join(map(str, cpulist))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            irq_cmd = ["sudo", irq_script_path]

            # Use only CPU cores matching NIC NUMA node.
            # Remove any CPU cores if they're on exclusion list.
            if mode == "bynode":
                irq_cpus = system_cpu_map[self.get_nic_numa_node(nic_name)]
                if cpulist and exclude_cpulist:
                    disallowed_cpus = self.get_core_list_from_mask(cpulist)
                    irq_cpus = list(set(irq_cpus) - set(disallowed_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            if mode == "cpulist":
                irq_cpus = self.get_core_list_from_mask(cpulist)
                if exclude_cpulist:
                    # Flatten system CPU list, we don't need NUMA awareness here
                    system_cpu_list = sorted({x for v in system_cpu_map.values() for x in v})
                    irq_cpus = list(set(system_cpu_list) - set(irq_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            irq_cmd.append(nic_name)
            self.log.info(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        self.log.info("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        self.log.info("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        self.log.info("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log.info("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        self.log.info("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log.info("Reverted CPU governor to %s." % self.governor_restore)

    def restore_settings(self):
        self.restore_governor()
        self.restore_tuned()
        self.restore_services()
        self.restore_sysctl()
        if self.enable_adq:
            self.reload_driver("ice")


class Target(Server):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Defaults
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []
        self.initiator_info = []
        self.nvme_allowlist = []
        self.nvme_blocklist = []

        # Target-side measurement options
        self.enable_pm = True
        self.enable_sar = True
        self.enable_pcm = True
        self.enable_bw = True
        self.enable_dpdk_memory = True

        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "scheduler_settings" in target_config:
            self.scheduler_name = target_config["scheduler_settings"]
        if "zcopy_settings" in target_config:
            self.enable_zcopy = target_config["zcopy_settings"]
        if "results_dir" in target_config:
            self.results_dir = target_config["results_dir"]
        if "blocklist" in target_config:
            self.nvme_blocklist = target_config["blocklist"]
        if "allowlist" in target_config:
            self.nvme_allowlist = target_config["allowlist"]
            # Blocklist takes precedence, remove common elements from allowlist
            self.nvme_allowlist = list(set(self.nvme_allowlist) - set(self.nvme_blocklist))
        if "enable_pm" in target_config:
            self.enable_pm = target_config["enable_pm"]
        if "enable_sar" in target_config:
            self.enable_sar = target_config["enable_sar"]
        if "enable_pcm" in target_config:
            self.enable_pcm = target_config["enable_pcm"]
        if "enable_bandwidth" in target_config:
            self.enable_bw = target_config["enable_bandwidth"]
        if "enable_dpdk_memory" in target_config:
            self.enable_dpdk_memory = target_config["enable_dpdk_memory"]

        self.log.info("Items now on allowlist: %s" % self.nvme_allowlist)
        self.log.info("Items now on blocklist: %s" % self.nvme_blocklist)

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
            self.configure_irq_affinity()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log.info("Changing directory to %s" % change_dir)

        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")

        if change_dir:
            os.chdir(old_cwd)
            self.log.info("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        self.log.info("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, _directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log.info("Done zipping")

    @staticmethod
    def _chunks(input_list, chunks_no):
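        # Split input_list into chunks_no contiguous chunks whose lengths differ
        # by at most one, e.g. a 10-element list split into 3 chunks yields
        # chunks of 4, 3 and 3 elements.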
        div, rem = divmod(len(input_list), chunks_no)
        for i in range(chunks_no):
            si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem)
            yield input_list[si:si + (div + 1 if i < rem else div)]

    def spread_bdevs(self, req_disks):
        # Spread available block device indexes:
        # - evenly across available initiator systems
        # - evenly across available NIC interfaces for
        #   each initiator
        # Not NUMA aware.
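        # e.g. 8 bdevs, 2 initiators with 2 target NIC IPs each: initiator 1
        # gets bdevs 0-3 (two per NIC IP) and initiator 2 gets bdevs 4-7.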
        ip_bdev_map = []
        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))

        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
            self.initiator_info[i]["bdev_range"] = init_chunk
            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
                for c in nic_chunk:
                    ip_bdev_map.append((ip, c))
        return ip_bdev_map

    def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time):
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))

        self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Starting SAR measurements")

        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % 1, "%s" % run_time])
        with open(sar_output_file, "w") as fh:  # sar_output_file already includes results_dir
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log.info("Summary CPU utilization from SAR:")
                        self.log.info(line)
                    elif "all" in line:
                        self.log.info(line)
                    else:
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
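        # Total CPU utilization in "percent-cores": every CPU can contribute up
        # to 100%, so subtract the summed per-CPU %idle (8th column of the
        # per-CPU "Average:" rows) from cpu_number * 100.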
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum

        with open(sar_cpu_util_file, "w") as f:
            f.write("%0.2f" % sar_cpu_usage)

    def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time):
        time.sleep(ramp_time)
        self.log.info("Starting power measurements")
        self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir,
                       "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix,
                       "-x", "-c", "%s" % run_time, "-t", "%s" % 1, "-r"])

    def measure_pcm_memory(self, results_dir, pcm_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        cmd = ["pcm-memory", "1", "-csv=%s/%s" % (results_dir, pcm_file_name)]
        pcm_memory = subprocess.Popen(cmd)
        time.sleep(run_time)
        pcm_memory.terminate()

    def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        cmd = ["pcm", "1", "-i=%s" % run_time,
               "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
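        # Post-process the pcm CSV (it has a two-row header): strip the
        # "Unnamed:" placeholder names pandas generates for empty header cells,
        # then save only the per-socket UPI utilization columns to a separate
        # "skt_"-prefixed file.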
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_pcm_power(self, results_dir, pcm_power_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        out = self.exec_cmd(["pcm-power", "1", "-i=%s" % run_time])
        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
            fh.write(out)
        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
        #       Improve this so that an additional file with the averaged measurements is generated too.

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name, ramp_time, run_time):
        self.log.info("Waiting %d seconds for ramp-up to finish before measuring bandwidth stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Starting network bandwidth measurements")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", "%s" % run_time])
        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
        #       Improve this so that an additional file with the averaged measurements is generated too.
        # TODO: Monitor only those interfaces that are currently used to run the workload.

    def measure_dpdk_memory(self, results_dir, dump_file_name, ramp_time):
        self.log.info("Waiting %d seconds for ramp-up to finish before generating DPDK memory usage stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Generating DPDK memory usage stats")
        tmp_dump_file = rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)["filename"]
        os.rename(tmp_dump_file, "%s/%s" % (results_dir, dump_file_name))

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(os.uname().release)
        self.log.info("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log.info("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log.info("====zcopy settings:====")
        self.log.info("zcopy enabled: %s" % (self.enable_zcopy))
        self.log.info("====Scheduler settings:====")
        self.log.info("SPDK scheduler: %s" % (self.scheduler_name))


class Initiator(Server):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Required fields
        self.ip = initiator_config["ip"]
        self.target_nic_ips = initiator_config["target_nic_ips"]

        # Defaults
        self.cpus_allowed = None
        self.cpus_allowed_policy = "shared"
        self.spdk_dir = "/tmp/spdk"
        self.fio_bin = "/usr/src/fio/fio"
        self.nvmecli_bin = "nvme"
        self.cpu_frequency = None
        self.subsystem_info_list = []
        self.allow_cpu_sharing = True

        if "spdk_dir" in initiator_config:
            self.spdk_dir = initiator_config["spdk_dir"]
        if "fio_bin" in initiator_config:
            self.fio_bin = initiator_config["fio_bin"]
        if "nvmecli_bin" in initiator_config:
            self.nvmecli_bin = initiator_config["nvmecli_bin"]
        if "cpus_allowed" in initiator_config:
            self.cpus_allowed = initiator_config["cpus_allowed"]
        if "cpus_allowed_policy" in initiator_config:
            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
        if "cpu_frequency" in initiator_config:
            self.cpu_frequency = initiator_config["cpu_frequency"]
        if "allow_cpu_sharing" in initiator_config:
            self.allow_cpu_sharing = initiator_config["allow_cpu_sharing"]
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.copy_spdk("/tmp/spdk.zip")
        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        self.restore_settings()
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it again to prevent
        # expansion when sending it to the remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)
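        # e.g. ['sudo', 'sysctl', '-w', 'net.ipv4.tcp_mem=268435456 268435456 268435456']
        # is sent as: sudo sysctl -w "net.ipv4.tcp_mem=268435456 268435456 268435456"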

        # Redirect stderr to stdout using the get_pty option if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        self.log.info("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log.info("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log.info("Sources unpacked")

    def copy_result_files(self, dest_dir):
        self.log.info("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log.info("Done copying results")

    def match_subsystems(self, target_subsystems):
        subsystems = [subsystem for subsystem in target_subsystems if subsystem[2] in self.target_nic_ips]
        subsystems.sort(key=lambda x: x[1])
        self.log.info("Found matching subsystems on target side:")
        for s in subsystems:
            self.log.info(s)
        self.subsystem_info_list = subsystems

    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def get_route_nic_numa(self, remote_nvme_ip):
        local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip]))
        local_nvme_nic = local_nvme_nic[0]["dev"]
        return self.get_nic_numa_node(local_nvme_nic)

    @staticmethod
    def gen_fio_offset_section(offset_inc, num_jobs):
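        # Divide the target evenly between jobs when no explicit increment is
        # given, e.g. offset_inc=0, num_jobs=4 produces:
        # size=25%, offset=0%, offset_increment=25%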
        offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc
        return "\n".join(["size=%s%%" % offset_inc,
                          "offset=0%",
                          "offset_increment=%s%%" % offset_inc])

    def gen_fio_numa_section(self, fio_filenames_list, num_jobs):
        numa_stats = {}
        allowed_cpus = []
        for nvme in fio_filenames_list:
            nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme))
            numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1

        # Use the most common NUMA node for this chunk to allocate memory and CPUs
        section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0]

        # Check if we have enough free CPUs to pop from the list before assigning them
        if len(self.available_cpus[section_local_numa]) < num_jobs:
            if self.allow_cpu_sharing:
                self.log.info("Regenerating available CPU list %s" % section_local_numa)
                # Remove still available CPUs from the regenerated list. We don't want to
                # regenerate it with duplicates.
                cpus_regen = set(self.get_numa_cpu_map()[section_local_numa]) - set(self.available_cpus[section_local_numa])
                self.available_cpus[section_local_numa].extend(cpus_regen)
                self.log.info(self.available_cpus[section_local_numa])
            else:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise IndexError

        for _ in range(num_jobs):
            try:
                allowed_cpus.append(str(self.available_cpus[section_local_numa].pop(0)))
            except IndexError:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise

        return "\n".join(["cpus_allowed=%s" % ",".join(allowed_cpus),
                          "numa_mem_policy=prefer:%s" % section_local_numa])

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no,
                       num_jobs=None, ramp_time=0, run_time=10, rate_iops=0,
                       offset=False, offset_inc=0):
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""

        if self.cpus_allowed is not None:
            self.log.info("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    cpus_num += len(range(a, b + 1))  # ranges like "0-3" include both endpoints
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            self.num_cores = len(self.subsystem_info_list)
            threads = range(0, len(self.subsystem_info_list))

        filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs,
                                                      offset, offset_inc)

        fio_config = fio_conf_template.format(ioengine=self.ioengine, spdk_conf=self.spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, init_connect() function
        if "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log.info("Created FIO Config:")
        self.log.info(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit()
        else:
            self.log.warning("WARNING: you have disabled intel_pstate and are using the default CPU governor.")

    def run_fio(self, fio_config_file, run_num=1):
        job_name, _ = os.path.splitext(fio_config_file)
        self.log.info("Starting FIO run for job: %s" % job_name)
        self.log.info("Using FIO: %s" % self.fio_bin)

        output_filename = job_name + "_run_" + str(run_num) + "_" + self.name + ".json"
        try:
            output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename, "--eta=never"], True)
            self.log.info(output)
            self.log.info("FIO run finished. Results in: %s" % output_filename)
        except subprocess.CalledProcessError as e:
            self.log.error("ERROR: Fio process failed!")
            self.log.error(e.stdout)

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(self.exec_cmd(["uname", "-r"]))
        self.log.info("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log.info("====sysctl conf:====")
        sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"])
        self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))


class KernelTarget(Target):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = "nvmetcli"

        if "nvmet_bin" in target_config:
            self.nvmet_bin = target_config["nvmet_bin"]

    def load_drivers(self):
        self.log.info("Loading drivers")
        super().load_drivers()
        if self.null_block:
            self.exec_cmd(["sudo", "modprobe", "null_blk", "nr_devices=%s" % self.null_block])

    def configure_adq(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_configure_tc(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_set_busy_read(self, busy_read_val):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
        return {"net.core.busy_read": 0}

    def stop(self):
        self.nvmet_command(self.nvmet_bin, "clear")
        self.restore_settings()

    def get_nvme_device_bdf(self, nvme_dev_path):
        nvme_name = os.path.basename(nvme_dev_path)
        return self.exec_cmd(["cat", "/sys/block/%s/device/address" % nvme_name]).strip()

    def get_nvme_devices(self):
        dev_list = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]).split("\n")
        nvme_list = []
        for dev in dev_list:
            if "nvme" not in dev:
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                nvme_list.append(dev)
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_allowlist:
                nvme_list.append(dev)
        return nvme_list

    def nvmet_command(self, nvmet_bin, command):
        return self.exec_cmd([nvmet_bin, *(command.split(" "))])

    def kernel_tgt_gen_subsystem_conf(self, nvme_list):

        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

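        # One subsystem and one port per bdev; trsvcid starts at the standard
        # NVMe-oF port 4420 and is offset by the bdev index so every subsystem
        # listens on a unique port.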
        for ip, bdev_num in self.spread_bdevs(len(nvme_list)):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = nvme_list[bdev_num]

            nvmet_cfg["subsystems"].append({
                "allowed_hosts": [],
                "attr": {
                    "allow_any_host": "1",
                    "serial": serial,
                    "version": "1.3"
                },
                "namespaces": [
                    {
                        "device": {
                            "path": bdev_name,
                            "uuid": "%s" % uuid.uuid4()
                        },
                        "enable": 1,
                        "nsid": port
                    }
                ],
                "nqn": nqn
            })

            nvmet_cfg["ports"].append({
                "addr": {
                    "adrfam": "ipv4",
                    "traddr": ip,
                    "trsvcid": port,
                    "trtype": self.transport
                },
                "portid": bdev_num,
                "referrals": [],
                "subsystems": [nqn]
            })

            self.subsystem_info_list.append((port, nqn, ip))
        self.subsys_no = len(self.subsystem_info_list)

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        self.log.info("Configuring kernel NVMeOF Target")

        if self.null_block:
            self.log.info("Configuring with null block device.")
            nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
        else:
            self.log.info("Configuring with NVMe drives.")
            nvme_list = self.get_nvme_devices()

        self.kernel_tgt_gen_subsystem_conf(nvme_list)
        self.subsys_no = len(nvme_list)

        self.nvmet_command(self.nvmet_bin, "clear")
        self.nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log.info("Done configuring kernel NVMeOF Target")


class SPDKTarget(Target):
    def __init__(self, name, general_config, target_config):
        # Required fields
        self.core_mask = target_config["core_mask"]
        self.num_cores = len(self.get_core_list_from_mask(self.core_mask))

        super().__init__(name, general_config, target_config)

        # Defaults
        self.dif_insert_strip = False
        self.null_block_dif_type = 0
        self.num_shared_buffers = 4096
        self.max_queue_depth = 128
        self.bpf_proc = None
        self.bpf_scripts = []
        self.enable_dsa = False
        self.scheduler_core_limit = None
        self.iobuf_small_pool_count = 16383
        self.iobuf_large_pool_count = 2047
        self.num_cqe = 4096

        if "num_shared_buffers" in target_config:
            self.num_shared_buffers = target_config["num_shared_buffers"]
        if "max_queue_depth" in target_config:
            self.max_queue_depth = target_config["max_queue_depth"]
        if "null_block_dif_type" in target_config:
            self.null_block_dif_type = target_config["null_block_dif_type"]
        if "dif_insert_strip" in target_config:
            self.dif_insert_strip = target_config["dif_insert_strip"]
        if "bpf_scripts" in target_config:
            self.bpf_scripts = target_config["bpf_scripts"]
        if "dsa_settings" in target_config:
            self.enable_dsa = target_config["dsa_settings"]
        if "scheduler_core_limit" in target_config:
            self.scheduler_core_limit = target_config["scheduler_core_limit"]
        if "iobuf_small_pool_count" in target_config:
            self.iobuf_small_pool_count = target_config["iobuf_small_pool_count"]
        if "iobuf_large_pool_count" in target_config:
            self.iobuf_large_pool_count = target_config["iobuf_large_pool_count"]
        if "num_cqe" in target_config:
            self.num_cqe = target_config["num_cqe"]

        self.log.info("====DSA settings:====")
        self.log.info("DSA enabled: %s" % (self.enable_dsa))

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
        if mode not in ["default", "bynode", "cpulist",
                        "shared", "split", "split-bynode"]:
            raise ValueError("%s irq affinity setting not supported" % mode)

        # Create core list from SPDK's mask and change it to string.
        # This is the type configure_irq_affinity expects for cpulist parameter.
        spdk_tgt_core_list = self.get_core_list_from_mask(self.core_mask)
        spdk_tgt_core_list = ",".join(map(str, spdk_tgt_core_list))
        spdk_tgt_core_list = "[" + spdk_tgt_core_list + "]"

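        # "shared" pins NIC IRQs to the same cores SPDK runs on; "split" pins
        # them to every core except SPDK's; "split-bynode" does the same but
        # only with cores from the NIC's NUMA node. Other modes pass through
        # unchanged to the base class.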
        if mode == "shared":
            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list)
        elif mode == "split":
            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
        elif mode == "split-bynode":
            super().configure_irq_affinity(mode="bynode", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
        else:
            super().configure_irq_affinity(mode=mode, cpulist=cpulist, exclude_cpulist=exclude_cpulist)

    def adq_set_busy_read(self, busy_read_val):
        return {"net.core.busy_read": busy_read_val}

    def get_nvme_devices_count(self):
        return len(self.get_nvme_devices())

    def get_nvme_devices(self):
        bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")]))
        bdev_bdfs = []
        for bdev in bdev_subsys_json_obj["config"]:
            bdev_traddr = bdev["params"]["traddr"]
            if bdev_traddr in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                bdev_bdfs.append(bdev_traddr)
            if bdev_traddr in self.nvme_allowlist:
                bdev_bdfs.append(bdev_traddr)
        return bdev_bdfs

    def spdk_tgt_configure(self):
        self.log.info("Configuring SPDK NVMeOF target via RPC")

        # Create transport layer
        nvmf_transport_params = {
            "client": self.client,
            "trtype": self.transport,
            "num_shared_buffers": self.num_shared_buffers,
            "max_queue_depth": self.max_queue_depth,
            "dif_insert_or_strip": self.dif_insert_strip,
            "sock_priority": self.adq_priority,
            "num_cqe": self.num_cqe
        }

        if self.enable_adq:
            nvmf_transport_params["acceptor_poll_rate"] = 10000

        rpc.nvmf.nvmf_create_transport(**nvmf_transport_params)
        self.log.info("SPDK NVMeOF transport layer:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

        if self.enable_adq:
            self.adq_configure_tc()

        self.log.info("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self, null_block_count):
        md_size = 0
        block_size = 4096
        if self.null_block_dif_type != 0:
            md_size = 128

        self.log.info("Adding null block bdevices to config via RPC")
        for i in range(null_block_count):
            self.log.info("Setting bdev protection to: %s" % self.null_block_dif_type)
1195            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
1196                                      dif_type=self.null_block_dif_type, md_size=md_size)
1197        self.log.info("SPDK Bdevs configuration:")
1198        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1199
1200    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
1201        self.log.info("Adding NVMe bdevs to config via RPC")
1202
1203        bdfs = self.get_nvme_devices()
1204        bdfs = [b.replace(":", ".") for b in bdfs]
1205
1206        if req_num_disks:
1207            if req_num_disks > len(bdfs):
1208                self.log.error("ERROR: Requested number of disks is more than available %s" % len(bdfs))
1209                sys.exit(1)
1210            else:
1211                bdfs = bdfs[0:req_num_disks]
1212
1213        for i, bdf in enumerate(bdfs):
1214            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)
1215
1216        self.log.info("SPDK Bdevs configuration:")
1217        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1218
1219    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
1220        self.log.info("Adding subsystems to config")
1221        if not req_num_disks:
1222            req_num_disks = self.get_nvme_devices_count()
1223
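        # Each bdev gets its own subsystem with a unique NQN, serial number
        # and listener port (4420 + bdev number), plus a discovery listener.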
1224        for ip, bdev_num in self.spread_bdevs(req_num_disks):
1225            port = str(4420 + bdev_num)
1226            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
1227            serial = "SPDK00%s" % bdev_num
1228            bdev_name = "Nvme%sn1" % bdev_num
1229
1230            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
1231                                           allow_any_host=True, max_namespaces=8)
1232            rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
1233            for nqn_name in [nqn, "discovery"]:
1234                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
1235                                                     nqn=nqn_name,
1236                                                     trtype=self.transport,
1237                                                     traddr=ip,
1238                                                     trsvcid=port,
1239                                                     adrfam="ipv4")
1240            self.subsystem_info_list.append((port, nqn, ip))
1241        self.subsys_no = len(self.subsystem_info_list)
1242
1243        self.log.info("SPDK NVMeOF subsystem configuration:")
1244        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))
1245
1246    def bpf_start(self):
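        # Launch scripts/bpftrace.sh against the running nvmf_tgt PID; the
        # traces write their output to bpf_traces.txt via the BPF_OUTFILE
        # environment variable.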
1247        self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts)
1248        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
1249        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
1250        results_path = os.path.join(self.results_dir, "bpf_traces.txt")
1251
1252        with open(self.pid, "r") as fh:
1253            nvmf_pid = fh.readline().strip()
1254
1255        cmd = [bpf_script, nvmf_pid, *bpf_traces]
1256        self.log.info(cmd)
1257        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})
1258
1259    def tgt_start(self):
1260        if self.null_block:
1261            self.subsys_no = 1
1262        else:
1263            self.subsys_no = self.get_nvme_devices_count()
1264        self.log.info("Starting SPDK NVMeOF Target process")
1265        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
1266        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
1267        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")
1268
1269        with open(self.pid, "w") as fh:
1270            fh.write(str(proc.pid))
1271        self.nvmf_proc = proc
1272        self.log.info("SPDK NVMeOF Target PID=%s" % proc.pid)
1273        self.log.info("Waiting for spdk to initialize...")
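        # Poll until nvmf_tgt creates its RPC socket; note there is no timeout,
        # so a target that fails to start will hang the script here.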
1274        while True:
1275            if os.path.exists("/var/tmp/spdk.sock"):
1276                break
1277            time.sleep(1)
1278        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")
1279
1280        rpc.sock.sock_set_default_impl(self.client, impl_name="posix")
1281        rpc.iobuf.iobuf_set_options(self.client,
1282                                    small_pool_count=self.iobuf_small_pool_count,
1283                                    large_pool_count=self.iobuf_large_pool_count,
1284                                    small_bufsize=None,
1285                                    large_bufsize=None)
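        # small/large bufsize are left as None so SPDK's built-in defaults apply.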
1286
1287        if self.enable_zcopy:
1288            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
1289                                           enable_zerocopy_send_server=True)
1290            self.log.info("Target socket options:")
1291            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))
1292
1293        if self.enable_adq:
1294            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
1295            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
1296                                           nvme_adminq_poll_period_us=100000, retry_count=4)
1297
1298        if self.enable_dsa:
1299            rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None)
1300            self.log.info("Target DSA accel module enabled")
1301
1302        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit)
1303        rpc.framework_start_init(self.client)
1304
1305        if self.bpf_scripts:
1306            self.bpf_start()
1307
1308        self.spdk_tgt_configure()
1309
1310    def stop(self):
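        # Tear down in order: stop BPF tracing, terminate the target process
        # (escalating to SIGKILL if needed), then restore system settings.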
1311        if self.bpf_proc:
1312            self.log.info("Stopping BPF Trace script")
1313            self.bpf_proc.terminate()
1314            self.bpf_proc.wait()
1315
1316        if hasattr(self, "nvmf_proc"):
1317            try:
1318                self.nvmf_proc.terminate()
1319                self.nvmf_proc.wait(timeout=30)
1320            except Exception as e:
1321                self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.")
1322                self.log.info(e)
1323                self.nvmf_proc.kill()
1324                self.nvmf_proc.communicate()
1325                # Try to clean up RPC socket files if they were not removed
1326                # because of using 'kill'
1327                try:
1328                    os.remove("/var/tmp/spdk.sock")
1329                    os.remove("/var/tmp/spdk.sock.lock")
1330                except FileNotFoundError:
1331                    pass
1332        self.restore_settings()
1333
1334
1335class KernelInitiator(Initiator):
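    # Initiator that uses the kernel NVMe-oF host driver: connects with
    # nvme-cli and runs fio against /dev/nvmeXnY devices (libaio or io_uring).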
1336    def __init__(self, name, general_config, initiator_config):
1337        super().__init__(name, general_config, initiator_config)
1338
1339        # Defaults
1340        self.extra_params = ""
1341        self.ioengine = "libaio"
1342        self.spdk_conf = ""
1343
1344        if "num_cores" in initiator_config:
1345            self.num_cores = initiator_config["num_cores"]
1346
1347        if "extra_params" in initiator_config:
1348            self.extra_params = initiator_config["extra_params"]
1349
1350        if "kernel_engine" in initiator_config:
1351            self.ioengine = initiator_config["kernel_engine"]
1352            if "io_uring" in self.ioengine:
1353                self.extra_params += ' --nr-poll-queues=8'
1354
1355    def configure_adq(self):
1356        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
1357
1358    def adq_configure_tc(self):
1359        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
1360
1361    def adq_set_busy_read(self, busy_read_val):
1362        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
1363        return {"net.core.busy_read": 0}
1364
1365    def get_connected_nvme_list(self):
1366        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
1367        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
1368                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
1369        return nvme_list
1370
1371    def init_connect(self):
1372        self.log.info("The connection attempts below may produce error messages; this is expected!")
1373        for subsystem in self.subsystem_info_list:
1374            self.log.info("Trying to connect %s %s %s" % subsystem)
1375            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
1376                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
1377            time.sleep(2)
1378
1379        if "io_uring" in self.ioengine:
1380            self.log.info("Setting block layer settings for io_uring.")
1381
1382            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
1383            #       apparently it's not possible for connected subsystems.
1384            #       Results in "error: Invalid argument"
1385            block_sysfs_settings = {
1386                "iostats": "0",
1387                "rq_affinity": "0",
1388                "nomerges": "2"
1389            }
1390
1391            for disk in self.get_connected_nvme_list():
1392                sysfs = os.path.join("/sys/block", disk, "queue")
1393                for k, v in block_sysfs_settings.items():
1394                    sysfs_opt_path = os.path.join(sysfs, k)
1395                    try:
1396                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
1397                    except CalledProcessError as e:
1398                        self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
1399                    finally:
1400                        current_value = self.exec_cmd(["sudo", "cat", sysfs_opt_path])
1401                        self.log.info("%s=%s" % (sysfs_opt_path, current_value))
1402
1403    def init_disconnect(self):
1404        for subsystem in self.subsystem_info_list:
1405            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
1406            time.sleep(1)
1407
1408    def get_nvme_subsystem_numa(self, dev_name):
1409        # Strip the "n1" namespace suffix to get the controller name (subsystems here expose a single namespace)
1410        nvme_ctrl = os.path.basename(dev_name)[:-2]
1411        remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
1412                                   self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl]))
1413        return self.get_route_nic_numa(remote_nvme_ip.group(0))
1414
1415    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
1416        self.available_cpus = self.get_numa_cpu_map()
1417        if len(threads) >= len(subsystems):
1418            threads = range(0, len(subsystems))
1419
1420        # Generate connected nvme devices names and sort them by used NIC numa node
1421        # to allow better grouping when splitting into fio sections.
1422        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
1423        nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list]
1424        nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))]
1425
1426        filename_section = ""
1427        nvme_per_split = int(len(nvme_list) / len(threads))
1428        remainder = len(nvme_list) % len(threads)
1429        iterator = iter(nvme_list)
1430        result = []
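        # Round-robin split: each section gets nvme_per_split devices and the
        # first 'remainder' sections receive one extra device.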
1431        for i in range(len(threads)):
1432            result.append([])
1433            for _ in range(nvme_per_split):
1434                result[i].append(next(iterator))
1435            if remainder:
1436                result[i].append(next(iterator))
1437                remainder -= 1
1438        for i, r in enumerate(result):
1439            header = "[filename%s]" % i
1440            disks = "\n".join(["filename=%s" % x for x in r])
1441            job_section_qd = round((io_depth * len(r)) / num_jobs)
1442            if job_section_qd == 0:
1443                job_section_qd = 1
1444            iodepth = "iodepth=%s" % job_section_qd
1445
1446            offset_section = ""
1447            if offset:
1448                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)
1449
1450            numa_opts = self.gen_fio_numa_section(r, num_jobs)
1451
1452            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])
1453
1454        return filename_section
1455
1456
1457class SPDKInitiator(Initiator):
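    # Initiator that uses SPDK's fio bdev plugin; no kernel devices are
    # created, fio connects through a generated bdev.conf JSON file.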
1458    def __init__(self, name, general_config, initiator_config):
1459        super().__init__(name, general_config, initiator_config)
1460
1461        if general_config.get("skip_spdk_install", False) is False:
1462            self.install_spdk()
1463
1464        # Optional fields
1465        self.enable_data_digest = False
1466        if "enable_data_digest" in initiator_config:
1467            self.enable_data_digest = initiator_config["enable_data_digest"]
1468        if "num_cores" in initiator_config:
1469            self.num_cores = initiator_config["num_cores"]
1470
1471        self.ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
1472        self.spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
1473
1474    def adq_set_busy_read(self, busy_read_val):
1475        return {"net.core.busy_read": busy_read_val}
1476
1477    def install_spdk(self):
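        # Build SPDK with the fio bdev plugin on the initiator and run
        # setup.sh to prepare the local environment (hugepages, device binding).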
1478        self.log.info("Using fio binary %s" % self.fio_bin)
1479        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
1480        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
1481        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma",
1482                       "--with-fio=%s" % os.path.dirname(self.fio_bin),
1483                       "--enable-lto", "--disable-unit-tests"])
1484        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
1485        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])
1486
1487        self.log.info("SPDK built")
1488        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])
1489
1490    def init_connect(self):
1491        # Not a real "connect" like when doing "nvme connect" because SPDK's fio
1492        # bdev plugin initiates connection just before starting IO traffic.
1493        # This is just to have a "init_connect" equivalent of the same function
1494        # from KernelInitiator class.
1495        # Just prepare bdev.conf JSON file for later use and consider it
1496        # "making a connection".
1497        bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
1498        self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
1499
1500    def init_disconnect(self):
1501        # The SPDK initiator does not need to explicitly disconnect, as this
1502        # is done automatically once the fio bdev plugin finishes IO.
1503        pass
1504
1505    def gen_spdk_bdev_conf(self, remote_subsystem_list):
1506        bdev_cfg_section = {
1507            "subsystems": [
1508                {
1509                    "subsystem": "bdev",
1510                    "config": []
1511                }
1512            ]
1513        }
1514
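        # Each remote subsystem becomes one bdev_nvme_attach_controller entry,
        # e.g. (illustrative values):
        #   {"method": "bdev_nvme_attach_controller",
        #    "params": {"name": "Nvme0", "trtype": "tcp", "traddr": "10.0.0.1",
        #               "trsvcid": "4420", "subnqn": "nqn.2018-09.io.spdk:cnode0",
        #               "adrfam": "IPv4"}}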
1515        for i, subsys in enumerate(remote_subsystem_list):
1516            sub_port, sub_nqn, sub_addr = map(str, subsys)
1517            nvme_ctrl = {
1518                "method": "bdev_nvme_attach_controller",
1519                "params": {
1520                    "name": "Nvme{}".format(i),
1521                    "trtype": self.transport,
1522                    "traddr": sub_addr,
1523                    "trsvcid": sub_port,
1524                    "subnqn": sub_nqn,
1525                    "adrfam": "IPv4"
1526                }
1527            }
1528
1529            if self.enable_adq:
1530                nvme_ctrl["params"].update({"priority": "1"})
1531
1532            if self.enable_data_digest:
1533                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})
1534
1535            bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)
1536
1537        return json.dumps(bdev_cfg_section, indent=2)
1538
1539    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
1540        self.available_cpus = self.get_numa_cpu_map()
1541        filename_section = ""
1542        if len(threads) >= len(subsystems):
1543            threads = range(0, len(subsystems))
1544
1545        # Generate expected NVMe Bdev names and sort them by used NIC numa node
1546        # to allow better grouping when splitting into fio sections.
1547        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
1548        filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames]
1549        filenames = [x for _, x in sorted(zip(filename_numas, filenames))]
1550
1551        nvme_per_split = int(len(subsystems) / len(threads))
1552        remainder = len(subsystems) % len(threads)
1553        iterator = iter(filenames)
1554        result = []
1555        for i in range(len(threads)):
1556            result.append([])
1557            for _ in range(nvme_per_split):
1558                result[i].append(next(iterator))
1559            if remainder:
1560                result[i].append(next(iterator))
1561                remainder -= 1
1562        for i, r in enumerate(result):
1563            header = "[filename%s]" % i
1564            disks = "\n".join(["filename=%s" % x for x in r])
1565            job_section_qd = round((io_depth * len(r)) / num_jobs)
1566            if job_section_qd == 0:
1567                job_section_qd = 1
1568            iodepth = "iodepth=%s" % job_section_qd
1569
1570            offset_section = ""
1571            if offset:
1572                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)
1573
1574            numa_opts = self.gen_fio_numa_section(r, num_jobs)
1575
1576            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])
1577
1578        return filename_section
1579
1580    def get_nvme_subsystem_numa(self, bdev_name):
1581        bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir]))
1582        bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"]
1583
1584        # Strip the "n1" namespace suffix to get the controller name from the bdev name
1585        nvme_ctrl = bdev_name[:-2]
1586        remote_nvme_ip = next(x["params"]["traddr"] for x in bdev_conf_json_obj if x["params"]["name"] == nvme_ctrl)
1587        return self.get_route_nic_numa(remote_nvme_ip)
1588
1589
1590if __name__ == "__main__":
1591    script_full_dir = os.path.dirname(os.path.realpath(__file__))
1592    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))
1593
1594    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
1595    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
1596                        help='Configuration file.')
1597    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
1598                        help='Results directory.')
1599    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
1600                        help='CSV results filename.')
1601    parser.add_argument('-f', '--force', default=False, action='store_true',
1602                        dest='force', help="""Force script to continue and try to use all
1603                        available NVMe devices during test.
1604                        WARNING: Might result in data loss on used NVMe drives""")
1605
1606    args = parser.parse_args()
1607
1608    logging.basicConfig(level=logging.INFO,
1609                        format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s')
1610
1611    logging.info("Using config file: %s" % args.config)
1612    with open(args.config, "r") as config:
1613        data = json.load(config)
1614
1615    initiators = []
1616    fio_cases = []
1617
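    # The config file is expected to contain a "general" section, a "target"
    # section, one or more "initiator*" sections and a "fio" section,
    # e.g. (illustrative fragment):
    #   {"general": {...}, "target": {"mode": "spdk", ...},
    #    "initiator1": {"mode": "kernel", ...},
    #    "fio": {"bs": ["4k"], "qd": [128], "rw": ["randread"],
    #            "run_time": 60, "ramp_time": 10, "rwmixread": 100}}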
1618    general_config = data["general"]
1619    target_config = data["target"]
1620    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]
1621
1622    if "null_block_devices" not in data["target"] and \
1623        (args.force is False and
1624            "allowlist" not in data["target"] and
1625            "blocklist" not in data["target"]):
1626        # TODO: Also check if allowlist or blocklist are not empty.
1627        logging.warning("""WARNING: This script requires an allowlist or blocklist to be defined.
1628        You can choose to use all available NVMe drives on your system, which may potentially
1629        lead to data loss. If you wish to proceed with all attached NVMes, use the "-f" option.""")
1630        sys.exit(1)
1631
1632    for k, v in data.items():
1633        if "target" in k:
1634            v.update({"results_dir": args.results})
1635            if data[k]["mode"] == "spdk":
1636                target_obj = SPDKTarget(k, data["general"], v)
1637            elif data[k]["mode"] == "kernel":
1638                target_obj = KernelTarget(k, data["general"], v)
1639        elif "initiator" in k:
1640            if data[k]["mode"] == "spdk":
1641                init_obj = SPDKInitiator(k, data["general"], v)
1642            elif data[k]["mode"] == "kernel":
1643                init_obj = KernelInitiator(k, data["general"], v)
1644            initiators.append(init_obj)
1645        elif "fio" in k:
1646            fio_workloads = itertools.product(data[k]["bs"],
1647                                              data[k]["qd"],
1648                                              data[k]["rw"])
1649
1650            fio_run_time = data[k]["run_time"]
1651            fio_ramp_time = data[k]["ramp_time"]
1652            fio_rw_mix_read = data[k]["rwmixread"]
1653            fio_run_num = data[k]["run_num"] if "run_num" in data[k] else 1  # default to a single run
1654            fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k] else None
1655
1656            fio_rate_iops = 0
1657            if "rate_iops" in data[k]:
1658                fio_rate_iops = data[k]["rate_iops"]
1659
1660            fio_offset = False
1661            if "offset" in data[k]:
1662                fio_offset = data[k]["offset"]
1663            fio_offset_inc = 0
1664            if "offset_inc" in data[k]:
1665                fio_offset_inc = data[k]["offset_inc"]
1666        else:
1667            continue
1668
1669    try:
1670        os.mkdir(args.results)
1671    except FileExistsError:
1672        pass
1673
1674    for i in initiators:
1675        target_obj.initiator_info.append(
1676            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
1677        )
1678
1679    # TODO: This try block is definitely too large. Need to break this up into separate
1680    # logical blocks to reduce size.
1681    try:
1682        target_obj.tgt_start()
1683
1684        for i in initiators:
1685            i.match_subsystems(target_obj.subsystem_info_list)
1686            if i.enable_adq:
1687                i.adq_configure_tc()
1688
1689        # Poor man's threading
1690        # Run FIO tests
1691        for block_size, io_depth, rw in fio_workloads:
1692            configs = []
1693            for i in initiators:
1694                i.init_connect()
1695                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
1696                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops,
1697                                       fio_offset, fio_offset_inc)
1698                configs.append(cfg)
1699
1700            for run_no in range(1, fio_run_num+1):
1701                threads = []
1702                power_daemon = None
1703                measurements_prefix = "%s_%s_%s_m_%s_run_%s" % (block_size, io_depth, rw, fio_rw_mix_read, run_no)
1704
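                # Start fio on all initiators in parallel with any enabled
                # target-side measurements (sar, pcm, bandwidth, DPDK memory,
                # power), then wait for all threads to complete.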
1705                for i, cfg in zip(initiators, configs):
1706                    t = threading.Thread(target=i.run_fio, args=(cfg, run_no))
1707                    threads.append(t)
1708                if target_obj.enable_sar:
1709                    sar_file_prefix = measurements_prefix + "_sar"
1710                    t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix, fio_ramp_time, fio_run_time))
1711                    threads.append(t)
1712
1713                if target_obj.enable_pcm:
1714                    pcm_fnames = ["%s_%s.csv" % (measurements_prefix, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]
1715
1716                    pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm,
1717                                                 args=(args.results, pcm_fnames[0], fio_ramp_time, fio_run_time))
1718                    pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory,
1719                                                 args=(args.results, pcm_fnames[1], fio_ramp_time, fio_run_time))
1720                    pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power,
1721                                                 args=(args.results, pcm_fnames[2], fio_ramp_time, fio_run_time))
1722
1723                    threads.append(pcm_cpu_t)
1724                    threads.append(pcm_mem_t)
1725                    threads.append(pcm_pow_t)
1726
1727                if target_obj.enable_bw:
1728                    bandwidth_file_name = measurements_prefix + "_bandwidth.csv"
1729                    t = threading.Thread(target=target_obj.measure_network_bandwidth,
1730                                         args=(args.results, bandwidth_file_name, fio_ramp_time, fio_run_time))
1731                    threads.append(t)
1732
1733                if target_obj.enable_dpdk_memory:
1734                    dpdk_mem_file_name = measurements_prefix + "_dpdk_mem.txt"
1735                    t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results, dpdk_mem_file_name, fio_ramp_time))
1736                    threads.append(t)
1737
1738                if target_obj.enable_pm:
1739                    power_daemon = threading.Thread(target=target_obj.measure_power,
1740                                                    args=(args.results, measurements_prefix, script_full_dir,
1741                                                          fio_ramp_time, fio_run_time))
1742                    threads.append(power_daemon)
1743
1744                for t in threads:
1745                    t.start()
1746                for t in threads:
1747                    t.join()
1748
1749            for i in initiators:
1750                i.init_disconnect()
1751                i.copy_result_files(args.results)
1752        try:
1753            parse_results(args.results, args.csv_filename)
1754        except Exception as e:
1755            logging.error("There was an error while parsing the results: %s" % e)
1756    finally:
1757        for i in initiators:
1758            try:
1759                i.stop()
1760            except Exception as err:
1761                logging.error("Failed to stop initiator %s: %s" % (i.name, err))
1762        target_obj.stop()
1763