xref: /spdk/scripts/perf/nvmf/run_nvmf.py (revision a9bcb7f2613c28948b141fdc3fd9cc7b1d3f30a9)
1#!/usr/bin/env python3
2#  SPDX-License-Identifier: BSD-3-Clause
3#  Copyright (C) 2018 Intel Corporation
4#  All rights reserved.
5#
6
7import os
8import re
9import sys
10import argparse
11import json
12import logging
13import zipfile
14import threading
15import subprocess
16import itertools
17import configparser
18import time
19import uuid
20
21import paramiko
22import pandas as pd
23from common import *
24from subprocess import CalledProcessError
25
26sys.path.append(os.path.dirname(__file__) + '/../../../python')
27
28import spdk.rpc as rpc  # noqa
29import spdk.rpc.client as rpc_client  # noqa
30
31
32class Server:
33    def __init__(self, name, general_config, server_config):
34        self.name = name
35        self.username = general_config["username"]
36        self.password = general_config["password"]
37        self.transport = general_config["transport"].lower()
38        self.skip_spdk_install = general_config.get('skip_spdk_install', False)
39        self.nic_ips = server_config["nic_ips"]
40        self.mode = server_config["mode"]
41        self.irdma_roce_enable = False
42
43        self.log = logging.getLogger(self.name)
44
45        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
46        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
47            self.irq_scripts_dir = server_config["irq_scripts_dir"]
48
49        self.local_nic_info = []
50        self._nics_json_obj = {}
51        self.svc_restore_dict = {}
52        self.sysctl_restore_dict = {}
53        self.tuned_restore_dict = {}
54        self.governor_restore = ""
55        self.tuned_profile = ""
56
57        self.enable_adq = False
58        self.adq_priority = None
59        self.irq_settings = {"mode": "default"}
60
61        if "adq_enable" in server_config and server_config["adq_enable"]:
62            self.enable_adq = server_config["adq_enable"]
63            self.adq_priority = 1
64        if "irq_settings" in server_config:
65            self.irq_settings.update(server_config["irq_settings"])
66        if "tuned_profile" in server_config:
67            self.tuned_profile = server_config["tuned_profile"]
68        if "irdma_roce_enable" in general_config:
69            self.irdma_roce_enable = general_config["irdma_roce_enable"]
70
71        if not re.match(r'^[A-Za-z0-9\-]+$', name):
72            self.log.info("Please use a name which contains only letters, numbers or dashes")
73            sys.exit(1)
74
75    @staticmethod
76    def get_uncommented_lines(lines):
77        return [line for line in lines if line and not line.startswith('#')]
78
79    def get_nic_name_by_ip(self, ip):
80        if not self._nics_json_obj:
81            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
82            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
83        for nic in self._nics_json_obj:
84            for addr in nic["addr_info"]:
85                if ip in addr["local"]:
86                    return nic["ifname"]
87
88    def set_local_nic_info_helper(self):
89        pass
90
91    def set_local_nic_info(self, pci_info):
92        def extract_network_elements(json_obj):
93            nic_list = []
94            if isinstance(json_obj, list):
95                for x in json_obj:
96                    nic_list.extend(extract_network_elements(x))
97            elif isinstance(json_obj, dict):
98                if "children" in json_obj:
99                    nic_list.extend(extract_network_elements(json_obj["children"]))
100                if "class" in json_obj.keys() and "network" in json_obj["class"]:
101                    nic_list.append(json_obj)
102            return nic_list
103
104        self.local_nic_info = extract_network_elements(pci_info)
105
106    def get_nic_numa_node(self, nic_name):
107        return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name]))
108
109    def get_numa_cpu_map(self):
110        numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"]))
111        numa_cpu_json_map = {}
112
113        for cpu in numa_cpu_json_obj["cpus"]:
114            cpu_num = int(cpu["cpu"])
115            numa_node = int(cpu["node"])
116            numa_cpu_json_map.setdefault(numa_node, [])
117            numa_cpu_json_map[numa_node].append(cpu_num)
118
119        return numa_cpu_json_map
120
121    # pylint: disable=R0201
122    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
123        return ""
124
125    def configure_system(self):
126        self.load_drivers()
127        self.configure_services()
128        self.configure_sysctl()
129        self.configure_tuned()
130        self.configure_cpu_governor()
131        self.configure_irq_affinity(**self.irq_settings)
132
133    RDMA_PROTOCOL_IWARP = 0
134    RDMA_PROTOCOL_ROCE = 1
135    RDMA_PROTOCOL_UNKNOWN = -1
136
137    def check_rdma_protocol(self):
138        try:
139            roce_ena = self.exec_cmd(["cat", "/sys/module/irdma/parameters/roce_ena"])
140            roce_ena = roce_ena.strip()
141            if roce_ena == "0":
142                return self.RDMA_PROTOCOL_IWARP
143            else:
144                return self.RDMA_PROTOCOL_ROCE
145        except CalledProcessError as e:
146            self.log.error("ERROR: failed to check RDMA protocol!")
147            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
148            return self.RDMA_PROTOCOL_UNKNOWN
149
150    def load_drivers(self):
151        self.log.info("Loading drivers")
152        self.exec_cmd(["sudo", "modprobe", "-a",
153                       "nvme-%s" % self.transport,
154                       "nvmet-%s" % self.transport])
155        current_mode = self.check_rdma_protocol()
156        if current_mode == self.RDMA_PROTOCOL_UNKNOWN:
157            self.log.error("ERROR: failed to check RDMA protocol mode")
158            return
159        if self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_IWARP:
160            self.reload_driver("irdma", "roce_ena=1")
161            self.log.info("Loaded irdma driver with RoCE enabled")
162        elif self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_ROCE:
163            self.log.info("Leaving irdma driver with RoCE enabled")
164        else:
165            self.reload_driver("irdma", "roce_ena=0")
166            self.log.info("Loaded irdma driver with iWARP enabled")
167
168    def configure_adq(self):
169        self.adq_load_modules()
170        self.adq_configure_nic()
171
172    def adq_load_modules(self):
173        self.log.info("Modprobing ADQ-related Linux modules...")
174        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
175        for module in adq_module_deps:
176            try:
177                self.exec_cmd(["sudo", "modprobe", module])
178                self.log.info("%s loaded!" % module)
179            except CalledProcessError as e:
180                self.log.error("ERROR: failed to load module %s" % module)
181                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
182
183    def adq_configure_tc(self):
184        self.log.info("Configuring ADQ Traffic classes and filters...")
185
186        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
187        num_queues_tc1 = self.num_cores
188        port_param = "dst_port" if isinstance(self, Target) else "src_port"
189        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")
190
191        for nic_ip in self.nic_ips:
192            nic_name = self.get_nic_name_by_ip(nic_ip)
193            nic_ports = [x[0] for x in self.subsystem_info_list if nic_ip in x[2]]
194
195            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
196                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
197                                "queues", "%s@0" % num_queues_tc0,
198                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
199                                "hw", "1", "mode", "channel"]
200            self.log.info(" ".join(tc_qdisc_map_cmd))
201            self.exec_cmd(tc_qdisc_map_cmd)
202
203            time.sleep(5)
204            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
205            self.log.info(" ".join(tc_qdisc_ingress_cmd))
206            self.exec_cmd(tc_qdisc_ingress_cmd)
207
208            nic_bdf = os.path.basename(self.exec_cmd(["readlink", "-f", "/sys/class/net/%s/device" % nic_name]).strip())
209            self.exec_cmd(["sudo", "devlink", "dev", "param", "set", "pci/%s" % nic_bdf,
210                           "name", "tc1_inline_fd", "value", "true", "cmode", "runtime"])
211
212            for port in nic_ports:
213                tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
214                                 "protocol", "ip", "ingress", "prio", "1", "flower",
215                                 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
216                                 "skip_sw", "hw_tc", "1"]
217                self.log.info(" ".join(tc_filter_cmd))
218                self.exec_cmd(tc_filter_cmd)
219
220            # show tc configuration
221            self.log.info("Show tc configuration for %s NIC..." % nic_name)
222            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
223            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
224            self.log.info("%s" % tc_disk_out)
225            self.log.info("%s" % tc_filter_out)
226
227            # Ethtool coalesce settings must be applied after configuring traffic classes
228            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
229            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])
230
231            self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name)
232            xps_cmd = ["sudo", xps_script_path, nic_name]
233            self.log.info(xps_cmd)
234            self.exec_cmd(xps_cmd)
235
236    def reload_driver(self, driver, *modprobe_args):
237
238        try:
239            self.exec_cmd(["sudo", "rmmod", driver])
240            self.exec_cmd(["sudo", "modprobe", driver, *modprobe_args])
241        except CalledProcessError as e:
242            self.log.error("ERROR: failed to reload %s module!" % driver)
243            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
244
245    def adq_configure_nic(self):
246        self.log.info("Configuring NIC port settings for ADQ testing...")
247
248        # Reload the driver first, to make sure any previous settings are re-set.
249        self.reload_driver("ice")
250
251        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
252        for nic in nic_names:
253            self.log.info(nic)
254            try:
255                self.exec_cmd(["sudo", "ethtool", "-K", nic,
256                               "hw-tc-offload", "on"])  # Enable hardware TC offload
257                nic_bdf = self.exec_cmd(["bash", "-c", "source /sys/class/net/%s/device/uevent; echo $PCI_SLOT_NAME" % nic])
258                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
259                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-inspect-optimize", "off"])
260                # Following are suggested in ADQ Configuration Guide to be enabled for large block sizes,
261                # but can be used with 4k block sizes as well
262                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop", "on"])
263                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop-cfg", "on"])
264            except subprocess.CalledProcessError as e:
265                self.log.error("ERROR: failed to configure NIC port using ethtool!")
266                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
267                self.log.info("Please update your NIC driver and firmware versions and try again.")
268            self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
269            self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))
270
271    def configure_services(self):
272        self.log.info("Configuring active services...")
273        svc_config = configparser.ConfigParser(strict=False)
274
275        # Below list is valid only for RHEL / Fedora systems and might not
276        # contain valid names for other distributions.
277        svc_target_state = {
278            "firewalld": "inactive",
279            "irqbalance": "inactive",
280            "lldpad.service": "inactive",
281            "lldpad.socket": "inactive"
282        }
283
284        for service in svc_target_state:
285            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
286            out = "\n".join(["[%s]" % service, out])
287            svc_config.read_string(out)
288
289            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
290                continue
291
292            service_state = svc_config[service]["ActiveState"]
293            self.log.info("Current state of %s service is %s" % (service, service_state))
294            self.svc_restore_dict.update({service: service_state})
295            if service_state != "inactive":
296                self.log.info("Disabling %s. It will be restored after the test has finished." % service)
297                self.exec_cmd(["sudo", "systemctl", "stop", service])
298
299    def configure_sysctl(self):
300        self.log.info("Tuning sysctl settings...")
301
302        busy_read = 0
303        busy_poll = 0
304        if self.enable_adq and self.mode == "spdk":
305            busy_read = 1
306            busy_poll = 1
307
308        sysctl_opts = {
309            "net.core.busy_poll": busy_poll,
310            "net.core.busy_read": busy_read,
311            "net.core.somaxconn": 4096,
312            "net.core.netdev_max_backlog": 8192,
313            "net.ipv4.tcp_max_syn_backlog": 16384,
314            "net.core.rmem_max": 268435456,
315            "net.core.wmem_max": 268435456,
316            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
317            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
318            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
319            "net.ipv4.route.flush": 1,
320            "vm.overcommit_memory": 1,
321        }
322
323        if self.enable_adq:
324            sysctl_opts.update(self.adq_set_busy_read(1))
325
326        for opt, value in sysctl_opts.items():
327            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
328            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())
329
330    def configure_tuned(self):
331        if not self.tuned_profile:
332            self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
333            return
334
335        self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile)
336        service = "tuned"
337        tuned_config = configparser.ConfigParser(strict=False)
338
339        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
340        out = "\n".join(["[%s]" % service, out])
341        tuned_config.read_string(out)
342        tuned_state = tuned_config[service]["ActiveState"]
343        self.svc_restore_dict.update({service: tuned_state})
344
345        if tuned_state != "inactive":
346            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
347            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()
348
349            self.tuned_restore_dict = {
350                "profile": profile,
351                "mode": profile_mode
352            }
353
354        self.exec_cmd(["sudo", "systemctl", "start", service])
355        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
356        self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))
357
358    def configure_cpu_governor(self):
359        self.log.info("Setting CPU governor to performance...")
360
361        # This assumes that there is the same CPU scaling governor on each CPU
362        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
363        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])
364
365    def get_core_list_from_mask(self, core_mask):
366        # Generate list of individual cores from hex core mask
367        # (e.g. '0xffff') or list containing:
368        # - individual cores (e.g. '1, 2, 3')
369        # - core ranges (e.g. '0-3')
370        # - mix of both (e.g. '0, 1-4, 9, 11-13')
371
372        core_list = []
373        if "0x" in core_mask:
374            core_mask_int = int(core_mask, 16)
375            for i in range(core_mask_int.bit_length()):
376                if (1 << i) & core_mask_int:
377                    core_list.append(i)
378            return core_list
379        else:
380            # Core list can be provided in .json config with square brackets
381            # remove them first
382            core_mask = core_mask.replace("[", "")
383            core_mask = core_mask.replace("]", "")
384
385            for i in core_mask.split(","):
386                if "-" in i:
387                    start, end = i.split("-")
388                    core_range = range(int(start), int(end) + 1)
389                    core_list.extend(core_range)
390                else:
391                    core_list.append(int(i))
392            return core_list
393
394    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
395        self.log.info("Setting NIC irq affinity for NICs. Using %s mode" % mode)
396
397        if mode not in ["default", "bynode", "cpulist"]:
398            raise ValueError("%s irq affinity setting not supported" % mode)
399
400        if mode == "cpulist" and not cpulist:
401            raise ValueError("%s irq affinity setting set, but no cpulist provided" % mode)
402
403        affinity_script = "set_irq_affinity.sh"
404        if "default" not in mode:
405            affinity_script = "set_irq_affinity_cpulist.sh"
406            system_cpu_map = self.get_numa_cpu_map()
407        irq_script_path = os.path.join(self.irq_scripts_dir, affinity_script)
408
409        def cpu_list_to_string(cpulist):
410            return ",".join(map(lambda x: str(x), cpulist))
411
412        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
413        for nic_name in nic_names:
414            irq_cmd = ["sudo", irq_script_path]
415
416            # Use only CPU cores matching NIC NUMA node.
417            # Remove any CPU cores if they're on exclusion list.
418            if mode == "bynode":
419                irq_cpus = system_cpu_map[self.get_nic_numa_node(nic_name)]
420                if cpulist and exclude_cpulist:
421                    disallowed_cpus = self.get_core_list_from_mask(cpulist)
422                    irq_cpus = list(set(irq_cpus) - set(disallowed_cpus))
423                    if not irq_cpus:
424                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
425                irq_cmd.append(cpu_list_to_string(irq_cpus))
426
427            if mode == "cpulist":
428                irq_cpus = self.get_core_list_from_mask(cpulist)
429                if exclude_cpulist:
430                    # Flatten system CPU list, we don't need NUMA awareness here
431                    system_cpu_list = sorted({x for v in system_cpu_map.values() for x in v})
432                    irq_cpus = list(set(system_cpu_list) - set(irq_cpus))
433                    if not irq_cpus:
434                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
435                irq_cmd.append(cpu_list_to_string(irq_cpus))
436
437            irq_cmd.append(nic_name)
438            self.log.info(irq_cmd)
439            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)
440
441    def restore_services(self):
442        self.log.info("Restoring services...")
443        for service, state in self.svc_restore_dict.items():
444            cmd = "stop" if state == "inactive" else "start"
445            self.exec_cmd(["sudo", "systemctl", cmd, service])
446
447    def restore_sysctl(self):
448        self.log.info("Restoring sysctl settings...")
449        for opt, value in self.sysctl_restore_dict.items():
450            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())
451
452    def restore_tuned(self):
453        self.log.info("Restoring tuned-adm settings...")
454
455        if not self.tuned_restore_dict:
456            return
457
458        if self.tuned_restore_dict["mode"] == "auto":
459            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
460            self.log.info("Reverted tuned-adm to auto_profile.")
461        else:
462            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
463            self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])
464
465    def restore_governor(self):
466        self.log.info("Restoring CPU governor setting...")
467        if self.governor_restore:
468            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
469            self.log.info("Reverted CPU governor to %s." % self.governor_restore)
470
471    def restore_settings(self):
472        self.restore_governor()
473        self.restore_tuned()
474        self.restore_services()
475        self.restore_sysctl()
476        if self.enable_adq:
477            self.reload_driver("ice")
478
479
480class Target(Server):
481    def __init__(self, name, general_config, target_config):
482        super().__init__(name, general_config, target_config)
483
484        # Defaults
485        self.enable_zcopy = False
486        self.scheduler_name = "static"
487        self.null_block = 0
488        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
489        self.subsystem_info_list = []
490        self.initiator_info = []
491        self.nvme_allowlist = []
492        self.nvme_blocklist = []
493
494        # Target-side measurement options
495        self.enable_pm = True
496        self.enable_sar = True
497        self.enable_pcm = True
498        self.enable_bw = True
499        self.enable_dpdk_memory = True
500
501        if "null_block_devices" in target_config:
502            self.null_block = target_config["null_block_devices"]
503        if "scheduler_settings" in target_config:
504            self.scheduler_name = target_config["scheduler_settings"]
505        if "zcopy_settings" in target_config:
506            self.enable_zcopy = target_config["zcopy_settings"]
507        if "results_dir" in target_config:
508            self.results_dir = target_config["results_dir"]
509        if "blocklist" in target_config:
510            self.nvme_blocklist = target_config["blocklist"]
511        if "allowlist" in target_config:
512            self.nvme_allowlist = target_config["allowlist"]
513            # Blocklist takes precedence, remove common elements from allowlist
514            self.nvme_allowlist = list(set(self.nvme_allowlist) - set(self.nvme_blocklist))
515        if "enable_pm" in target_config:
516            self.enable_pm = target_config["enable_pm"]
517        if "enable_sar" in target_config:
518            self.enable_sar = target_config["enable_sar"]
519        if "enable_pcm" in target_config:
520            self.enable_pcm = target_config["enable_pcm"]
521        if "enable_bandwidth" in target_config:
522            self.enable_bw = target_config["enable_bandwidth"]
523        if "enable_dpdk_memory" in target_config:
524            self.enable_dpdk_memory = target_config["enable_dpdk_memory"]
525
526        self.log.info("Items now on allowlist: %s" % self.nvme_allowlist)
527        self.log.info("Items now on blocklist: %s" % self.nvme_blocklist)
528
529        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
530        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
531        self.set_local_nic_info(self.set_local_nic_info_helper())
532
533        if self.skip_spdk_install is False:
534            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")
535
536        self.configure_system()
537        if self.enable_adq:
538            self.configure_adq()
539            self.configure_irq_affinity()
540        self.sys_config()
541
542    def set_local_nic_info_helper(self):
543        return json.loads(self.exec_cmd(["lshw", "-json"]))
544
545    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
546        stderr_opt = None
547        if stderr_redirect:
548            stderr_opt = subprocess.STDOUT
549        if change_dir:
550            old_cwd = os.getcwd()
551            os.chdir(change_dir)
552            self.log.info("Changing directory to %s" % change_dir)
553
554        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")
555
556        if change_dir:
557            os.chdir(old_cwd)
558            self.log.info("Changing directory to %s" % old_cwd)
559        return out
560
561    def zip_spdk_sources(self, spdk_dir, dest_file):
562        self.log.info("Zipping SPDK source directory")
563        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
564        for root, _directories, files in os.walk(spdk_dir, followlinks=True):
565            for file in files:
566                fh.write(os.path.relpath(os.path.join(root, file)))
567        fh.close()
568        self.log.info("Done zipping")
569
570    @staticmethod
571    def _chunks(input_list, chunks_no):
572        div, rem = divmod(len(input_list), chunks_no)
573        for i in range(chunks_no):
574            si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem)
575            yield input_list[si:si + (div + 1 if i < rem else div)]
576
577    def spread_bdevs(self, req_disks):
578        # Spread available block devices indexes:
579        # - evenly across available initiator systems
580        # - evenly across available NIC interfaces for
581        #   each initiator
582        # Not NUMA aware.
583        ip_bdev_map = []
584        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))
585
586        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
587            self.initiator_info[i]["bdev_range"] = init_chunk
588            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
589            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
590                for c in nic_chunk:
591                    ip_bdev_map.append((ip, c))
592        return ip_bdev_map
593
594    def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time):
595        cpu_number = os.cpu_count()
596        sar_idle_sum = 0
597        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
598        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))
599
600        self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time)
601        time.sleep(ramp_time)
602        self.log.info("Starting SAR measurements")
603
604        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % 1, "%s" % run_time])
605        with open(os.path.join(results_dir, sar_output_file), "w") as fh:
606            for line in out.split("\n"):
607                if "Average" in line:
608                    if "CPU" in line:
609                        self.log.info("Summary CPU utilization from SAR:")
610                        self.log.info(line)
611                    elif "all" in line:
612                        self.log.info(line)
613                    else:
614                        sar_idle_sum += float(line.split()[7])
615            fh.write(out)
616        sar_cpu_usage = cpu_number * 100 - sar_idle_sum
617
618        with open(os.path.join(results_dir, sar_cpu_util_file), "w") as f:
619            f.write("%0.2f" % sar_cpu_usage)
620
621    def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time):
622        time.sleep(ramp_time)
623        self.log.info("Starting power measurements")
624        self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir,
625                      "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix,
626                       "-x", "-c", "%s" % run_time, "-t", "%s" % 1, "-r"])
627
628    def measure_pcm_memory(self, results_dir, pcm_file_name, ramp_time, run_time):
629        time.sleep(ramp_time)
630        cmd = ["pcm-memory", "1", "-csv=%s/%s" % (results_dir, pcm_file_name)]
631        pcm_memory = subprocess.Popen(cmd)
632        time.sleep(run_time)
633        pcm_memory.terminate()
634
635    def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time):
636        time.sleep(ramp_time)
637        cmd = ["pcm", "1", "-i=%s" % run_time,
638               "-csv=%s/%s" % (results_dir, pcm_file_name)]
639        subprocess.run(cmd)
640        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
641        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
642        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
643        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
644        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)
645
646    def measure_pcm_power(self, results_dir, pcm_power_file_name, ramp_time, run_time):
647        time.sleep(ramp_time)
648        out = self.exec_cmd(["pcm-power", "1", "-i=%s" % run_time])
649        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
650            fh.write(out)
651        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
652        #       Improve this so that additional file containing measurements average is generated too.
653
654    def measure_network_bandwidth(self, results_dir, bandwidth_file_name, ramp_time, run_time):
655        self.log.info("Waiting %d seconds for ramp-up to finish before measuring bandwidth stats" % ramp_time)
656        time.sleep(ramp_time)
657        self.log.info("INFO: starting network bandwidth measure")
658        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
659                       "-a", "1", "-t", "1000", "-c", "%s" % run_time])
660        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
661        #       Improve this so that additional file containing measurements average is generated too.
662        # TODO: Monitor only these interfaces which are currently used to run the workload.
663
664    def measure_dpdk_memory(self, results_dir, dump_file_name, ramp_time):
665        self.log.info("INFO: waiting to generate DPDK memory usage")
666        time.sleep(ramp_time)
667        self.log.info("INFO: generating DPDK memory usage")
668        tmp_dump_file = rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)["filename"]
669        os.rename(tmp_dump_file, "%s/%s" % (results_dir, dump_file_name))
670
671    def sys_config(self):
672        self.log.info("====Kernel release:====")
673        self.log.info(os.uname().release)
674        self.log.info("====Kernel command line:====")
675        with open('/proc/cmdline') as f:
676            cmdline = f.readlines()
677            self.log.info('\n'.join(self.get_uncommented_lines(cmdline)))
678        self.log.info("====sysctl conf:====")
679        with open('/etc/sysctl.conf') as f:
680            sysctl = f.readlines()
681            self.log.info('\n'.join(self.get_uncommented_lines(sysctl)))
682        self.log.info("====Cpu power info:====")
683        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))
684        self.log.info("====zcopy settings:====")
685        self.log.info("zcopy enabled: %s" % (self.enable_zcopy))
686        self.log.info("====Scheduler settings:====")
687        self.log.info("SPDK scheduler: %s" % (self.scheduler_name))
688
689
690class Initiator(Server):
691    def __init__(self, name, general_config, initiator_config):
692        super().__init__(name, general_config, initiator_config)
693
694        # Required fields
695        self.ip = initiator_config["ip"]
696        self.target_nic_ips = initiator_config["target_nic_ips"]
697
698        # Defaults
699        self.cpus_allowed = None
700        self.cpus_allowed_policy = "shared"
701        self.spdk_dir = "/tmp/spdk"
702        self.fio_bin = "/usr/src/fio/fio"
703        self.nvmecli_bin = "nvme"
704        self.cpu_frequency = None
705        self.subsystem_info_list = []
706        self.allow_cpu_sharing = True
707
708        if "spdk_dir" in initiator_config:
709            self.spdk_dir = initiator_config["spdk_dir"]
710        if "fio_bin" in initiator_config:
711            self.fio_bin = initiator_config["fio_bin"]
712        if "nvmecli_bin" in initiator_config:
713            self.nvmecli_bin = initiator_config["nvmecli_bin"]
714        if "cpus_allowed" in initiator_config:
715            self.cpus_allowed = initiator_config["cpus_allowed"]
716        if "cpus_allowed_policy" in initiator_config:
717            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
718        if "cpu_frequency" in initiator_config:
719            self.cpu_frequency = initiator_config["cpu_frequency"]
720        if "allow_cpu_sharing" in initiator_config:
721            self.allow_cpu_sharing = initiator_config["allow_cpu_sharing"]
722        if os.getenv('SPDK_WORKSPACE'):
723            self.spdk_dir = os.getenv('SPDK_WORKSPACE')
724
725        self.ssh_connection = paramiko.SSHClient()
726        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
727        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
728        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
729        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
730        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
731
732        if self.skip_spdk_install is False:
733            self.copy_spdk("/tmp/spdk.zip")
734
735        self.set_local_nic_info(self.set_local_nic_info_helper())
736        self.set_cpu_frequency()
737        self.configure_system()
738        if self.enable_adq:
739            self.configure_adq()
740        self.sys_config()
741
742    def set_local_nic_info_helper(self):
743        return json.loads(self.exec_cmd(["lshw", "-json"]))
744
745    def stop(self):
746        self.restore_settings()
747        self.ssh_connection.close()
748
749    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
750        if change_dir:
751            cmd = ["cd", change_dir, ";", *cmd]
752
753        # In case one of the command elements contains whitespace and is not
754        # already quoted, # (e.g. when calling sysctl) quote it again to prevent expansion
755        # when sending to remote system.
756        for i, c in enumerate(cmd):
757            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
758                cmd[i] = '"%s"' % c
759        cmd = " ".join(cmd)
760
761        # Redirect stderr to stdout thanks using get_pty option if needed
762        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
763        out = stdout.read().decode(encoding="utf-8")
764
765        # Check the return code
766        rc = stdout.channel.recv_exit_status()
767        if rc:
768            raise CalledProcessError(int(rc), cmd, out)
769
770        return out
771
772    def put_file(self, local, remote_dest):
773        ftp = self.ssh_connection.open_sftp()
774        ftp.put(local, remote_dest)
775        ftp.close()
776
777    def get_file(self, remote, local_dest):
778        ftp = self.ssh_connection.open_sftp()
779        ftp.get(remote, local_dest)
780        ftp.close()
781
782    def copy_spdk(self, local_spdk_zip):
783        self.log.info("Copying SPDK sources to initiator %s" % self.name)
784        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
785        self.log.info("Copied sources zip from target")
786        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
787        self.log.info("Sources unpacked")
788
789    def copy_result_files(self, dest_dir):
790        self.log.info("Copying results")
791
792        if not os.path.exists(dest_dir):
793            os.mkdir(dest_dir)
794
795        # Get list of result files from initiator and copy them back to target
796        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")
797
798        for file in file_list:
799            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
800                          os.path.join(dest_dir, file))
801        self.log.info("Done copying results")
802
803    def match_subsystems(self, target_subsytems):
804        subsystems = [subsystem for subsystem in target_subsytems if subsystem[2] in self.target_nic_ips]
805        subsystems.sort(key=lambda x: x[1])
806        self.log.info("Found matching subsystems on target side:")
807        for s in subsystems:
808            self.log.info(s)
809        self.subsystem_info_list = subsystems
810
811    def gen_fio_filename_conf(self, *args, **kwargs):
812        # Logic implemented in SPDKInitiator and KernelInitiator classes
813        pass
814
815    def get_route_nic_numa(self, remote_nvme_ip):
816        local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip]))
817        local_nvme_nic = local_nvme_nic[0]["dev"]
818        return self.get_nic_numa_node(local_nvme_nic)
819
820    @staticmethod
821    def gen_fio_offset_section(offset_inc, num_jobs):
822        offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc
823        return "\n".join(["size=%s%%" % offset_inc,
824                          "offset=0%",
825                          "offset_increment=%s%%" % offset_inc])
826
827    def gen_fio_numa_section(self, fio_filenames_list, num_jobs):
828        numa_stats = {}
829        allowed_cpus = []
830        for nvme in fio_filenames_list:
831            nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme))
832            numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1
833
834        # Use the most common NUMA node for this chunk to allocate memory and CPUs
835        section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0]
836
837        # Check if we have enough free CPUs to pop from the list before assigning them
838        if len(self.available_cpus[section_local_numa]) < num_jobs:
839            if self.allow_cpu_sharing:
840                self.log.info("Regenerating available CPU list %s" % section_local_numa)
841                # Remove still available CPUs from the regenerated list. We don't want to
842                # regenerate it with duplicates.
843                cpus_regen = set(self.get_numa_cpu_map()[section_local_numa]) - set(self.available_cpus[section_local_numa])
844                self.available_cpus[section_local_numa].extend(cpus_regen)
845                self.log.info(self.log.info(self.available_cpus[section_local_numa]))
846            else:
847                self.log.error("No more free CPU cores to use from allowed_cpus list!")
848                raise IndexError
849
850        for _ in range(num_jobs):
851            try:
852                allowed_cpus.append(str(self.available_cpus[section_local_numa].pop(0)))
853            except IndexError:
854                self.log.error("No more free CPU cores to use from allowed_cpus list!")
855                raise
856
857        return "\n".join(["cpus_allowed=%s" % ",".join(allowed_cpus),
858                          "numa_mem_policy=prefer:%s" % section_local_numa])
859
860    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no,
861                       num_jobs=None, ramp_time=0, run_time=10, rate_iops=0,
862                       offset=False, offset_inc=0):
863        fio_conf_template = """
864[global]
865ioengine={ioengine}
866{spdk_conf}
867thread=1
868group_reporting=1
869direct=1
870percentile_list=50:90:99:99.5:99.9:99.99:99.999
871
872norandommap=1
873rw={rw}
874rwmixread={rwmixread}
875bs={block_size}
876time_based=1
877ramp_time={ramp_time}
878runtime={run_time}
879rate_iops={rate_iops}
880"""
881
882        if self.cpus_allowed is not None:
883            self.log.info("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
884            cpus_num = 0
885            cpus = self.cpus_allowed.split(",")
886            for cpu in cpus:
887                if "-" in cpu:
888                    a, b = cpu.split("-")
889                    a = int(a)
890                    b = int(b)
891                    cpus_num += len(range(a, b))
892                else:
893                    cpus_num += 1
894            self.num_cores = cpus_num
895            threads = range(0, self.num_cores)
896        elif hasattr(self, 'num_cores'):
897            self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores)
898            threads = range(0, int(self.num_cores))
899        else:
900            self.num_cores = len(self.subsystem_info_list)
901            threads = range(0, len(self.subsystem_info_list))
902
903        filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs,
904                                                      offset, offset_inc)
905
906        fio_config = fio_conf_template.format(ioengine=self.ioengine, spdk_conf=self.spdk_conf,
907                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
908                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)
909
910        # TODO: hipri disabled for now, as it causes fio errors:
911        # io_u error on file /dev/nvme2n1: Operation not supported
912        # See comment in KernelInitiator class, init_connect() function
913        if "io_uring" in self.ioengine:
914            fio_config = fio_config + """
915fixedbufs=1
916registerfiles=1
917#hipri=1
918"""
919        if num_jobs:
920            fio_config = fio_config + "numjobs=%s \n" % num_jobs
921        if self.cpus_allowed is not None:
922            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
923            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
924        fio_config = fio_config + filename_section
925
926        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
927        if hasattr(self, "num_cores"):
928            fio_config_filename += "_%sCPU" % self.num_cores
929        fio_config_filename += ".fio"
930
931        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
932        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
933        self.log.info("Created FIO Config:")
934        self.log.info(fio_config)
935
936        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)
937
938    def set_cpu_frequency(self):
939        if self.cpu_frequency is not None:
940            try:
941                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
942                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
943                self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
944            except Exception:
945                self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
946                sys.exit()
947        else:
948            self.log.warning("WARNING: you have disabled intel_pstate and using default cpu governance.")
949
950    def run_fio(self, fio_config_file, run_num=1):
951        job_name, _ = os.path.splitext(fio_config_file)
952        self.log.info("Starting FIO run for job: %s" % job_name)
953        self.log.info("Using FIO: %s" % self.fio_bin)
954
955        output_filename = job_name + "_run_" + str(run_num) + "_" + self.name + ".json"
956        try:
957            output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
958                                    "--output=%s" % output_filename, "--eta=never"], True)
959            self.log.info(output)
960            self.log.info("FIO run finished. Results in: %s" % output_filename)
961        except subprocess.CalledProcessError as e:
962            self.log.error("ERROR: Fio process failed!")
963            self.log.error(e.stdout)
964
965    def sys_config(self):
966        self.log.info("====Kernel release:====")
967        self.log.info(self.exec_cmd(["uname", "-r"]))
968        self.log.info("====Kernel command line:====")
969        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
970        self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
971        self.log.info("====sysctl conf:====")
972        sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"])
973        self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
974        self.log.info("====Cpu power info:====")
975        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))
976
977
978class KernelTarget(Target):
979    def __init__(self, name, general_config, target_config):
980        super().__init__(name, general_config, target_config)
981        # Defaults
982        self.nvmet_bin = "nvmetcli"
983
984        if "nvmet_bin" in target_config:
985            self.nvmet_bin = target_config["nvmet_bin"]
986
987    def load_drivers(self):
988        self.log.info("Loading drivers")
989        super().load_drivers()
990        if self.null_block:
991            self.exec_cmd(["sudo", "modprobe", "null_blk", "nr_devices=%s" % self.null_block])
992
993    def configure_adq(self):
994        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
995
996    def adq_configure_tc(self):
997        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
998
999    def adq_set_busy_read(self, busy_read_val):
1000        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
1001        return {"net.core.busy_read": 0}
1002
1003    def stop(self):
1004        self.nvmet_command(self.nvmet_bin, "clear")
1005        self.restore_settings()
1006
1007    def get_nvme_device_bdf(self, nvme_dev_path):
1008        nvme_name = os.path.basename(nvme_dev_path)
1009        return self.exec_cmd(["cat", "/sys/block/%s/device/address" % nvme_name]).strip()
1010
1011    def get_nvme_devices(self):
1012        dev_list = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]).split("\n")
1013        nvme_list = []
1014        for dev in dev_list:
1015            if "nvme" not in dev:
1016                continue
1017            if self.get_nvme_device_bdf(dev) in self.nvme_blocklist:
1018                continue
1019            if len(self.nvme_allowlist) == 0:
1020                nvme_list.append(dev)
1021                continue
1022            if self.get_nvme_device_bdf(dev) in self.nvme_allowlist:
1023                nvme_list.append(dev)
1024        return nvme_list
1025
1026    def nvmet_command(self, nvmet_bin, command):
1027        return self.exec_cmd([nvmet_bin, *(command.split(" "))])
1028
1029    def kernel_tgt_gen_subsystem_conf(self, nvme_list):
1030
1031        nvmet_cfg = {
1032            "ports": [],
1033            "hosts": [],
1034            "subsystems": [],
1035        }
1036
1037        for ip, bdev_num in self.spread_bdevs(len(nvme_list)):
1038            port = str(4420 + bdev_num)
1039            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
1040            serial = "SPDK00%s" % bdev_num
1041            bdev_name = nvme_list[bdev_num]
1042
1043            nvmet_cfg["subsystems"].append({
1044                "allowed_hosts": [],
1045                "attr": {
1046                    "allow_any_host": "1",
1047                    "serial": serial,
1048                    "version": "1.3"
1049                },
1050                "namespaces": [
1051                    {
1052                        "device": {
1053                            "path": bdev_name,
1054                            "uuid": "%s" % uuid.uuid4()
1055                        },
1056                        "enable": 1,
1057                        "nsid": port
1058                    }
1059                ],
1060                "nqn": nqn
1061            })
1062
1063            nvmet_cfg["ports"].append({
1064                "addr": {
1065                    "adrfam": "ipv4",
1066                    "traddr": ip,
1067                    "trsvcid": port,
1068                    "trtype": self.transport
1069                },
1070                "portid": bdev_num,
1071                "referrals": [],
1072                "subsystems": [nqn]
1073            })
1074
1075            self.subsystem_info_list.append((port, nqn, ip))
1076        self.subsys_no = len(self.subsystem_info_list)
1077
1078        with open("kernel.conf", "w") as fh:
1079            fh.write(json.dumps(nvmet_cfg, indent=2))
1080
1081    def tgt_start(self):
1082        self.log.info("Configuring kernel NVMeOF Target")
1083
1084        if self.null_block:
1085            self.log.info("Configuring with null block device.")
1086            nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
1087        else:
1088            self.log.info("Configuring with NVMe drives.")
1089            nvme_list = self.get_nvme_devices()
1090
1091        self.kernel_tgt_gen_subsystem_conf(nvme_list)
1092        self.subsys_no = len(nvme_list)
1093
1094        self.nvmet_command(self.nvmet_bin, "clear")
1095        self.nvmet_command(self.nvmet_bin, "restore kernel.conf")
1096
1097        if self.enable_adq:
1098            self.adq_configure_tc()
1099
1100        self.log.info("Done configuring kernel NVMeOF Target")
1101
1102
1103class SPDKTarget(Target):
1104    def __init__(self, name, general_config, target_config):
1105        # Required fields
1106        self.core_mask = target_config["core_mask"]
1107        self.num_cores = len(self.get_core_list_from_mask(self.core_mask))
1108
1109        super().__init__(name, general_config, target_config)
1110
1111        # Defaults
1112        self.dif_insert_strip = False
1113        self.null_block_dif_type = 0
1114        self.num_shared_buffers = 4096
1115        self.max_queue_depth = 128
1116        self.bpf_proc = None
1117        self.bpf_scripts = []
1118        self.enable_dsa = False
1119        self.scheduler_core_limit = None
1120        self.iobuf_small_pool_count = 16383
1121        self.iobuf_large_pool_count = 2047
1122        self.num_cqe = 4096
1123
1124        if "num_shared_buffers" in target_config:
1125            self.num_shared_buffers = target_config["num_shared_buffers"]
1126        if "max_queue_depth" in target_config:
1127            self.max_queue_depth = target_config["max_queue_depth"]
1128        if "null_block_dif_type" in target_config:
1129            self.null_block_dif_type = target_config["null_block_dif_type"]
1130        if "dif_insert_strip" in target_config:
1131            self.dif_insert_strip = target_config["dif_insert_strip"]
1132        if "bpf_scripts" in target_config:
1133            self.bpf_scripts = target_config["bpf_scripts"]
1134        if "dsa_settings" in target_config:
1135            self.enable_dsa = target_config["dsa_settings"]
1136        if "scheduler_core_limit" in target_config:
1137            self.scheduler_core_limit = target_config["scheduler_core_limit"]
1138        if "iobuf_small_pool_count" in target_config:
1139            self.iobuf_small_pool_count = target_config["iobuf_small_pool_count"]
1140        if "iobuf_large_pool_count" in target_config:
1141            self.iobuf_large_pool_count = target_config["iobuf_large_pool_count"]
1142        if "num_cqe" in target_config:
1143            self.num_cqe = target_config["num_cqe"]
1144
1145        self.log.info("====DSA settings:====")
1146        self.log.info("DSA enabled: %s" % (self.enable_dsa))
1147
1148    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
1149        if mode not in ["default", "bynode", "cpulist",
1150                        "shared", "split", "split-bynode"]:
1151            self.log.error("%s irq affinity setting not supported" % mode)
1152            raise Exception
1153
1154        # Create core list from SPDK's mask and change it to string.
1155        # This is the type configure_irq_affinity expects for cpulist parameter.
1156        spdk_tgt_core_list = self.get_core_list_from_mask(self.core_mask)
1157        spdk_tgt_core_list = ",".join(map(lambda x: str(x), spdk_tgt_core_list))
1158        spdk_tgt_core_list = "[" + spdk_tgt_core_list + "]"
1159
1160        if mode == "shared":
1161            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list)
1162        elif mode == "split":
1163            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
1164        elif mode == "split-bynode":
1165            super().configure_irq_affinity(mode="bynode", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
1166        else:
1167            super().configure_irq_affinity(mode=mode, cpulist=cpulist, exclude_cpulist=exclude_cpulist)
1168
1169    def adq_set_busy_read(self, busy_read_val):
1170        return {"net.core.busy_read": busy_read_val}
1171
1172    def get_nvme_devices_count(self):
1173        return len(self.get_nvme_devices())
1174
1175    def get_nvme_devices(self):
1176        bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")]))
1177        bdev_bdfs = []
1178        for bdev in bdev_subsys_json_obj["config"]:
1179            bdev_traddr = bdev["params"]["traddr"]
1180            if bdev_traddr in self.nvme_blocklist:
1181                continue
1182            if len(self.nvme_allowlist) == 0:
1183                bdev_bdfs.append(bdev_traddr)
1184            if bdev_traddr in self.nvme_allowlist:
1185                bdev_bdfs.append(bdev_traddr)
1186        return bdev_bdfs
1187
1188    def spdk_tgt_configure(self):
1189        self.log.info("Configuring SPDK NVMeOF target via RPC")
1190
1191        # Create transport layer
1192        nvmf_transport_params = {
1193            "client": self.client,
1194            "trtype": self.transport,
1195            "num_shared_buffers": self.num_shared_buffers,
1196            "max_queue_depth": self.max_queue_depth,
1197            "dif_insert_or_strip": self.dif_insert_strip,
1198            "sock_priority": self.adq_priority,
1199            "num_cqe": self.num_cqe
1200        }
1201
1202        if self.enable_adq:
1203            nvmf_transport_params["acceptor_poll_rate"] = 10000
1204
1205        rpc.nvmf.nvmf_create_transport(**nvmf_transport_params)
1206        self.log.info("SPDK NVMeOF transport layer:")
1207        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))
1208
1209        if self.null_block:
1210            self.spdk_tgt_add_nullblock(self.null_block)
1211            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
1212        else:
1213            self.spdk_tgt_add_nvme_conf()
1214            self.spdk_tgt_add_subsystem_conf(self.nic_ips)
1215
1216        if self.enable_adq:
1217            self.adq_configure_tc()
1218
1219        self.log.info("Done configuring SPDK NVMeOF Target")
1220
1221    def spdk_tgt_add_nullblock(self, null_block_count):
1222        md_size = 0
1223        block_size = 4096
1224        if self.null_block_dif_type != 0:
1225            md_size = 128
1226
1227        self.log.info("Adding null block bdevices to config via RPC")
1228        for i in range(null_block_count):
1229            self.log.info("Setting bdev protection to :%s" % self.null_block_dif_type)
1230            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
1231                                      dif_type=self.null_block_dif_type, md_size=md_size)
1232        self.log.info("SPDK Bdevs configuration:")
1233        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1234
1235    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
1236        self.log.info("Adding NVMe bdevs to config via RPC")
1237
1238        bdfs = self.get_nvme_devices()
1239        bdfs = [b.replace(":", ".") for b in bdfs]
1240
1241        if req_num_disks:
1242            if req_num_disks > len(bdfs):
1243                self.log.error("ERROR: Requested number of disks is more than available %s" % len(bdfs))
1244                sys.exit(1)
1245            else:
1246                bdfs = bdfs[0:req_num_disks]
1247
1248        for i, bdf in enumerate(bdfs):
1249            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)
1250
1251        self.log.info("SPDK Bdevs configuration:")
1252        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1253
1254    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
1255        self.log.info("Adding subsystems to config")
1256        if not req_num_disks:
1257            req_num_disks = self.get_nvme_devices_count()
1258
1259        for ip, bdev_num in self.spread_bdevs(req_num_disks):
1260            port = str(4420 + bdev_num)
1261            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
1262            serial = "SPDK00%s" % bdev_num
1263            bdev_name = "Nvme%sn1" % bdev_num
1264
1265            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
1266                                           allow_any_host=True, max_namespaces=8)
1267            rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
1268            for nqn_name in [nqn, "discovery"]:
1269                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
1270                                                     nqn=nqn_name,
1271                                                     trtype=self.transport,
1272                                                     traddr=ip,
1273                                                     trsvcid=port,
1274                                                     adrfam="ipv4")
1275            self.subsystem_info_list.append((port, nqn, ip))
1276        self.subsys_no = len(self.subsystem_info_list)
1277
1278        self.log.info("SPDK NVMeOF subsystem configuration:")
1279        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))
1280
1281    def bpf_start(self):
1282        self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts)
1283        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
1284        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
1285        results_path = os.path.join(self.results_dir, "bpf_traces.txt")
1286
1287        with open(self.pid, "r") as fh:
1288            nvmf_pid = str(fh.readline())
1289
1290        cmd = [bpf_script, nvmf_pid, *bpf_traces]
1291        self.log.info(cmd)
1292        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})
1293
1294    def tgt_start(self):
1295        if self.null_block:
1296            self.subsys_no = 1
1297        else:
1298            self.subsys_no = self.get_nvme_devices_count()
1299        self.log.info("Starting SPDK NVMeOF Target process")
1300        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
1301        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
1302        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")
1303
1304        with open(self.pid, "w") as fh:
1305            fh.write(str(proc.pid))
1306        self.nvmf_proc = proc
1307        self.log.info("SPDK NVMeOF Target PID=%s" % self.pid)
1308        self.log.info("Waiting for spdk to initialize...")
1309        while True:
1310            if os.path.exists("/var/tmp/spdk.sock"):
1311                break
1312            time.sleep(1)
1313        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")
1314
1315        rpc.sock.sock_set_default_impl(self.client, impl_name="posix")
1316        rpc.iobuf.iobuf_set_options(self.client,
1317                                    small_pool_count=self.iobuf_small_pool_count,
1318                                    large_pool_count=self.iobuf_large_pool_count,
1319                                    small_bufsize=None,
1320                                    large_bufsize=None)
1321
1322        if self.enable_zcopy:
1323            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
1324                                           enable_zerocopy_send_server=True)
1325            self.log.info("Target socket options:")
1326            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))
1327
1328        if self.enable_adq:
1329            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
1330            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
1331                                           nvme_adminq_poll_period_us=100000, retry_count=4)
1332
1333        if self.enable_dsa:
1334            rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None)
1335            self.log.info("Target DSA accel module enabled")
1336
1337        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit)
1338        rpc.framework_start_init(self.client)
1339
1340        if self.bpf_scripts:
1341            self.bpf_start()
1342
1343        self.spdk_tgt_configure()
1344
1345    def stop(self):
1346        if self.bpf_proc:
1347            self.log.info("Stopping BPF Trace script")
1348            self.bpf_proc.terminate()
1349            self.bpf_proc.wait()
1350
1351        if hasattr(self, "nvmf_proc"):
1352            try:
1353                self.nvmf_proc.terminate()
1354                self.nvmf_proc.wait(timeout=30)
1355            except Exception as e:
1356                self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.")
1357                self.log.info(e)
1358                self.nvmf_proc.kill()
1359                self.nvmf_proc.communicate()
1360                # Try to clean up RPC socket files if they were not removed
1361                # because of using 'kill'
1362                try:
1363                    os.remove("/var/tmp/spdk.sock")
1364                    os.remove("/var/tmp/spdk.sock.lock")
1365                except FileNotFoundError:
1366                    pass
1367        self.restore_settings()
1368
1369
1370class KernelInitiator(Initiator):
1371    def __init__(self, name, general_config, initiator_config):
1372        super().__init__(name, general_config, initiator_config)
1373
1374        # Defaults
1375        self.extra_params = ""
1376        self.ioengine = "libaio"
1377        self.spdk_conf = ""
1378
1379        if "num_cores" in initiator_config:
1380            self.num_cores = initiator_config["num_cores"]
1381
1382        if "extra_params" in initiator_config:
1383            self.extra_params = initiator_config["extra_params"]
1384
1385        if "kernel_engine" in initiator_config:
1386            self.ioengine = initiator_config["kernel_engine"]
1387            if "io_uring" in self.ioengine:
1388                self.extra_params += ' --nr-poll-queues=8'
1389
1390    def configure_adq(self):
1391        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
1392
1393    def adq_configure_tc(self):
1394        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
1395
1396    def adq_set_busy_read(self, busy_read_val):
1397        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
1398        return {"net.core.busy_read": 0}
1399
1400    def get_connected_nvme_list(self):
1401        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
1402        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
1403                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
1404        return nvme_list
1405
1406    def init_connect(self):
1407        self.log.info("Below connection attempts may result in error messages, this is expected!")
1408        for subsystem in self.subsystem_info_list:
1409            self.log.info("Trying to connect %s %s %s" % subsystem)
1410            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
1411                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
1412            time.sleep(2)
1413
1414        if "io_uring" in self.ioengine:
1415            self.log.info("Setting block layer settings for io_uring.")
1416
1417            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
1418            #       apparently it's not possible for connected subsystems.
1419            #       Results in "error: Invalid argument"
1420            block_sysfs_settings = {
1421                "iostats": "0",
1422                "rq_affinity": "0",
1423                "nomerges": "2"
1424            }
1425
1426            for disk in self.get_connected_nvme_list():
1427                sysfs = os.path.join("/sys/block", disk, "queue")
1428                for k, v in block_sysfs_settings.items():
1429                    sysfs_opt_path = os.path.join(sysfs, k)
1430                    try:
1431                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
1432                    except CalledProcessError as e:
1433                        self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
1434                    finally:
1435                        _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)])
1436                        self.log.info("%s=%s" % (sysfs_opt_path, _))
1437
1438    def init_disconnect(self):
1439        for subsystem in self.subsystem_info_list:
1440            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
1441            time.sleep(1)
1442
1443    def get_nvme_subsystem_numa(self, dev_name):
1444        # Remove two last characters to get controller name instead of subsystem name
1445        nvme_ctrl = os.path.basename(dev_name)[:-2]
1446        remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
1447                                   self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl]))
1448        return self.get_route_nic_numa(remote_nvme_ip.group(0))
1449
1450    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
1451        self.available_cpus = self.get_numa_cpu_map()
1452        if len(threads) >= len(subsystems):
1453            threads = range(0, len(subsystems))
1454
1455        # Generate connected nvme devices names and sort them by used NIC numa node
1456        # to allow better grouping when splitting into fio sections.
1457        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
1458        nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list]
1459        nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))]
1460
1461        filename_section = ""
1462        nvme_per_split = int(len(nvme_list) / len(threads))
1463        remainder = len(nvme_list) % len(threads)
1464        iterator = iter(nvme_list)
1465        result = []
1466        for i in range(len(threads)):
1467            result.append([])
1468            for _ in range(nvme_per_split):
1469                result[i].append(next(iterator))
1470                if remainder:
1471                    result[i].append(next(iterator))
1472                    remainder -= 1
1473        for i, r in enumerate(result):
1474            header = "[filename%s]" % i
1475            disks = "\n".join(["filename=%s" % x for x in r])
1476            job_section_qd = round((io_depth * len(r)) / num_jobs)
1477            if job_section_qd == 0:
1478                job_section_qd = 1
1479            iodepth = "iodepth=%s" % job_section_qd
1480
1481            offset_section = ""
1482            if offset:
1483                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)
1484
1485            numa_opts = self.gen_fio_numa_section(r, num_jobs)
1486
1487            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])
1488
1489        return filename_section
1490
1491
1492class SPDKInitiator(Initiator):
1493    def __init__(self, name, general_config, initiator_config):
1494        super().__init__(name, general_config, initiator_config)
1495
1496        if self.skip_spdk_install is False:
1497            self.install_spdk()
1498
1499        # Optional fields
1500        self.enable_data_digest = False
1501        if "enable_data_digest" in initiator_config:
1502            self.enable_data_digest = initiator_config["enable_data_digest"]
1503        if "num_cores" in initiator_config:
1504            self.num_cores = initiator_config["num_cores"]
1505
1506        self.ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
1507        self.spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
1508
1509    def adq_set_busy_read(self, busy_read_val):
1510        return {"net.core.busy_read": busy_read_val}
1511
1512    def install_spdk(self):
1513        self.log.info("Using fio binary %s" % self.fio_bin)
1514        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
1515        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
1516        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma",
1517                       "--with-fio=%s" % os.path.dirname(self.fio_bin),
1518                       "--enable-lto", "--disable-unit-tests"])
1519        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
1520        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])
1521
1522        self.log.info("SPDK built")
1523        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])
1524
1525    def init_connect(self):
1526        # Not a real "connect" like when doing "nvme connect" because SPDK's fio
1527        # bdev plugin initiates connection just before starting IO traffic.
1528        # This is just to have a "init_connect" equivalent of the same function
1529        # from KernelInitiator class.
1530        # Just prepare bdev.conf JSON file for later use and consider it
1531        # "making a connection".
1532        bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
1533        self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
1534
1535    def init_disconnect(self):
1536        # SPDK Initiator does not need to explicity disconnect as this gets done
1537        # after fio bdev plugin finishes IO.
1538        pass
1539
1540    def gen_spdk_bdev_conf(self, remote_subsystem_list):
1541        bdev_cfg_section = {
1542            "subsystems": [
1543                {
1544                    "subsystem": "bdev",
1545                    "config": []
1546                }
1547            ]
1548        }
1549
1550        for i, subsys in enumerate(remote_subsystem_list):
1551            sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys)
1552            nvme_ctrl = {
1553                "method": "bdev_nvme_attach_controller",
1554                "params": {
1555                    "name": "Nvme{}".format(i),
1556                    "trtype": self.transport,
1557                    "traddr": sub_addr,
1558                    "trsvcid": sub_port,
1559                    "subnqn": sub_nqn,
1560                    "adrfam": "IPv4"
1561                }
1562            }
1563
1564            if self.enable_adq:
1565                nvme_ctrl["params"].update({"priority": "1"})
1566
1567            if self.enable_data_digest:
1568                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})
1569
1570            bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)
1571
1572        return json.dumps(bdev_cfg_section, indent=2)
1573
1574    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
1575        self.available_cpus = self.get_numa_cpu_map()
1576        filename_section = ""
1577        if len(threads) >= len(subsystems):
1578            threads = range(0, len(subsystems))
1579
1580        # Generate expected NVMe Bdev names and sort them by used NIC numa node
1581        # to allow better grouping when splitting into fio sections.
1582        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
1583        filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames]
1584        filenames = [x for _, x in sorted(zip(filename_numas, filenames))]
1585
1586        nvme_per_split = int(len(subsystems) / len(threads))
1587        remainder = len(subsystems) % len(threads)
1588        iterator = iter(filenames)
1589        result = []
1590        for i in range(len(threads)):
1591            result.append([])
1592            for _ in range(nvme_per_split):
1593                result[i].append(next(iterator))
1594            if remainder:
1595                result[i].append(next(iterator))
1596                remainder -= 1
1597        for i, r in enumerate(result):
1598            header = "[filename%s]" % i
1599            disks = "\n".join(["filename=%s" % x for x in r])
1600            job_section_qd = round((io_depth * len(r)) / num_jobs)
1601            if job_section_qd == 0:
1602                job_section_qd = 1
1603            iodepth = "iodepth=%s" % job_section_qd
1604
1605            offset_section = ""
1606            if offset:
1607                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)
1608
1609            numa_opts = self.gen_fio_numa_section(r, num_jobs)
1610
1611            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])
1612
1613        return filename_section
1614
1615    def get_nvme_subsystem_numa(self, bdev_name):
1616        bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir]))
1617        bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"]
1618
1619        # Remove two last characters to get controller name instead of subsystem name
1620        nvme_ctrl = bdev_name[:-2]
1621        remote_nvme_ip = list(filter(lambda x: x["params"]["name"] == "%s" % nvme_ctrl, bdev_conf_json_obj))[0]["params"]["traddr"]
1622        return self.get_route_nic_numa(remote_nvme_ip)
1623
1624
1625if __name__ == "__main__":
1626    script_full_dir = os.path.dirname(os.path.realpath(__file__))
1627    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))
1628
1629    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
1630    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
1631                        help='Configuration file.')
1632    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
1633                        help='Results directory.')
1634    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
1635                        help='CSV results filename.')
1636    parser.add_argument('-f', '--force', default=False, action='store_true',
1637                        dest='force', help="""Force script to continue and try to use all
1638                        available NVMe devices during test.
1639                        WARNING: Might result in data loss on used NVMe drives""")
1640
1641    args = parser.parse_args()
1642
1643    logging.basicConfig(level=logging.INFO,
1644                        format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s')
1645
1646    logging.info("Using config file: %s" % args.config)
1647    with open(args.config, "r") as config:
1648        data = json.load(config)
1649
1650    initiators = []
1651    fio_cases = []
1652
1653    general_config = data["general"]
1654    target_config = data["target"]
1655    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]
1656
1657    if "null_block_devices" not in data["target"] and \
1658        (args.force is False and
1659            "allowlist" not in data["target"] and
1660            "blocklist" not in data["target"]):
1661        # TODO: Also check if allowlist or blocklist are not empty.
1662        logging.warning("""WARNING: This script requires allowlist and blocklist to be defined.
1663        You can choose to use all available NVMe drives on your system, which may potentially
1664        lead to data loss. If you wish to proceed with all attached NVMes, use "-f" option.""")
1665        exit(1)
1666
1667    for k, v in data.items():
1668        if "target" in k:
1669            v.update({"results_dir": args.results})
1670            if data[k]["mode"] == "spdk":
1671                target_obj = SPDKTarget(k, data["general"], v)
1672            elif data[k]["mode"] == "kernel":
1673                target_obj = KernelTarget(k, data["general"], v)
1674        elif "initiator" in k:
1675            if data[k]["mode"] == "spdk":
1676                init_obj = SPDKInitiator(k, data["general"], v)
1677            elif data[k]["mode"] == "kernel":
1678                init_obj = KernelInitiator(k, data["general"], v)
1679            initiators.append(init_obj)
1680        elif "fio" in k:
1681            fio_workloads = itertools.product(data[k]["bs"],
1682                                              data[k]["qd"],
1683                                              data[k]["rw"])
1684
1685            fio_run_time = data[k]["run_time"]
1686            fio_ramp_time = data[k]["ramp_time"]
1687            fio_rw_mix_read = data[k]["rwmixread"]
1688            fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None
1689            fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None
1690
1691            fio_rate_iops = 0
1692            if "rate_iops" in data[k]:
1693                fio_rate_iops = data[k]["rate_iops"]
1694
1695            fio_offset = False
1696            if "offset" in data[k]:
1697                fio_offset = data[k]["offset"]
1698            fio_offset_inc = 0
1699            if "offset_inc" in data[k]:
1700                fio_offset_inc = data[k]["offset_inc"]
1701        else:
1702            continue
1703
1704    try:
1705        os.mkdir(args.results)
1706    except FileExistsError:
1707        pass
1708
1709    for i in initiators:
1710        target_obj.initiator_info.append(
1711            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
1712        )
1713
1714    # TODO: This try block is definietly too large. Need to break this up into separate
1715    # logical blocks to reduce size.
1716    try:
1717        target_obj.tgt_start()
1718
1719        for i in initiators:
1720            i.match_subsystems(target_obj.subsystem_info_list)
1721            if i.enable_adq:
1722                i.adq_configure_tc()
1723
1724        # Poor mans threading
1725        # Run FIO tests
1726        for block_size, io_depth, rw in fio_workloads:
1727            configs = []
1728            for i in initiators:
1729                i.init_connect()
1730                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
1731                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops,
1732                                       fio_offset, fio_offset_inc)
1733                configs.append(cfg)
1734
1735            for run_no in range(1, fio_run_num+1):
1736                threads = []
1737                power_daemon = None
1738                measurements_prefix = "%s_%s_%s_m_%s_run_%s" % (block_size, io_depth, rw, fio_rw_mix_read, run_no)
1739
1740                for i, cfg in zip(initiators, configs):
1741                    t = threading.Thread(target=i.run_fio, args=(cfg, run_no))
1742                    threads.append(t)
1743                if target_obj.enable_sar:
1744                    sar_file_prefix = measurements_prefix + "_sar"
1745                    t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix, fio_ramp_time, fio_run_time))
1746                    threads.append(t)
1747
1748                if target_obj.enable_pcm:
1749                    pcm_fnames = ["%s_%s.csv" % (measurements_prefix, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]
1750
1751                    pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm,
1752                                                 args=(args.results, pcm_fnames[0], fio_ramp_time, fio_run_time))
1753                    pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory,
1754                                                 args=(args.results, pcm_fnames[1], fio_ramp_time, fio_run_time))
1755                    pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power,
1756                                                 args=(args.results, pcm_fnames[2], fio_ramp_time, fio_run_time))
1757
1758                    threads.append(pcm_cpu_t)
1759                    threads.append(pcm_mem_t)
1760                    threads.append(pcm_pow_t)
1761
1762                if target_obj.enable_bw:
1763                    bandwidth_file_name = measurements_prefix + "_bandwidth.csv"
1764                    t = threading.Thread(target=target_obj.measure_network_bandwidth,
1765                                         args=(args.results, bandwidth_file_name, fio_ramp_time, fio_run_time))
1766                    threads.append(t)
1767
1768                if target_obj.enable_dpdk_memory:
1769                    dpdk_mem_file_name = measurements_prefix + "_dpdk_mem.txt"
1770                    t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results, dpdk_mem_file_name, fio_ramp_time))
1771                    threads.append(t)
1772
1773                if target_obj.enable_pm:
1774                    power_daemon = threading.Thread(target=target_obj.measure_power,
1775                                                    args=(args.results, measurements_prefix, script_full_dir,
1776                                                          fio_ramp_time, fio_run_time))
1777                    threads.append(power_daemon)
1778
1779                for t in threads:
1780                    t.start()
1781                for t in threads:
1782                    t.join()
1783
1784            for i in initiators:
1785                i.init_disconnect()
1786                i.copy_result_files(args.results)
1787        try:
1788            parse_results(args.results, args.csv_filename)
1789        except Exception:
1790            logging.error("There was an error with parsing the results")
1791    finally:
1792        for i in initiators:
1793            try:
1794                i.stop()
1795            except Exception as err:
1796                pass
1797        target_obj.stop()
1798