#!/usr/bin/env python3
#  SPDX-License-Identifier: BSD-3-Clause
#  Copyright (C) 2018 Intel Corporation
#  All rights reserved.
#

import os
import re
import sys
import argparse
import json
import logging
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid

import paramiko
import pandas as pd
from common import *
from subprocess import CalledProcessError

sys.path.append(os.path.dirname(__file__) + '/../../../python')

import spdk.rpc as rpc  # noqa
import spdk.rpc.client as rpc_client  # noqa

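# Overview: Server is the common base class for every machine in the test
# setup. Target and Initiator specialize it for the two roles, and the
# Kernel*/SPDK* subclasses below pick the NVMe-oF implementation under test.
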
class Server:
    def __init__(self, name, general_config, server_config):
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]

        self.log = logging.getLogger(self.name)

        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        self.local_nic_info = []
        self._nics_json_obj = {}
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        self.enable_adq = False
        self.adq_priority = None
        self.irq_settings = {"mode": "default"}

        if "adq_enable" in server_config and server_config["adq_enable"]:
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1
        if "irq_settings" in server_config:
            self.irq_settings.update(server_config["irq_settings"])
        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]

        if not re.match(r'^[A-Za-z0-9\-]+$', name):
            self.log.error("Please use a name that contains only letters, numbers, or dashes")
            sys.exit(1)

    @staticmethod
    def get_uncommented_lines(lines):
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                if ip in addr["local"]:
                    return nic["ifname"]
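        # Illustrative example: given "ip -j address show" output containing
        # {"ifname": "eth0", "addr_info": [{"local": "192.0.2.1"}]},
        # get_nic_name_by_ip("192.0.2.1") returns "eth0". Note that the
        # substring check above also matches prefixes of longer addresses.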

    def set_local_nic_info_helper(self):
        pass

    def set_local_nic_info(self, pci_info):
        def extract_network_elements(json_obj):
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def get_nic_numa_node(self, nic_name):
        return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name]))

    def get_numa_cpu_map(self):
        numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"]))
        numa_cpu_json_map = {}

        for cpu in numa_cpu_json_obj["cpus"]:
            cpu_num = int(cpu["cpu"])
            numa_node = int(cpu["node"])
            numa_cpu_json_map.setdefault(numa_node, [])
            numa_cpu_json_map[numa_node].append(cpu_num)

        return numa_cpu_json_map
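        # Illustrative example: on a two-node machine with four CPUs,
        # "lscpu -b -e=NODE,CPU -J" typically yields {0: [0, 1], 1: [2, 3]}.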

    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        return ""

    def configure_system(self):
        self.load_drivers()
        self.configure_services()
        self.configure_sysctl()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity(**self.irq_settings)

    def load_drivers(self):
        self.log.info("Loading drivers")
        self.exec_cmd(["sudo", "modprobe", "-a",
                       "nvme-%s" % self.transport,
                       "nvmet-%s" % self.transport])

    def configure_adq(self):
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        self.log.info("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log.info("%s loaded!" % module)
            except CalledProcessError as e:
                self.log.error("ERROR: failed to load module %s" % module)
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        self.log.info("Configuring ADQ Traffic classes and filters...")

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        port = "4420"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log.info(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log.info(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                             "protocol", "ip", "ingress", "prio", "1", "flower",
                             "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                             "skip_sw", "hw_tc", "1"]
            self.log.info(" ".join(tc_filter_cmd))
            self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log.info("Show tc configuration for %s NIC..." % nic_name)
            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log.info("%s" % tc_disk_out)
            self.log.info("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log.info(xps_cmd)
            self.exec_cmd(xps_cmd)

    def reload_driver(self, driver):
        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver])
        except CalledProcessError as e:
            self.log.error("ERROR: failed to reload %s module!" % driver)
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_nic(self):
        self.log.info("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                # As a temporary workaround for ADQ, channel packet inspection optimization is
                # turned on during connection establishment, then turned off before fio ramp_up
                # expires in ethtool_after_fio_ramp().
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-pkt-inspect-optimize", "on"])
            except CalledProcessError as e:
                self.log.error("ERROR: failed to configure NIC port using ethtool!")
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
                self.log.info("Please update your NIC driver and firmware versions and try again.")
            self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        self.log.info("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log.info("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log.info("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        self.log.info("Tuning sysctl settings...")

        sysctl_opts = {
            "net.core.busy_poll": 0,
            "net.core.busy_read": 0,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
        }

        if self.enable_adq:
            sysctl_opts.update(self.adq_set_busy_read(1))

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        if not self.tuned_profile:
            self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        self.log.info("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def get_core_list_from_mask(self, core_mask):
        # Generate list of individual cores from hex core mask
        # (e.g. '0xffff') or list containing:
        # - individual cores (e.g. '1, 2, 3')
        # - core ranges (e.g. '0-3')
        # - mix of both (e.g. '0, 1-4, 9, 11-13')

        core_list = []
        if "0x" in core_mask:
            core_mask_int = int(core_mask, 16)
            for i in range(core_mask_int.bit_length()):
                if (1 << i) & core_mask_int:
                    core_list.append(i)
            return core_list
        else:
            # Core list can be provided in .json config with square brackets
            # remove them first
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")

            for i in core_mask.split(","):
                if "-" in i:
                    start, end = i.split("-")
                    core_range = range(int(start), int(end) + 1)
                    core_list.extend(core_range)
                else:
                    core_list.append(int(i))
            return core_list
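        # Illustrative examples: "0x15" (binary 10101) yields [0, 2, 4];
        # "[0, 1-3, 7]" yields [0, 1, 2, 3, 7].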

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
        self.log.info("Setting IRQ affinity for NICs. Using %s mode" % mode)

        if mode not in ["default", "bynode", "cpulist"]:
            raise ValueError("%s irq affinity setting not supported" % mode)

        if mode == "cpulist" and not cpulist:
            raise ValueError("%s irq affinity setting set, but no cpulist provided" % mode)

        affinity_script = "set_irq_affinity.sh"
        if "default" not in mode:
            affinity_script = "set_irq_affinity_cpulist.sh"
            system_cpu_map = self.get_numa_cpu_map()
        irq_script_path = os.path.join(self.irq_scripts_dir, affinity_script)

        def cpu_list_to_string(cpulist):
            return ",".join(map(lambda x: str(x), cpulist))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            irq_cmd = ["sudo", irq_script_path]

            # Use only CPU cores matching NIC NUMA node.
            # Remove any CPU cores if they're on exclusion list.
            if mode == "bynode":
                irq_cpus = system_cpu_map[self.get_nic_numa_node(nic_name)]
                if cpulist and exclude_cpulist:
                    disallowed_cpus = self.get_core_list_from_mask(cpulist)
                    irq_cpus = list(set(irq_cpus) - set(disallowed_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            if mode == "cpulist":
                irq_cpus = self.get_core_list_from_mask(cpulist)
                if exclude_cpulist:
                    # Flatten system CPU list, we don't need NUMA awareness here
                    system_cpu_list = sorted({x for v in system_cpu_map.values() for x in v})
                    irq_cpus = list(set(system_cpu_list) - set(irq_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            irq_cmd.append(nic_name)
            self.log.info(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        self.log.info("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        self.log.info("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        self.log.info("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log.info("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        self.log.info("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log.info("Reverted CPU governor to %s." % self.governor_restore)

    def restore_settings(self):
        self.restore_governor()
        self.restore_tuned()
        self.restore_services()
        self.restore_sysctl()
        if self.enable_adq:
            self.reload_driver("ice")


class Target(Server):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Defaults
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []
        self.initiator_info = []
        self.nvme_allowlist = []
        self.nvme_blocklist = []

        # Target-side measurement options
        self.enable_pm = True
        self.enable_sar = True
        self.enable_pcm = True
        self.enable_bw = True
        self.enable_dpdk_memory = True

        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "scheduler_settings" in target_config:
            self.scheduler_name = target_config["scheduler_settings"]
        if "zcopy_settings" in target_config:
            self.enable_zcopy = target_config["zcopy_settings"]
        if "results_dir" in target_config:
            self.results_dir = target_config["results_dir"]
        if "blocklist" in target_config:
            self.nvme_blocklist = target_config["blocklist"]
        if "allowlist" in target_config:
            self.nvme_allowlist = target_config["allowlist"]
            # Blocklist takes precedence, remove common elements from allowlist
            self.nvme_allowlist = list(set(self.nvme_allowlist) - set(self.nvme_blocklist))
        if "enable_pm" in target_config:
            self.enable_pm = target_config["enable_pm"]
        if "enable_sar" in target_config:
            self.enable_sar = target_config["enable_sar"]
        if "enable_pcm" in target_config:
            self.enable_pcm = target_config["enable_pcm"]
        if "enable_bandwidth" in target_config:
            self.enable_bw = target_config["enable_bandwidth"]
        if "enable_dpdk_memory" in target_config:
            self.enable_dpdk_memory = target_config["enable_dpdk_memory"]

        self.log.info("Items now on allowlist: %s" % self.nvme_allowlist)
        self.log.info("Items now on blocklist: %s" % self.nvme_blocklist)

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log.info("Changing directory to %s" % change_dir)

        out = subprocess.check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")

        if change_dir:
            os.chdir(old_cwd)
            self.log.info("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        self.log.info("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, _directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log.info("Done zipping")

    @staticmethod
    def _chunks(input_list, chunks_no):
        div, rem = divmod(len(input_list), chunks_no)
        for i in range(chunks_no):
            si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem)
            yield input_list[si:si + (div + 1 if i < rem else div)]
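        # Illustrative example: _chunks(list(range(7)), 3) yields [0, 1, 2],
        # [3, 4] and [5, 6]; the first (len(input_list) % chunks_no) chunks
        # receive one extra element.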

    def spread_bdevs(self, req_disks):
        # Spread available block device indexes:
        # - evenly across available initiator systems
        # - evenly across available NIC interfaces for
        #   each initiator
        # Not NUMA aware.
        ip_bdev_map = []
        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))

        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
            self.initiator_info[i]["bdev_range"] = init_chunk
            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
                for c in nic_chunk:
                    ip_bdev_map.append((ip, c))
        return ip_bdev_map
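        # Illustrative example: with initiator A exposing target_nic_ips
        # [ip1, ip2], initiator B exposing [ip3] and req_disks=4, bdevs 0-1
        # go to A and 2-3 to B, giving [(ip1, 0), (ip2, 1), (ip3, 2), (ip3, 3)].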

    def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time):
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))

        self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Starting SAR measurements")

        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % 1, "%s" % run_time])
        with open(sar_output_file, "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log.info("Summary CPU utilization from SAR:")
                        self.log.info(line)
                    elif "all" in line:
                        self.log.info(line)
                    else:
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum

        with open(sar_cpu_util_file, "w") as f:
            f.write("%0.2f" % sar_cpu_usage)

    def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time):
        time.sleep(ramp_time)
        self.log.info("Starting power measurements")
        self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir,
                      "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix,
                       "-x", "-c", "%s" % run_time, "-t", "%s" % 1, "-r"])

    def ethtool_after_fio_ramp(self, fio_ramp_time):
        time.sleep(fio_ramp_time//2)
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                           "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization

    def measure_pcm_memory(self, results_dir, pcm_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        cmd = ["pcm-memory", "1", "-csv=%s/%s" % (results_dir, pcm_file_name)]
        pcm_memory = subprocess.Popen(cmd)
        time.sleep(run_time)
        pcm_memory.terminate()

    def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        cmd = ["pcm", "1", "-i=%s" % run_time,
               "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_pcm_power(self, results_dir, pcm_power_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        out = self.exec_cmd(["pcm-power", "1", "-i=%s" % run_time])
        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
            fh.write(out)
        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
        #       Improve this so that an additional file containing the average of the measurements is generated too.

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name, ramp_time, run_time):
        self.log.info("Waiting %d seconds for ramp-up to finish before measuring bandwidth stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Starting network bandwidth measurements")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", "%s" % run_time])
        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
        #       Improve this so that an additional file containing the average of the measurements is generated too.
        # TODO: Monitor only those interfaces that are currently used to run the workload.

    def measure_dpdk_memory(self, results_dir, dump_file_name, ramp_time):
        self.log.info("Waiting to generate DPDK memory usage")
        time.sleep(ramp_time)
        self.log.info("Generating DPDK memory usage")
        tmp_dump_file = rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)["filename"]
        os.rename(tmp_dump_file, "%s/%s" % (results_dir, dump_file_name))

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(os.uname().release)
        self.log.info("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log.info("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log.info("====zcopy settings:====")
        self.log.info("zcopy enabled: %s" % (self.enable_zcopy))
        self.log.info("====Scheduler settings:====")
        self.log.info("SPDK scheduler: %s" % (self.scheduler_name))


class Initiator(Server):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Required fields
        self.ip = initiator_config["ip"]
        self.target_nic_ips = initiator_config["target_nic_ips"]

        # Defaults
        self.cpus_allowed = None
        self.cpus_allowed_policy = "shared"
        self.spdk_dir = "/tmp/spdk"
        self.fio_bin = "/usr/src/fio/fio"
        self.nvmecli_bin = "nvme"
        self.cpu_frequency = None
        self.subsystem_info_list = []
        self.allow_cpu_sharing = True

        if "spdk_dir" in initiator_config:
            self.spdk_dir = initiator_config["spdk_dir"]
        if "fio_bin" in initiator_config:
            self.fio_bin = initiator_config["fio_bin"]
        if "nvmecli_bin" in initiator_config:
            self.nvmecli_bin = initiator_config["nvmecli_bin"]
        if "cpus_allowed" in initiator_config:
            self.cpus_allowed = initiator_config["cpus_allowed"]
        if "cpus_allowed_policy" in initiator_config:
            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
        if "cpu_frequency" in initiator_config:
            self.cpu_frequency = initiator_config["cpu_frequency"]
        if "allow_cpu_sharing" in initiator_config:
            self.allow_cpu_sharing = initiator_config["allow_cpu_sharing"]
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.copy_spdk("/tmp/spdk.zip")
        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        self.restore_settings()
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it again to prevent
        # expansion when sending to the remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout by using the get_pty option if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        self.log.info("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log.info("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log.info("Sources unpacked")

    def copy_result_files(self, dest_dir):
        self.log.info("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log.info("Done copying results")

    def match_subsystems(self, target_subsystems):
        subsystems = [subsystem for subsystem in target_subsystems if subsystem[2] in self.target_nic_ips]
        subsystems.sort(key=lambda x: x[1])
        self.log.info("Found matching subsystems on target side:")
        for s in subsystems:
            self.log.info(s)
        self.subsystem_info_list = subsystems

    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def get_route_nic_numa(self, remote_nvme_ip):
        local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip]))
        local_nvme_nic = local_nvme_nic[0]["dev"]
        return self.get_nic_numa_node(local_nvme_nic)

    @staticmethod
    def gen_fio_offset_section(offset_inc, num_jobs):
        offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc
        return "\n".join(["size=%s%%" % offset_inc,
                          "offset=0%",
                          "offset_increment=%s%%" % offset_inc])
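        # Illustrative example: with num_jobs=4 and offset_inc=0 each job gets
        # "size=25%", "offset=0%" and "offset_increment=25%", so the four jobs
        # cover disjoint quarters of each device.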

    def gen_fio_numa_section(self, fio_filenames_list, num_jobs):
        numa_stats = {}
        allowed_cpus = []
        for nvme in fio_filenames_list:
            nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme))
            numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1

        # Use the most common NUMA node for this chunk to allocate memory and CPUs
        section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0]

        # Check if we have enough free CPUs to pop from the list before assigning them
        if len(self.available_cpus[section_local_numa]) < num_jobs:
            if self.allow_cpu_sharing:
                self.log.info("Regenerating available CPU list %s" % section_local_numa)
                # Remove still available CPUs from the regenerated list. We don't want to
                # regenerate it with duplicates.
                cpus_regen = set(self.get_numa_cpu_map()[section_local_numa]) - set(self.available_cpus[section_local_numa])
                self.available_cpus[section_local_numa].extend(cpus_regen)
                self.log.info(self.available_cpus[section_local_numa])
            else:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise IndexError

        for _ in range(num_jobs):
            try:
                allowed_cpus.append(str(self.available_cpus[section_local_numa].pop(0)))
            except IndexError:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise

        return "\n".join(["cpus_allowed=%s" % ",".join(allowed_cpus),
                          "numa_mem_policy=prefer:%s" % section_local_numa])

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no,
                       num_jobs=None, ramp_time=0, run_time=10, rate_iops=0,
                       offset=False, offset_inc=0):
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""

        if self.cpus_allowed is not None:
            self.log.info("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    cpus_num += len(range(a, b + 1))
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            self.num_cores = len(self.subsystem_info_list)
            threads = range(0, len(self.subsystem_info_list))

        filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs,
                                                      offset, offset_inc)

        fio_config = fio_conf_template.format(ioengine=self.ioengine, spdk_conf=self.spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, init_connect() function
        if "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log.info("Created FIO Config:")
        self.log.info(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)
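        # Illustrative example: block_size="4k", io_depth=128, rw="randrw" and
        # rwmixread=70 on 2 cores produce "4k_128_randrw_m_70_2CPU.fio".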

    def set_cpu_frequency(self):
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit()
        else:
            self.log.warning("WARNING: you have disabled intel_pstate and are using the default CPU governor.")

    def run_fio(self, fio_config_file, run_num=1):
        job_name, _ = os.path.splitext(fio_config_file)
        self.log.info("Starting FIO run for job: %s" % job_name)
        self.log.info("Using FIO: %s" % self.fio_bin)

        output_filename = job_name + "_run_" + str(run_num) + "_" + self.name + ".json"
        try:
            output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename, "--eta=never"], True)
            self.log.info(output)
            self.log.info("FIO run finished. Results in: %s" % output_filename)
        except subprocess.CalledProcessError as e:
            self.log.error("ERROR: Fio process failed!")
            self.log.error(e.stdout)

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(self.exec_cmd(["uname", "-r"]))
        self.log.info("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log.info("====sysctl conf:====")
        sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"])
        self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))


class KernelTarget(Target):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = "nvmetcli"

        if "nvmet_bin" in target_config:
            self.nvmet_bin = target_config["nvmet_bin"]

    def load_drivers(self):
        self.log.info("Loading drivers")
        super().load_drivers()
        if self.null_block:
            self.exec_cmd(["sudo", "modprobe", "null_blk", "nr_devices=%s" % self.null_block])

    def configure_adq(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_configure_tc(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_set_busy_read(self, busy_read_val):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
        return {"net.core.busy_read": 0}

    def stop(self):
        self.nvmet_command(self.nvmet_bin, "clear")
        self.restore_settings()

    def get_nvme_device_bdf(self, nvme_dev_path):
        nvme_name = os.path.basename(nvme_dev_path)
        return self.exec_cmd(["cat", "/sys/block/%s/device/address" % nvme_name]).strip()

    def get_nvme_devices(self):
        dev_list = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]).split("\n")
        nvme_list = []
        for dev in dev_list:
            if "nvme" not in dev:
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                nvme_list.append(dev)
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_allowlist:
                nvme_list.append(dev)
        return nvme_list

    def nvmet_command(self, nvmet_bin, command):
        return self.exec_cmd([nvmet_bin, *(command.split(" "))])
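        # Illustrative example: nvmet_command(self.nvmet_bin, "restore kernel.conf")
        # runs "nvmetcli restore kernel.conf" on the target system.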

    def kernel_tgt_gen_subsystem_conf(self, nvme_list):

        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        for ip, bdev_num in self.spread_bdevs(len(nvme_list)):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = nvme_list[bdev_num]

            nvmet_cfg["subsystems"].append({
                "allowed_hosts": [],
                "attr": {
                    "allow_any_host": "1",
                    "serial": serial,
                    "version": "1.3"
                },
                "namespaces": [
                    {
                        "device": {
                            "path": bdev_name,
                            "uuid": "%s" % uuid.uuid4()
                        },
                        "enable": 1,
                        "nsid": port
                    }
                ],
                "nqn": nqn
            })

            nvmet_cfg["ports"].append({
                "addr": {
                    "adrfam": "ipv4",
                    "traddr": ip,
                    "trsvcid": port,
                    "trtype": self.transport
                },
                "portid": bdev_num,
                "referrals": [],
                "subsystems": [nqn]
            })

            self.subsystem_info_list.append((port, nqn, ip))
        self.subsys_no = len(self.subsystem_info_list)

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        self.log.info("Configuring kernel NVMeOF Target")

        if self.null_block:
            self.log.info("Configuring with null block device.")
            nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
        else:
            self.log.info("Configuring with NVMe drives.")
            nvme_list = self.get_nvme_devices()

        self.kernel_tgt_gen_subsystem_conf(nvme_list)
        self.subsys_no = len(nvme_list)

        self.nvmet_command(self.nvmet_bin, "clear")
        self.nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log.info("Done configuring kernel NVMeOF Target")


class SPDKTarget(Target):
    def __init__(self, name, general_config, target_config):
        # Required fields
        self.core_mask = target_config["core_mask"]
        self.num_cores = len(self.get_core_list_from_mask(self.core_mask))

        super().__init__(name, general_config, target_config)

        # Defaults
        self.dif_insert_strip = False
        self.null_block_dif_type = 0
        self.num_shared_buffers = 4096
        self.max_queue_depth = 128
        self.bpf_proc = None
        self.bpf_scripts = []
        self.enable_dsa = False
        self.scheduler_core_limit = None
        self.iobuf_small_pool_count = 16383
        self.iobuf_large_pool_count = 2047

        if "num_shared_buffers" in target_config:
            self.num_shared_buffers = target_config["num_shared_buffers"]
        if "max_queue_depth" in target_config:
            self.max_queue_depth = target_config["max_queue_depth"]
        if "null_block_dif_type" in target_config:
            self.null_block_dif_type = target_config["null_block_dif_type"]
        if "dif_insert_strip" in target_config:
            self.dif_insert_strip = target_config["dif_insert_strip"]
        if "bpf_scripts" in target_config:
            self.bpf_scripts = target_config["bpf_scripts"]
        if "dsa_settings" in target_config:
            self.enable_dsa = target_config["dsa_settings"]
        if "scheduler_core_limit" in target_config:
            self.scheduler_core_limit = target_config["scheduler_core_limit"]
        if "iobuf_small_pool_count" in target_config:
            self.iobuf_small_pool_count = target_config["iobuf_small_pool_count"]
        if "iobuf_large_pool_count" in target_config:
            self.iobuf_large_pool_count = target_config["iobuf_large_pool_count"]

        self.log.info("====DSA settings:====")
        self.log.info("DSA enabled: %s" % (self.enable_dsa))

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
        if mode not in ["default", "bynode", "cpulist",
                        "shared", "split", "split-bynode"]:
            self.log.error("%s irq affinity setting not supported" % mode)
            raise ValueError("%s irq affinity setting not supported" % mode)

        # Create a core list from SPDK's mask and convert it to a string.
        # This is the type configure_irq_affinity expects for the cpulist parameter.
        spdk_tgt_core_list = self.get_core_list_from_mask(self.core_mask)
        spdk_tgt_core_list = ",".join(map(lambda x: str(x), spdk_tgt_core_list))
        spdk_tgt_core_list = "[" + spdk_tgt_core_list + "]"

        if mode == "shared":
            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list)
        elif mode == "split":
            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
        elif mode == "split-bynode":
            super().configure_irq_affinity(mode="bynode", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
        else:
            super().configure_irq_affinity(mode=mode, cpulist=cpulist, exclude_cpulist=exclude_cpulist)
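        # Illustrative summary: "shared" pins NIC IRQs to the SPDK target cores,
        # "split" pins them to every core outside the SPDK core mask, and
        # "split-bynode" does the same but only with cores local to the NIC's
        # NUMA node.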

    def adq_set_busy_read(self, busy_read_val):
        return {"net.core.busy_read": busy_read_val}

    def get_nvme_devices_count(self):
        return len(self.get_nvme_devices())

    def get_nvme_devices(self):
        bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")]))
        bdev_bdfs = []
        for bdev in bdev_subsys_json_obj["config"]:
            bdev_traddr = bdev["params"]["traddr"]
            if bdev_traddr in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                bdev_bdfs.append(bdev_traddr)
            if bdev_traddr in self.nvme_allowlist:
                bdev_bdfs.append(bdev_traddr)
        return bdev_bdfs

    def spdk_tgt_configure(self):
        self.log.info("Configuring SPDK NVMeOF target via RPC")

        if self.enable_adq:
            self.adq_configure_tc()

        # Create transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
                                       num_shared_buffers=self.num_shared_buffers,
                                       max_queue_depth=self.max_queue_depth,
                                       dif_insert_or_strip=self.dif_insert_strip,
                                       sock_priority=self.adq_priority)
        self.log.info("SPDK NVMeOF transport layer:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

        self.log.info("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self, null_block_count):
        md_size = 0
        block_size = 4096
        if self.null_block_dif_type != 0:
            md_size = 128

        self.log.info("Adding null block bdevices to config via RPC")
        for i in range(null_block_count):
            self.log.info("Setting bdev protection to: %s" % self.null_block_dif_type)
            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
                                      dif_type=self.null_block_dif_type, md_size=md_size)
        self.log.info("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        self.log.info("Adding NVMe bdevs to config via RPC")

        bdfs = self.get_nvme_devices()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log.error("ERROR: Requested number of disks is more than the %s available" % len(bdfs))
1192                sys.exit(1)
1193            else:
1194                bdfs = bdfs[0:req_num_disks]
1195
1196        for i, bdf in enumerate(bdfs):
1197            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)
1198
1199        self.log.info("SPDK Bdevs configuration:")
1200        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1201
1202    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
1203        self.log.info("Adding subsystems to config")
1204        if not req_num_disks:
1205            req_num_disks = self.get_nvme_devices_count()
1206
1207        for ip, bdev_num in self.spread_bdevs(req_num_disks):
1208            port = str(4420 + bdev_num)
1209            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
1210            serial = "SPDK00%s" % bdev_num
1211            bdev_name = "Nvme%sn1" % bdev_num
1212
1213            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
1214                                           allow_any_host=True, max_namespaces=8)
1215            rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
1216            for nqn_name in [nqn, "discovery"]:
1217                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
1218                                                     nqn=nqn_name,
1219                                                     trtype=self.transport,
1220                                                     traddr=ip,
1221                                                     trsvcid=port,
1222                                                     adrfam="ipv4")
1223            self.subsystem_info_list.append((port, nqn, ip))
1224        self.subsys_no = len(self.subsystem_info_list)
1225
1226        self.log.info("SPDK NVMeOF subsystem configuration:")
1227        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))
1228
1229    def bpf_start(self):
1230        self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts)
1231        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
1232        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
1233        results_path = os.path.join(self.results_dir, "bpf_traces.txt")
1234
1235        with open(self.pid, "r") as fh:
1236            nvmf_pid = fh.readline().strip()  # drop the trailing newline from the PID file
1237
1238        cmd = [bpf_script, nvmf_pid, *bpf_traces]
1239        self.log.info(cmd)
1240        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})
1241
1242    def tgt_start(self):
1243        if self.null_block:
1244            self.subsys_no = 1
1245        else:
1246            self.subsys_no = self.get_nvme_devices_count()
1247        self.log.info("Starting SPDK NVMeOF Target process")
1248        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
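        # --wait-for-rpc makes the target hold off subsystem initialization until the
        # framework_start_init RPC arrives, so the sock/iobuf/scheduler options below
        # can be applied before the app fully starts up.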
1249        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
1250        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")
1251
1252        with open(self.pid, "w") as fh:
1253            fh.write(str(proc.pid))
1254        self.nvmf_proc = proc
1255        self.log.info("SPDK NVMeOF Target PID=%s" % proc.pid)  # self.pid holds the PID file path, not the PID
1256        self.log.info("Waiting for spdk to initialize...")
1257        while True:
1258            if os.path.exists("/var/tmp/spdk.sock"):
1259                break
1260            time.sleep(1)
1261        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")
1262
1263        rpc.sock.sock_set_default_impl(self.client, impl_name="posix")
1264        rpc.iobuf.iobuf_set_options(self.client,
1265                                    small_pool_count=self.iobuf_small_pool_count,
1266                                    large_pool_count=self.iobuf_large_pool_count,
1267                                    small_bufsize=None,
1268                                    large_bufsize=None)
1269
1270        if self.enable_zcopy:
1271            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
1272                                           enable_zerocopy_send_server=True)
1273            self.log.info("Target socket options:")
1274            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))
1275
1276        if self.enable_adq:
1277            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
1278            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
1279                                           nvme_adminq_poll_period_us=100000, retry_count=4)
1280
1281        if self.enable_dsa:
1282            rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None)
1283            self.log.info("Target DSA accel module enabled")
1284
1285        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit)
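        # Resume the initialization deferred by --wait-for-rpc; the bdev/nvmf RPCs
        # used in spdk_tgt_configure() only become available after this call.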
1286        rpc.framework_start_init(self.client)
1287
1288        if self.bpf_scripts:
1289            self.bpf_start()
1290
1291        self.spdk_tgt_configure()
1292
1293    def stop(self):
1294        if self.bpf_proc:
1295            self.log.info("Stopping BPF Trace script")
1296            self.bpf_proc.terminate()
1297            self.bpf_proc.wait()
1298
1299        if hasattr(self, "nvmf_proc"):
1300            try:
1301                self.nvmf_proc.terminate()
1302                self.nvmf_proc.wait(timeout=30)
1303            except Exception as e:
1304                self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.")
1305                self.log.info(e)
1306                self.nvmf_proc.kill()
1307                self.nvmf_proc.communicate()
1308                # Try to clean up RPC socket files if they were not removed
1309                # because of using 'kill'
1310                try:
1311                    os.remove("/var/tmp/spdk.sock")
1312                    os.remove("/var/tmp/spdk.sock.lock")
1313                except FileNotFoundError:
1314                    pass
1315        self.restore_settings()
1316
1317
1318class KernelInitiator(Initiator):
1319    def __init__(self, name, general_config, initiator_config):
1320        super().__init__(name, general_config, initiator_config)
1321
1322        # Defaults
1323        self.extra_params = ""
1324        self.ioengine = "libaio"
1325        self.spdk_conf = ""
1326
1327        if "num_cores" in initiator_config:
1328            self.num_cores = initiator_config["num_cores"]
1329
1330        if "extra_params" in initiator_config:
1331            self.extra_params = initiator_config["extra_params"]
1332
1333        if "kernel_engine" in initiator_config:
1334            self.ioengine = initiator_config["kernel_engine"]
1335            if "io_uring" in self.ioengine:
1336                self.extra_params = "--nr-poll-queues=8"
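                # --nr-poll-queues is an "nvme connect" option; the kernel polling
                # queues it creates are what io_uring's polled IO path can use.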
1337
1338    def configure_adq(self):
1339        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
1340
1341    def adq_configure_tc(self):
1342        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
1343
1344    def adq_set_busy_read(self, busy_read_val):
1345        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
1346        return {"net.core.busy_read": 0}
1347
1348    def get_connected_nvme_list(self):
1349        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
1350        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
1351                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
1352        return nvme_list
1353
1354    def init_connect(self):
1355        self.log.info("The connection attempts below may produce error messages; this is expected!")
1356        for subsystem in self.subsystem_info_list:
1357            self.log.info("Trying to connect %s %s %s" % subsystem)
1358            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
1359                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
1360            time.sleep(2)
1361
1362        if "io_uring" in self.ioengine:
1363            self.log.info("Setting block layer settings for io_uring.")
1364
1365            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
1366            #       apparently it's not possible for connected subsystems.
1367            #       Results in "error: Invalid argument"
1368            block_sysfs_settings = {
1369                "iostats": "0",
1370                "rq_affinity": "0",
1371                "nomerges": "2"
1372            }
1373
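            # iostats=0 disables per-IO statistics accounting, rq_affinity=0 stops
            # steering completions back to the submitting CPU, and nomerges=2
            # disables all request merging - all three cut block-layer overhead.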
1374            for disk in self.get_connected_nvme_list():
1375                sysfs = os.path.join("/sys/block", disk, "queue")
1376                for k, v in block_sysfs_settings.items():
1377                    sysfs_opt_path = os.path.join(sysfs, k)
1378                    try:
1379                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
1380                    except CalledProcessError as e:
1381                        self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
1382                    finally:
1383                        sysfs_value = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)])
1384                        self.log.info("%s=%s" % (sysfs_opt_path, sysfs_value))
1385
1386    def init_disconnect(self):
1387        for subsystem in self.subsystem_info_list:
1388            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
1389            time.sleep(1)
1390
1391    def get_nvme_subsystem_numa(self, dev_name):
1392        # Drop the last two characters (the namespace suffix, e.g. "n1") to get the controller name: "nvme0n1" -> "nvme0"
1393        nvme_ctrl = os.path.basename(dev_name)[:-2]
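        # The kernel address attribute has the form "traddr=<ip>,trsvcid=<port>";
        # the regex below extracts just the IPv4 address.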
1394        remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
1395                                   self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl]))
1396        return self.get_route_nic_numa(remote_nvme_ip.group(0))
1397
1398    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
1399        self.available_cpus = self.get_numa_cpu_map()
1400        if len(threads) >= len(subsystems):
1401            threads = range(0, len(subsystems))
1402
1403        # Generate connected nvme devices names and sort them by used NIC numa node
1404        # to allow better grouping when splitting into fio sections.
1405        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
1406        nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list]
1407        nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))]
1408
1409        filename_section = ""
1410        nvme_per_split = int(len(nvme_list) / len(threads))
1411        remainder = len(nvme_list) % len(threads)
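        # Split the devices evenly between fio sections; the first "remainder"
        # sections each receive one extra device when the split is not even.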
1412        iterator = iter(nvme_list)
1413        result = []
1414        for i in range(len(threads)):
1415            result.append([])
1416            for _ in range(nvme_per_split):
1417                result[i].append(next(iterator))
1418            if remainder:
1419                result[i].append(next(iterator))
1420                remainder -= 1
1421        for i, r in enumerate(result):
1422            header = "[filename%s]" % i
1423            disks = "\n".join(["filename=%s" % x for x in r])
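            # fio treats iodepth as per-job, so scale the requested io_depth by the
            # number of devices in this section and divide by num_jobs; for example,
            # io_depth=128 with 2 devices and num_jobs=4 gives iodepth=64 per job.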
1424            job_section_qd = round((io_depth * len(r)) / num_jobs)
1425            if job_section_qd == 0:
1426                job_section_qd = 1
1427            iodepth = "iodepth=%s" % job_section_qd
1428
1429            offset_section = ""
1430            if offset:
1431                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)
1432
1433            numa_opts = self.gen_fio_numa_section(r, num_jobs)
1434
1435            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])
1436
1437        return filename_section
1438
1439
1440class SPDKInitiator(Initiator):
1441    def __init__(self, name, general_config, initiator_config):
1442        super().__init__(name, general_config, initiator_config)
1443
1444        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
1445            self.install_spdk()
1446
1447        # Optional fields
1448        self.enable_data_digest = False
1449        if "enable_data_digest" in initiator_config:
1450            self.enable_data_digest = initiator_config["enable_data_digest"]
1451        if "num_cores" in initiator_config:
1452            self.num_cores = initiator_config["num_cores"]
1453
1454        self.ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
1455        self.spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
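        # The generated fio config is expected to point "ioengine" at the spdk_bdev
        # fio plugin and pass spdk_json_conf so the plugin loads the bdev.conf file
        # written out in init_connect().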
1456
1457    def adq_set_busy_read(self, busy_read_val):
1458        return {"net.core.busy_read": busy_read_val}
1459
1460    def install_spdk(self):
1461        self.log.info("Using fio binary %s" % self.fio_bin)
1462        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
1463        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
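        # exec_cmd runs these on the initiator host (over SSH for remote initiators),
        # so shell constructs such as "&&" and "$(($(nproc)*2))" are expanded by the
        # remote shell rather than locally.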
1464        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma",
1465                       "--with-fio=%s" % os.path.dirname(self.fio_bin),
1466                       "--enable-lto", "--disable-unit-tests"])
1467        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
1468        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])
1469
1470        self.log.info("SPDK built")
1471        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])
1472
1473    def init_connect(self):
1474        # Not a real "connect" like "nvme connect" in the kernel path, because
1475        # SPDK's fio bdev plugin initiates the connection itself just before
1476        # starting IO traffic. This is just an "init_connect" equivalent of the
1477        # same function from the KernelInitiator class.
1478        # Just prepare the bdev.conf JSON file for later use and consider it
1479        # "making a connection".
1480        bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
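        # bdev.conf is a minimal JSON config holding one bdev_nvme_attach_controller
        # entry per remote subsystem; see gen_spdk_bdev_conf() for the exact layout.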
1481        self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
1482
1483    def init_disconnect(self):
1484        # The SPDK initiator does not need to explicitly disconnect, as this is
1485        # done automatically once the fio bdev plugin finishes IO.
1486        pass
1487
1488    def gen_spdk_bdev_conf(self, remote_subsystem_list):
1489        bdev_cfg_section = {
1490            "subsystems": [
1491                {
1492                    "subsystem": "bdev",
1493                    "config": []
1494                }
1495            ]
1496        }
1497
1498        for i, subsys in enumerate(remote_subsystem_list):
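            # subsystem_info_list entries are (port, nqn, ip) tuples, in the order
            # appended by the target in spdk_tgt_add_subsystem_conf().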
1499            sub_port, sub_nqn, sub_addr = map(str, subsys)
1500            nvme_ctrl = {
1501                "method": "bdev_nvme_attach_controller",
1502                "params": {
1503                    "name": "Nvme{}".format(i),
1504                    "trtype": self.transport,
1505                    "traddr": sub_addr,
1506                    "trsvcid": sub_port,
1507                    "subnqn": sub_nqn,
1508                    "adrfam": "IPv4"
1509                }
1510            }
1511
1512            if self.enable_adq:
1513                nvme_ctrl["params"].update({"priority": "1"})
1514
1515            if self.enable_data_digest:
1516                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})
1517
1518            bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)
1519
1520        return json.dumps(bdev_cfg_section, indent=2)
1521
1522    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
1523        self.available_cpus = self.get_numa_cpu_map()
1524        filename_section = ""
1525        if len(threads) >= len(subsystems):
1526            threads = range(0, len(subsystems))
1527
1528        # Generate expected NVMe Bdev names and sort them by used NIC numa node
1529        # to allow better grouping when splitting into fio sections.
1530        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
1531        filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames]
1532        filenames = [x for _, x in sorted(zip(filename_numas, filenames))]
1533
1534        nvme_per_split = int(len(subsystems) / len(threads))
1535        remainder = len(subsystems) % len(threads)
1536        iterator = iter(filenames)
1537        result = []
1538        for i in range(len(threads)):
1539            result.append([])
1540            for _ in range(nvme_per_split):
1541                result[i].append(next(iterator))
1542            if remainder:
1543                result[i].append(next(iterator))
1544                remainder -= 1
1545        for i, r in enumerate(result):
1546            header = "[filename%s]" % i
1547            disks = "\n".join(["filename=%s" % x for x in r])
1548            job_section_qd = round((io_depth * len(r)) / num_jobs)
1549            if job_section_qd == 0:
1550                job_section_qd = 1
1551            iodepth = "iodepth=%s" % job_section_qd
1552
1553            offset_section = ""
1554            if offset:
1555                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)
1556
1557            numa_opts = self.gen_fio_numa_section(r, num_jobs)
1558
1559            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])
1560
1561        return filename_section
1562
1563    def get_nvme_subsystem_numa(self, bdev_name):
1564        bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir]))
1565        bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"]
1566
1567        # Drop the last two characters (the namespace suffix, e.g. "n1") to get the controller name: "Nvme0n1" -> "Nvme0"
1568        nvme_ctrl = bdev_name[:-2]
1569        remote_nvme_ip = list(filter(lambda x: x["params"]["name"] == "%s" % nvme_ctrl, bdev_conf_json_obj))[0]["params"]["traddr"]
1570        return self.get_route_nic_numa(remote_nvme_ip)
1571
1572
1573if __name__ == "__main__":
1574    script_full_dir = os.path.dirname(os.path.realpath(__file__))
1575    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))
1576
1577    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
1578    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
1579                        help='Configuration file.')
1580    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
1581                        help='Results directory.')
1582    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
1583                        help='CSV results filename.')
1584    parser.add_argument('-f', '--force', default=False, action='store_true',
1585                        dest='force', help="""Force script to continue and try to use all
1586                        available NVMe devices during test.
1587                        WARNING: Might result in data loss on used NVMe drives""")
1588
1589    args = parser.parse_args()
1590
1591    logging.basicConfig(level=logging.INFO,
1592                        format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s')
1593
1594    logging.info("Using config file: %s" % args.config)
1595    with open(args.config, "r") as config:
1596        data = json.load(config)
1597
1598    initiators = []
1599    fio_cases = []
1600
1601    general_config = data["general"]
1602    target_config = data["target"]
1603    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]
1604
1605    if "null_block_devices" not in data["target"] and \
1606        (args.force is False and
1607            "allowlist" not in data["target"] and
1608            "blocklist" not in data["target"]):
1609        # TODO: Also check if allowlist or blocklist are not empty.
1610        logging.warning("""WARNING: This script requires an allowlist or a blocklist to be defined.
1611        You can choose to use all available NVMe drives on your system, which may potentially
1612        lead to data loss. If you wish to proceed with all attached NVMes, use the "-f" option.""")
1613        sys.exit(1)
1614
1615    for k, v in data.items():
1616        if "target" in k:
1617            v.update({"results_dir": args.results})
1618            if data[k]["mode"] == "spdk":
1619                target_obj = SPDKTarget(k, data["general"], v)
1620            elif data[k]["mode"] == "kernel":
1621                target_obj = KernelTarget(k, data["general"], v)
1622        elif "initiator" in k:
1623            if data[k]["mode"] == "spdk":
1624                init_obj = SPDKInitiator(k, data["general"], v)
1625            elif data[k]["mode"] == "kernel":
1626                init_obj = KernelInitiator(k, data["general"], v)
1627            initiators.append(init_obj)
1628        elif "fio" in k:
1629            fio_workloads = itertools.product(data[k]["bs"],
1630                                              data[k]["qd"],
1631                                              data[k]["rw"])
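            # Cartesian product of all bs/qd/rw values; e.g. bs=["4k"], qd=[32, 128],
            # rw=["randread"] yields ("4k", 32, "randread") and ("4k", 128, "randread").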
1632
1633            fio_run_time = data[k]["run_time"]
1634            fio_ramp_time = data[k]["ramp_time"]
1635            fio_rw_mix_read = data[k]["rwmixread"]
1636            fio_run_num = data[k].get("run_num", 1)  # default to 1 run; None would crash "range(1, fio_run_num+1)" below
1637            fio_num_jobs = data[k].get("num_jobs")
1638
1639            fio_rate_iops = 0
1640            if "rate_iops" in data[k]:
1641                fio_rate_iops = data[k]["rate_iops"]
1642
1643            fio_offset = False
1644            if "offset" in data[k]:
1645                fio_offset = data[k]["offset"]
1646            fio_offset_inc = 0
1647            if "offset_inc" in data[k]:
1648                fio_offset_inc = data[k]["offset_inc"]
1649        else:
1650            continue
1651
1652    try:
1653        os.mkdir(args.results)
1654    except FileExistsError:
1655        pass
1656
1657    for i in initiators:
1658        target_obj.initiator_info.append(
1659            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
1660        )
1661
1662    # TODO: This try block is definitely too large. Need to break it up into
1663    # separate logical blocks to reduce size.
1664    try:
1665        target_obj.tgt_start()
1666
1667        for i in initiators:
1668            i.match_subsystems(target_obj.subsystem_info_list)
1669            if i.enable_adq:
1670                i.adq_configure_tc()
1671
1672        # Poor man's threading
1673        # Run FIO tests
1674        for block_size, io_depth, rw in fio_workloads:
1675            configs = []
1676            for i in initiators:
1677                i.init_connect()
1678                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
1679                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops,
1680                                       fio_offset, fio_offset_inc)
1681                configs.append(cfg)
1682
1683            for run_no in range(1, fio_run_num+1):
1684                threads = []
1685                power_daemon = None
1686                measurements_prefix = "%s_%s_%s_m_%s_run_%s" % (block_size, io_depth, rw, fio_rw_mix_read, run_no)
1687
1688                for i, cfg in zip(initiators, configs):
1689                    t = threading.Thread(target=i.run_fio, args=(cfg, run_no))
1690                    threads.append(t)
1691                if target_obj.enable_sar:
1692                    sar_file_prefix = measurements_prefix + "_sar"
1693                    t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix, fio_ramp_time, fio_run_time))
1694                    threads.append(t)
1695
1696                if target_obj.enable_pcm:
1697                    pcm_fnames = ["%s_%s.csv" % (measurements_prefix, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]
1698
1699                    pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm,
1700                                                 args=(args.results, pcm_fnames[0], fio_ramp_time, fio_run_time))
1701                    pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory,
1702                                                 args=(args.results, pcm_fnames[1], fio_ramp_time, fio_run_time))
1703                    pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power,
1704                                                 args=(args.results, pcm_fnames[2], fio_ramp_time, fio_run_time))
1705
1706                    threads.append(pcm_cpu_t)
1707                    threads.append(pcm_mem_t)
1708                    threads.append(pcm_pow_t)
1709
1710                if target_obj.enable_bw:
1711                    bandwidth_file_name = measurements_prefix + "_bandwidth.csv"
1712                    t = threading.Thread(target=target_obj.measure_network_bandwidth,
1713                                         args=(args.results, bandwidth_file_name, fio_ramp_time, fio_run_time))
1714                    threads.append(t)
1715
1716                if target_obj.enable_dpdk_memory:
1717                    dpdk_mem_file_name = measurements_prefix + "_dpdk_mem.txt"
1718                    t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results, dpdk_mem_file_name, fio_ramp_time))
1719                    threads.append(t)
1720
1721                if target_obj.enable_pm:
1722                    power_daemon = threading.Thread(target=target_obj.measure_power,
1723                                                    args=(args.results, measurements_prefix, script_full_dir,
1724                                                          fio_ramp_time, fio_run_time))
1725                    threads.append(power_daemon)
1726
1727                if target_obj.enable_adq:
1728                    ethtool_thread = threading.Thread(target=target_obj.ethtool_after_fio_ramp, args=(fio_ramp_time,))
1729                    threads.append(ethtool_thread)
1730
1731                for t in threads:
1732                    t.start()
1733                for t in threads:
1734                    t.join()
1735
1736            for i in initiators:
1737                i.init_disconnect()
1738                i.copy_result_files(args.results)
1739        try:
1740            parse_results(args.results, args.csv_filename)
1741        except Exception as err:
1742            logging.error("There was an error while parsing the results: %s" % err)
1743    finally:
1744        for i in initiators:
1745            try:
1746                i.stop()
1747            except Exception as err:
1748                logging.warning("Failed to stop initiator %s: %s" % (i.name, err))
1749        target_obj.stop()
1750