#!/usr/bin/env python3
#  SPDX-License-Identifier: BSD-3-Clause
#  Copyright (C) 2018 Intel Corporation
#  All rights reserved.
#

# Note: we use setattr
# pylint: disable=no-member

import os
import re
import sys
import argparse
import json
import logging
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid

import paramiko
import pandas as pd
from common import *
from subprocess import CalledProcessError
from abc import abstractmethod, ABC
from dataclasses import dataclass
from typing import Any

sys.path.append(os.path.dirname(__file__) + '/../../../python')

import spdk.rpc as rpc  # noqa
import spdk.rpc.client as rpc_client  # noqa


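# Declarative description of a single config key. Server.read_config() resolves
# each field from a config dict, applying the default and enforcing 'required'.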
@dataclass
class ConfigField:
    name: str = None
    default: Any = None
    required: bool = False


class Server(ABC):
    def __init__(self, name, general_config, server_config):
        self.name = name

        self.log = logging.getLogger(self.name)

        config_fields = [
            ConfigField(name='username', required=True),
            ConfigField(name='password', required=True),
            ConfigField(name='transport', required=True),
            ConfigField(name='skip_spdk_install', default=False),
            ConfigField(name='irdma_roce_enable', default=False),
            ConfigField(name='pause_frames', default=False)
        ]
        self.read_config(config_fields, general_config)
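        # Illustrative general_config snippet (hypothetical values) the fields above
        # are read from:
        #   {"username": "user", "password": "***", "transport": "tcp"}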
        self.transport = self.transport.lower()

        config_fields = [
            ConfigField(name='nic_ips', required=True),
            ConfigField(name='mode', required=True),
            ConfigField(name='irq_scripts_dir', default='/usr/src/local/mlnx-tools/ofed_scripts'),
            ConfigField(name='enable_arfs', default=False),
            ConfigField(name='tuned_profile', default='')
        ]
        self.read_config(config_fields, server_config)

        self.local_nic_info = []
        self._nics_json_obj = {}
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""

        self.enable_adq = server_config.get('adq_enable', False)
        self.adq_priority = 1 if self.enable_adq else None
        self.irq_settings = {'mode': 'default', **server_config.get('irq_settings', {})}

        if not re.match(r'^[A-Za-z0-9\-]+$', name):
            self.log.error("Server name must contain only letters, numbers and dashes")
            sys.exit(1)

    def read_config(self, config_fields, config):
        for config_field in config_fields:
            value = config.get(config_field.name, config_field.default)
            if config_field.required and value is None:
                raise ValueError('%s config key is required' % config_field.name)

            setattr(self, config_field.name, value)

    @staticmethod
    def get_uncommented_lines(lines):
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                # Compare exactly: substring matching would let "10.0.0.1" also match "10.0.0.11"
                if ip == addr["local"]:
                    return nic["ifname"]

    @abstractmethod
    def set_local_nic_info_helper(self):
        pass

    def set_local_nic_info(self, pci_info):
        def extract_network_elements(json_obj):
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def get_nic_numa_node(self, nic_name):
        return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name]))

    def get_numa_cpu_map(self):
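        # Returns a NUMA node -> CPU list map built from lscpu output,
        # e.g. {0: [0, 1, 2, 3], 1: [4, 5, 6, 7]} on a two-node host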
        numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"]))
        numa_cpu_json_map = {}

        for cpu in numa_cpu_json_obj["cpus"]:
            cpu_num = int(cpu["cpu"])
            numa_node = int(cpu["node"])
            numa_cpu_json_map.setdefault(numa_node, []).append(cpu_num)

        return numa_cpu_json_map

    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        # Stub; overridden by subclasses (Target runs commands locally, Initiator over SSH)
        return ""

    def configure_system(self):
        self.load_drivers()
        self.configure_services()
        self.configure_sysctl()
        self.configure_arfs()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity(**self.irq_settings)
        self.configure_pause_frames()

    RDMA_PROTOCOL_IWARP = 0
    RDMA_PROTOCOL_ROCE = 1
    RDMA_PROTOCOL_UNKNOWN = -1

    def check_rdma_protocol(self):
        try:
            roce_ena = self.exec_cmd(["cat", "/sys/module/irdma/parameters/roce_ena"]).strip()
            if roce_ena == "0":
                return self.RDMA_PROTOCOL_IWARP
            else:
                return self.RDMA_PROTOCOL_ROCE
        except CalledProcessError as e:
            self.log.error("ERROR: failed to check RDMA protocol!")
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
            return self.RDMA_PROTOCOL_UNKNOWN

    def load_drivers(self):
        self.log.info("Loading drivers")
        self.exec_cmd(["sudo", "modprobe", "-a",
                       "nvme-%s" % self.transport,
                       "nvmet-%s" % self.transport])
        current_mode = self.check_rdma_protocol()
        if current_mode == self.RDMA_PROTOCOL_UNKNOWN:
            self.log.error("ERROR: failed to check RDMA protocol mode")
            return
        if self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_IWARP:
            self.reload_driver("irdma", "roce_ena=1")
            self.log.info("Loaded irdma driver with RoCE enabled")
        elif self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_ROCE:
            self.log.info("Leaving irdma driver with RoCE enabled")
        else:
            self.reload_driver("irdma", "roce_ena=0")
            self.log.info("Loaded irdma driver with iWARP enabled")

    def set_pause_frames(self, rx_state, tx_state):
        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            self.exec_cmd(["sudo", "ethtool", "-A", nic_name, "rx", rx_state, "tx", tx_state])

    def configure_pause_frames(self):
        if not self.pause_frames:
            self.log.info("Turning off pause frames")
            self.set_pause_frames("off", "off")
            return
        self.log.info("Configuring pause frames")
        self.set_pause_frames("on", "on")

    def configure_arfs(self):
        rps_flow_cnt = 512
        if not self.enable_arfs:
            rps_flow_cnt = 0

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            self.exec_cmd(["sudo", "ethtool", "-K", nic_name, "ntuple", "on"])
            self.log.info(f"Setting rps_flow_cnt={rps_flow_cnt} for {nic_name}")
            queue_files = self.exec_cmd(["ls", f"/sys/class/net/{nic_name}/queues/"]).strip().split("\n")
            queue_files = filter(lambda x: x.startswith("rx-"), queue_files)

            for qf in queue_files:
                self.exec_cmd(["sudo", "bash", "-c", f"echo {rps_flow_cnt} > /sys/class/net/{nic_name}/queues/{qf}/rps_flow_cnt"])

    def configure_adq(self):
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        self.log.info("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log.info("%s loaded!" % module)
            except CalledProcessError as e:
                self.log.error("ERROR: failed to load module %s" % module)
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        self.log.info("Configuring ADQ Traffic classes and filters...")

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
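            # subsystem_info_list entries are (port, nqn, ip) tuples; collect the
            # ports of subsystems listening on this NIC's IP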
            nic_ports = [x[0] for x in self.subsystem_info_list if nic_ip == x[2]]

            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log.info(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log.info(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            nic_bdf = os.path.basename(self.exec_cmd(["readlink", "-f", "/sys/class/net/%s/device" % nic_name]).strip())
            self.exec_cmd(["sudo", "devlink", "dev", "param", "set", "pci/%s" % nic_bdf,
                           "name", "tc1_inline_fd", "value", "true", "cmode", "runtime"])

            for port in nic_ports:
                tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                                 "protocol", "ip", "ingress", "prio", "1", "flower",
                                 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                                 "skip_sw", "hw_tc", "1"]
                self.log.info(" ".join(tc_filter_cmd))
                self.exec_cmd(tc_filter_cmd)

            # Show tc configuration
            self.log.info("Show tc configuration for %s NIC..." % nic_name)
            tc_qdisc_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log.info("%s" % tc_qdisc_out)
            self.log.info("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log.info(xps_cmd)
            self.exec_cmd(xps_cmd)

    def reload_driver(self, driver, *modprobe_args):
        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver, *modprobe_args])
        except CalledProcessError as e:
            self.log.error("ERROR: failed to reload %s module!" % driver)
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_nic(self):
        self.log.info("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                nic_bdf = self.exec_cmd(["bash", "-c", "source /sys/class/net/%s/device/uevent; echo $PCI_SLOT_NAME" % nic])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-inspect-optimize", "off"])
                # The following flags are suggested in the ADQ Configuration Guide for large
                # block sizes, but can be used with 4k block sizes as well
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop", "on"])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop-cfg", "on"])
            except subprocess.CalledProcessError as e:
                self.log.error("ERROR: failed to configure NIC port using ethtool!")
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
                self.log.info("Please update your NIC driver and firmware versions and try again.")
            self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        self.log.info("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log.info("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log.info("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        self.log.info("Tuning sysctl settings...")

        busy_read = 0
        busy_poll = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1
            busy_poll = 1

        sysctl_opts = {
            "net.core.busy_poll": busy_poll,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
            "net.core.rps_sock_flow_entries": 0
        }

        if self.enable_adq:
            sysctl_opts.update(self.adq_set_busy_read(1))

        if self.enable_arfs:
            sysctl_opts.update({"net.core.rps_sock_flow_entries": 32768})

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        if not self.tuned_profile:
            self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip())

    def configure_cpu_governor(self):
        self.log.info("Setting CPU governor to performance...")

        # This assumes that all CPUs use the same scaling governor
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def get_core_list_from_mask(self, core_mask):
        # Generate a list of individual cores from a hex core mask
        # (e.g. '0xffff') or a list containing:
        # - individual cores (e.g. '1, 2, 3')
        # - core ranges (e.g. '0-3')
        # - a mix of both (e.g. '0, 1-4, 9, 11-13')
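        # e.g. get_core_list_from_mask("0x0f")     -> [0, 1, 2, 3]
        #      get_core_list_from_mask("[0, 2-4]") -> [0, 2, 3, 4]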

        core_list = []
        if "0x" in core_mask:
            core_mask_int = int(core_mask, 16)
            for i in range(core_mask_int.bit_length()):
                if (1 << i) & core_mask_int:
                    core_list.append(i)
            return core_list
        else:
            # Core list can be provided in .json config with square brackets;
            # remove them first
            core_mask = core_mask.replace("[", "").replace("]", "")

            for i in core_mask.split(","):
                if "-" in i:
                    start, end = i.split("-")
                    core_list.extend(range(int(start), int(end) + 1))
                else:
                    core_list.append(int(i))
            return core_list

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
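        # Supported modes:
        #   default - let the vendor's set_irq_affinity.sh script spread IRQs over all CPUs
        #   bynode  - use only CPUs local to the NIC's NUMA node
        #   cpulist - use the provided CPU list, or everything except it when
        #             exclude_cpulist is True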
        self.log.info("Setting IRQ affinity for NICs, using %s mode" % mode)

        if mode not in ["default", "bynode", "cpulist"]:
            raise ValueError("%s irq affinity setting not supported" % mode)

        if mode == "cpulist" and not cpulist:
            raise ValueError("%s irq affinity setting set, but no cpulist provided" % mode)

        affinity_script = "set_irq_affinity.sh"
        if mode != "default":
            affinity_script = "set_irq_affinity_cpulist.sh"
            system_cpu_map = self.get_numa_cpu_map()
        irq_script_path = os.path.join(self.irq_scripts_dir, affinity_script)

        def cpu_list_to_string(cpulist):
            return ",".join(map(str, cpulist))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            irq_cmd = ["sudo", irq_script_path]

            # Use only CPU cores matching NIC NUMA node.
            # Remove any CPU cores if they're on exclusion list.
            if mode == "bynode":
                irq_cpus = system_cpu_map[self.get_nic_numa_node(nic_name)]
                if cpulist and exclude_cpulist:
                    disallowed_cpus = self.get_core_list_from_mask(cpulist)
                    irq_cpus = list(set(irq_cpus) - set(disallowed_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            if mode == "cpulist":
                irq_cpus = self.get_core_list_from_mask(cpulist)
                if exclude_cpulist:
                    # Flatten system CPU list, we don't need NUMA awareness here
                    system_cpu_list = sorted({x for v in system_cpu_map.values() for x in v})
                    irq_cpus = list(set(system_cpu_list) - set(irq_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            irq_cmd.append(nic_name)
            self.log.info(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)


    def restore_services(self):
        self.log.info("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        self.log.info("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        self.log.info("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log.info("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        self.log.info("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log.info("Reverted CPU governor to %s." % self.governor_restore)

    def restore_settings(self):
        self.restore_governor()
        self.restore_tuned()
        self.restore_services()
        self.restore_sysctl()
        if self.enable_adq:
            self.reload_driver("ice")


class Target(Server):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Defaults
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []
        self.initiator_info = []

        config_fields = [
            ConfigField(name='mode', required=True),
            ConfigField(name='results_dir', required=True),
            ConfigField(name='enable_pm', default=True),
            ConfigField(name='enable_sar', default=True),
            ConfigField(name='enable_pcm', default=True),
            ConfigField(name='enable_dpdk_memory', default=True)
        ]
        self.read_config(config_fields, target_config)

        self.null_block = target_config.get('null_block_devices', 0)
        self.scheduler_name = target_config.get('scheduler_settings', 'static')
        self.enable_zcopy = target_config.get('zcopy_settings', False)
        self.enable_bw = target_config.get('enable_bandwidth', True)
        self.nvme_blocklist = target_config.get('blocklist', [])
        self.nvme_allowlist = target_config.get('allowlist', [])

        # Blocklist takes precedence; remove common elements from the allowlist
        self.nvme_allowlist = list(set(self.nvme_allowlist) - set(self.nvme_blocklist))

        self.log.info("Items now on allowlist: %s" % self.nvme_allowlist)
        self.log.info("Items now on blocklist: %s" % self.nvme_blocklist)

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        if self.skip_spdk_install is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
            self.configure_irq_affinity()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log.info("Changing directory to %s" % change_dir)

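        # check_output comes from the "from common import *" wildcard import above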
        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")

        if change_dir:
            os.chdir(old_cwd)
            self.log.info("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        self.log.info("Zipping SPDK source directory")
        with zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) as fh:
            for root, _directories, files in os.walk(spdk_dir, followlinks=True):
                for file in files:
                    fh.write(os.path.relpath(os.path.join(root, file)))
        self.log.info("Done zipping")

    @staticmethod
    def _chunks(input_list, chunks_no):
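        # Split input_list into chunks_no contiguous chunks whose sizes differ by
        # at most one, e.g. _chunks(list(range(7)), 3) -> [0, 1, 2], [3, 4], [5, 6]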
        div, rem = divmod(len(input_list), chunks_no)
        for i in range(chunks_no):
            si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem)
            yield input_list[si:si + (div + 1 if i < rem else div)]

    def spread_bdevs(self, req_disks):
        # Spread available block device indexes:
        # - evenly across available initiator systems
        # - evenly across available NIC interfaces for
        #   each initiator
        # Not NUMA aware.
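        # e.g. 8 disks and 2 initiators with 2 target NIC IPs each (hypothetical
        # addresses) yield:
        # [(ip0, 0), (ip0, 1), (ip1, 2), (ip1, 3), (ip2, 4), (ip2, 5), (ip3, 6), (ip3, 7)]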
        ip_bdev_map = []
        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))

        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
            self.initiator_info[i]["bdev_range"] = init_chunk
            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
                for c in nic_chunk:
                    ip_bdev_map.append((ip, c))
        return ip_bdev_map

    def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time):
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))

        self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Starting SAR measurements")

        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % 1, "%s" % run_time])
        # sar_output_file already includes results_dir; don't join them again
        with open(sar_output_file, "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log.info("Summary CPU utilization from SAR:")
                        self.log.info(line)
                    elif "all" in line:
                        self.log.info(line)
                    else:
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
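        # Host-wide CPU utilization [%]: cpu_number cores at up to 100% each,
        # minus the summed per-core %idle reported by sar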
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum

        with open(sar_cpu_util_file, "w") as f:
            f.write("%0.2f" % sar_cpu_usage)

    def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time):
        time.sleep(ramp_time)
        self.log.info("Starting power measurements")
        self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir,
                       "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix,
                       "-x", "-c", "%s" % run_time, "-t", "%s" % 1, "-r"])

    def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        cmd = ["pcm", "1", "-i=%s" % run_time,
               "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name, ramp_time, run_time):
        self.log.info("Waiting %d seconds for ramp-up to finish before measuring bandwidth stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Starting network bandwidth measurements")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", "%s" % run_time])
        # TODO: The above command produces a .csv file containing measurements for all gathered samples.
        #       Improve this so that an additional file containing the measurements' average is generated too.
        # TODO: Monitor only those interfaces which are currently used to run the workload.

    def measure_dpdk_memory(self, results_dir, dump_file_name, ramp_time):
        self.log.info("Waiting to generate DPDK memory usage")
        time.sleep(ramp_time)
        self.log.info("Generating DPDK memory usage")
        tmp_dump_file = rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)["filename"]
        os.rename(tmp_dump_file, "%s/%s" % (results_dir, dump_file_name))

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(os.uname().release)
        self.log.info("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log.info("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log.info("====zcopy settings:====")
        self.log.info("zcopy enabled: %s" % (self.enable_zcopy))
        self.log.info("====Scheduler settings:====")
        self.log.info("SPDK scheduler: %s" % (self.scheduler_name))


class Initiator(Server):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Required fields and defaults
        config_fields = [
            ConfigField(name='mode', required=True),
            ConfigField(name='ip', required=True),
            ConfigField(name='target_nic_ips', required=True),
            ConfigField(name='cpus_allowed', default=None),
            ConfigField(name='cpus_allowed_policy', default='shared'),
            ConfigField(name='spdk_dir', default='/tmp/spdk'),
            ConfigField(name='fio_bin', default='/usr/src/fio/fio'),
            ConfigField(name='nvmecli_bin', default='nvme'),
            ConfigField(name='cpu_frequency', default=None),
            ConfigField(name='allow_cpu_sharing', default=True)
        ]

        self.read_config(config_fields, initiator_config)

        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.subsystem_info_list = []

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if self.skip_spdk_install is False:
            self.copy_spdk("/tmp/spdk.zip")

        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        self.restore_settings()
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it again to prevent
        # expansion when sending to the remote system.
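        # e.g. ["sudo", "sysctl", "-w", "net.ipv4.tcp_mem=268435456 268435456 268435456"]
        # is sent as: sudo sysctl -w "net.ipv4.tcp_mem=268435456 268435456 268435456"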
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout by using the get_pty option, if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        self.log.info("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log.info("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log.info("Sources unpacked")

    def copy_result_files(self, dest_dir):
        self.log.info("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log.info("Done copying results")

    def match_subsystems(self, target_subsystems):
        subsystems = [subsystem for subsystem in target_subsystems if subsystem[2] in self.target_nic_ips]
        subsystems.sort(key=lambda x: x[1])
        self.log.info("Found matching subsystems on target side:")
        for s in subsystems:
            self.log.info(s)
        self.subsystem_info_list = subsystems

    @abstractmethod
    def init_connect(self):
        pass

    @abstractmethod
    def init_disconnect(self):
        pass

    @abstractmethod
    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def get_route_nic_numa(self, remote_nvme_ip):
        local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip]))
        local_nvme_nic = local_nvme_nic[0]["dev"]
        return self.get_nic_numa_node(local_nvme_nic)

    @staticmethod
    def gen_fio_offset_section(offset_inc, num_jobs):
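        # e.g. offset_inc=0, num_jobs=4 -> size=25%, offset=0%, offset_increment=25%,
        # so each fio job clone works on its own quarter of the device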
        offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc
        return "\n".join(["size=%s%%" % offset_inc,
                          "offset=0%",
                          "offset_increment=%s%%" % offset_inc])

    def gen_fio_numa_section(self, fio_filenames_list, num_jobs):
        numa_stats = {}
        allowed_cpus = []
        for nvme in fio_filenames_list:
            nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme))
            numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1

        # Use the most common NUMA node for this chunk to allocate memory and CPUs
        section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0]

        # Check if we have enough free CPUs to pop from the list before assigning them
        if len(self.available_cpus[section_local_numa]) < num_jobs:
            if self.allow_cpu_sharing:
                self.log.info("Regenerating available CPU list %s" % section_local_numa)
                # Remove still available CPUs from the regenerated list. We don't want to
                # regenerate it with duplicates.
                cpus_regen = set(self.get_numa_cpu_map()[section_local_numa]) - set(self.available_cpus[section_local_numa])
                self.available_cpus[section_local_numa].extend(cpus_regen)
                self.log.info(self.available_cpus[section_local_numa])
            else:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise IndexError

        for _ in range(num_jobs):
            try:
                allowed_cpus.append(str(self.available_cpus[section_local_numa].pop(0)))
            except IndexError:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise

        return "\n".join(["cpus_allowed=%s" % ",".join(allowed_cpus),
                          "numa_mem_policy=prefer:%s" % section_local_numa])

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no,
                       num_jobs=None, ramp_time=0, run_time=10, rate_iops=0,
                       offset=False, offset_inc=0):
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""

        if self.cpus_allowed is not None:
            self.log.info("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    # Ranges like "0-3" are inclusive on both ends, i.e. 4 CPUs
                    cpus_num += len(range(a, b + 1))
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            self.num_cores = len(self.subsystem_info_list)
            threads = range(0, len(self.subsystem_info_list))

        filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs,
                                                      offset, offset_inc)

        fio_config = fio_conf_template.format(ioengine=self.ioengine, spdk_conf=self.spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, init_connect() function
        if "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log.info("Created FIO Config:")
        self.log.info(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit(1)
        else:
            self.log.warning("WARNING: you have disabled intel_pstate and are using the default CPU governor.")

    def run_fio(self, fio_config_file, run_num=1):
        job_name, _ = os.path.splitext(fio_config_file)
        self.log.info("Starting FIO run for job: %s" % job_name)
        self.log.info("Using FIO: %s" % self.fio_bin)

        output_filename = job_name + "_run_" + str(run_num) + "_" + self.name + ".json"
        try:
            output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename, "--eta=never"], True)
            self.log.info(output)
            self.log.info("FIO run finished. Results in: %s" % output_filename)
        except subprocess.CalledProcessError as e:
            self.log.error("ERROR: Fio process failed!")
            self.log.error(e.stdout)

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(self.exec_cmd(["uname", "-r"]))
        self.log.info("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log.info("====sysctl conf:====")
        sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"])
        self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))


class KernelTarget(Target):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = target_config.get('nvmet_bin', 'nvmetcli')

    def load_drivers(self):
        self.log.info("Loading drivers")
        super().load_drivers()
        if self.null_block:
            self.exec_cmd(["sudo", "modprobe", "null_blk", "nr_devices=%s" % self.null_block])

    def configure_adq(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_configure_tc(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_set_busy_read(self, busy_read_val):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
        return {"net.core.busy_read": 0}

    def stop(self):
        self.nvmet_command(self.nvmet_bin, "clear")
        self.restore_settings()

    def get_nvme_device_bdf(self, nvme_dev_path):
        nvme_name = os.path.basename(nvme_dev_path)
        return self.exec_cmd(["cat", "/sys/block/%s/device/address" % nvme_name]).strip()

    def get_nvme_devices(self):
        dev_list = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]).split("\n")
        nvme_list = []
        for dev in dev_list:
            if "nvme" not in dev:
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                nvme_list.append(dev)
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_allowlist:
                nvme_list.append(dev)
        return nvme_list

    def nvmet_command(self, nvmet_bin, command):
        return self.exec_cmd([nvmet_bin, *(command.split(" "))])

    def kernel_tgt_gen_subsystem_conf(self, nvme_list):
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

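        # One kernel NVMeOF subsystem per bdev; port, NQN and serial all derive
        # from the bdev index, with service ids starting at 4420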
        for ip, bdev_num in self.spread_bdevs(len(nvme_list)):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = nvme_list[bdev_num]

            nvmet_cfg["subsystems"].append({
                "allowed_hosts": [],
                "attr": {
                    "allow_any_host": "1",
                    "serial": serial,
                    "version": "1.3"
                },
                "namespaces": [
                    {
                        "device": {
                            "path": bdev_name,
                            "uuid": "%s" % uuid.uuid4()
                        },
                        "enable": 1,
                        "nsid": port
                    }
                ],
                "nqn": nqn
            })

            nvmet_cfg["ports"].append({
                "addr": {
                    "adrfam": "ipv4",
                    "traddr": ip,
                    "trsvcid": port,
                    "trtype": self.transport
                },
                "portid": bdev_num,
                "referrals": [],
                "subsystems": [nqn]
            })

            self.subsystem_info_list.append((port, nqn, ip))
        self.subsys_no = len(self.subsystem_info_list)

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        self.log.info("Configuring kernel NVMeOF Target")

        if self.null_block:
            self.log.info("Configuring with null block device.")
            nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
        else:
            self.log.info("Configuring with NVMe drives.")
            nvme_list = self.get_nvme_devices()

        self.kernel_tgt_gen_subsystem_conf(nvme_list)
        self.subsys_no = len(nvme_list)

        self.nvmet_command(self.nvmet_bin, "clear")
        self.nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log.info("Done configuring kernel NVMeOF Target")


class SPDKTarget(Target):
    def __init__(self, name, general_config, target_config):
        # IRQ affinity on the SPDK Target side takes the Target's core mask into consideration.
        # The method setting IRQ affinity runs as part of the parent class's __init__,
        # so self.core_mask must be set before IRQ affinity is changed.
        self.core_mask = target_config["core_mask"]
        self.num_cores = len(self.get_core_list_from_mask(self.core_mask))

        super().__init__(name, general_config, target_config)

        # Optional fields and their default values
        config_fields = [
            ConfigField(name='dif_insert_strip', default=False),
            ConfigField(name='null_block_dif_type', default=0),
            ConfigField(name='num_shared_buffers', default=4096),
            ConfigField(name='max_queue_depth', default=128),
            ConfigField(name='bpf_scripts', default=[]),
            ConfigField(name='scheduler_core_limit', default=None),
            ConfigField(name='dsa_settings', default=False),
            ConfigField(name='iobuf_small_pool_count', default=32767),
            ConfigField(name='iobuf_large_pool_count', default=16383),
            ConfigField(name='num_cqe', default=4096),
            ConfigField(name='sock_impl', default='posix')
        ]

        self.read_config(config_fields, target_config)

        self.bpf_proc = None
        self.enable_dsa = False

        self.log.info("====DSA settings:====")
        self.log.info("DSA enabled: %s" % (self.enable_dsa))

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
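        # On top of the base Server modes, the SPDK Target also accepts:
        #   shared       - pin NIC IRQs to the cores in SPDK's core mask
        #   split        - pin NIC IRQs to all cores outside SPDK's core mask
        #   split-bynode - NUMA-local cores, excluding SPDK's core mask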
        if mode not in ["default", "bynode", "cpulist",
                        "shared", "split", "split-bynode"]:
            self.log.error("%s irq affinity setting not supported" % mode)
            raise Exception("%s irq affinity setting not supported" % mode)

        # Create a core list from SPDK's core mask and convert it to a string.
        # This is the type configure_irq_affinity expects for its cpulist parameter.
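        # e.g. core_mask "0x0F" -> "[0,1,2,3]"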
        spdk_tgt_core_list = self.get_core_list_from_mask(self.core_mask)
        spdk_tgt_core_list = ",".join(map(str, spdk_tgt_core_list))
        spdk_tgt_core_list = "[" + spdk_tgt_core_list + "]"

        if mode == "shared":
            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list)
        elif mode == "split":
            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
        elif mode == "split-bynode":
            super().configure_irq_affinity(mode="bynode", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
        else:
            super().configure_irq_affinity(mode=mode, cpulist=cpulist, exclude_cpulist=exclude_cpulist)

    def adq_set_busy_read(self, busy_read_val):
        return {"net.core.busy_read": busy_read_val}

    def get_nvme_devices_count(self):
        return len(self.get_nvme_devices())

    def get_nvme_devices(self):
        bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")]))
        bdev_bdfs = []
        for bdev in bdev_subsys_json_obj["config"]:
            bdev_traddr = bdev["params"]["traddr"]
            if bdev_traddr in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                bdev_bdfs.append(bdev_traddr)
                continue
            if bdev_traddr in self.nvme_allowlist:
                bdev_bdfs.append(bdev_traddr)
        return bdev_bdfs

    def spdk_tgt_configure(self):
        self.log.info("Configuring SPDK NVMeOF target via RPC")

        # Create transport layer
        nvmf_transport_params = {
            "client": self.client,
            "trtype": self.transport,
            "num_shared_buffers": self.num_shared_buffers,
            "max_queue_depth": self.max_queue_depth,
            "dif_insert_or_strip": self.dif_insert_strip,
            "sock_priority": self.adq_priority,
            "num_cqe": self.num_cqe
        }

        if self.enable_adq:
            nvmf_transport_params["acceptor_poll_rate"] = 10000

        rpc.nvmf.nvmf_create_transport(**nvmf_transport_params)
        self.log.info("SPDK NVMeOF transport layer:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

        if self.enable_adq:
            self.adq_configure_tc()

        self.log.info("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self, null_block_count):
        md_size = 0
        block_size = 4096
        if self.null_block_dif_type != 0:
            md_size = 128

        self.log.info("Adding null block bdevs to config via RPC")
        for i in range(null_block_count):
            self.log.info("Setting bdev protection to: %s" % self.null_block_dif_type)
            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
                                      dif_type=self.null_block_dif_type, md_size=md_size)
        self.log.info("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1240
1241    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
1242        self.log.info("Adding NVMe bdevs to config via RPC")
1243
1244        bdfs = self.get_nvme_devices()
1245        bdfs = [b.replace(":", ".") for b in bdfs]
1246
1247        if req_num_disks:
1248            if req_num_disks > len(bdfs):
1249                self.log.error("ERROR: Requested number of disks is more than the %s available" % len(bdfs))
1250                sys.exit(1)
1251            else:
1252                bdfs = bdfs[0:req_num_disks]
1253
1254        for i, bdf in enumerate(bdfs):
1255            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)
1256
1257        self.log.info("SPDK Bdevs configuration:")
1258        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1259
1260    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
1261        self.log.info("Adding subsystems to config")
1262        if not req_num_disks:
1263            req_num_disks = self.get_nvme_devices_count()
1264
1265        for ip, bdev_num in self.spread_bdevs(req_num_disks):
1266            port = str(4420 + bdev_num)
1267            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
1268            serial = "SPDK00%s" % bdev_num
1269            bdev_name = "Nvme%sn1" % bdev_num
1270
1271            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
1272                                           allow_any_host=True, max_namespaces=8)
1273            rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
1274            for nqn_name in [nqn, "discovery"]:
1275                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
1276                                                     nqn=nqn_name,
1277                                                     trtype=self.transport,
1278                                                     traddr=ip,
1279                                                     trsvcid=port,
1280                                                     adrfam="ipv4")
1281            self.subsystem_info_list.append((port, nqn, ip))
1282        self.subsys_no = len(self.subsystem_info_list)
1283
1284        self.log.info("SPDK NVMeOF subsystem configuration:")
1285        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))
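    # Each subsystem_info_list entry appended above is a (trsvcid, nqn, traddr)
    # tuple, e.g. ("4420", "nqn.2018-09.io.spdk:cnode0", "10.0.0.1") -- the
    # address here is illustrative only.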
1286
1287    def bpf_start(self):
1288        self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts)
1289        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
1290        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
1291        results_path = os.path.join(self.results_dir, "bpf_traces.txt")
1292
1293        with open(self.pid, "r") as fh:
1294            nvmf_pid = fh.readline().strip()
1295
1296        cmd = [bpf_script, nvmf_pid, *bpf_traces]
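        # e.g. cmd == ["<spdk_dir>/scripts/bpftrace.sh", "1234",
        #              "<spdk_dir>/scripts/bpf/nvmf.bt"]
        # (PID and trace script name are illustrative, driven by config)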
1297        self.log.info(cmd)
1298        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})
1299
1300    def tgt_start(self):
1301        if self.null_block:
1302            self.subsys_no = 1
1303        else:
1304            self.subsys_no = self.get_nvme_devices_count()
1305        self.log.info("Starting SPDK NVMeOF Target process")
1306        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
1307        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
1308        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")
1309
1310        with open(self.pid, "w") as fh:
1311            fh.write(str(proc.pid))
1312        self.nvmf_proc = proc
1313        self.log.info("SPDK NVMeOF Target PID=%s" % self.pid)
1314        self.log.info("Waiting for spdk to initialize...")
1315        while True:
1316            if os.path.exists("/var/tmp/spdk.sock"):
1317                break
1318            time.sleep(1)
1319        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")
1320
1321        rpc.sock.sock_set_default_impl(self.client, impl_name=self.sock_impl)
1322        rpc.iobuf.iobuf_set_options(self.client,
1323                                    small_pool_count=self.iobuf_small_pool_count,
1324                                    large_pool_count=self.iobuf_large_pool_count,
1325                                    small_bufsize=None,
1326                                    large_bufsize=None)
1327
1328        if self.enable_zcopy:
1329            rpc.sock.sock_impl_set_options(self.client, impl_name=self.sock_impl,
1330                                           enable_zerocopy_send_server=True)
1331            self.log.info("Target socket options:")
1332            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name=self.sock_impl))
1333
1334        if self.enable_adq:
1335            rpc.sock.sock_impl_set_options(self.client, impl_name=self.sock_impl, enable_placement_id=1)
1336            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
1337                                           nvme_adminq_poll_period_us=100000, retry_count=4)
1338
1339        if self.enable_dsa:
1340            rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None)
1341            self.log.info("Target DSA accel module enabled")
1342
1343        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit)
1344        rpc.framework_start_init(self.client)
1345
1346        if self.bpf_scripts:
1347            self.bpf_start()
1348
1349        self.spdk_tgt_configure()
1350
1351    def stop(self):
1352        if self.bpf_proc:
1353            self.log.info("Stopping BPF Trace script")
1354            self.bpf_proc.terminate()
1355            self.bpf_proc.wait()
1356
1357        if hasattr(self, "nvmf_proc"):
1358            try:
1359                self.nvmf_proc.terminate()
1360                self.nvmf_proc.wait(timeout=30)
1361            except Exception as e:
1362                self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.")
1363                self.log.info(e)
1364                self.nvmf_proc.kill()
1365                self.nvmf_proc.communicate()
1366                # Try to clean up RPC socket files if they were not removed
1367                # because of using 'kill'
1368                try:
1369                    os.remove("/var/tmp/spdk.sock")
1370                    os.remove("/var/tmp/spdk.sock.lock")
1371                except FileNotFoundError:
1372                    pass
1373        self.restore_settings()
1374
1375
1376class KernelInitiator(Initiator):
1377    def __init__(self, name, general_config, initiator_config):
1378        super().__init__(name, general_config, initiator_config)
1379
1380        # Defaults
1381        self.extra_params = initiator_config.get('extra_params', '')
1382
1383        self.ioengine = "libaio"
1384        self.spdk_conf = ""
1385
1386        if "num_cores" in initiator_config:
1387            self.num_cores = initiator_config["num_cores"]
1388
1389        if "kernel_engine" in initiator_config:
1390            self.ioengine = initiator_config["kernel_engine"]
1391            if "io_uring" in self.ioengine:
1392                self.extra_params += ' --nr-poll-queues=8'
1393
1394    def configure_adq(self):
1395        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
1396
1397    def adq_configure_tc(self):
1398        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
1399
1400    def adq_set_busy_read(self, busy_read_val):
1401        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
1402        return {"net.core.busy_read": 0}
1403
1404    def get_connected_nvme_list(self):
1405        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
1406        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
1407                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
1408        return nvme_list
1409
1410    def init_connect(self):
1411        self.log.info("Below connection attempts may result in error messages, this is expected!")
1412        for subsystem in self.subsystem_info_list:
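            # For illustration only: given an example entry ("4420",
            # "nqn.2018-09.io.spdk:cnode0", "10.0.0.1") and transport "tcp",
            # the command below is roughly equivalent to:
            #   sudo nvme connect -t tcp -s 4420 -n nqn.2018-09.io.spdk:cnode0 -a 10.0.0.1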
1413            self.log.info("Trying to connect %s %s %s" % subsystem)
1414            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
1415                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
1416            time.sleep(2)
1417
1418        if "io_uring" in self.ioengine:
1419            self.log.info("Setting block layer settings for io_uring.")
1420
1421            # TODO: io_poll=1 and io_poll_delay=-1 params are not set here, because
1422            #       apparently they cannot be set for already connected subsystems.
1423            #       Attempting to do so results in "error: Invalid argument".
1424            block_sysfs_settings = {
1425                "iostats": "0",
1426                "rq_affinity": "0",
1427                "nomerges": "2"
1428            }
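            # The loop below effectively performs, per connected disk (device
            # name illustrative):
            #   echo 0 > /sys/block/nvme0n1/queue/iostats
            #   echo 0 > /sys/block/nvme0n1/queue/rq_affinity
            #   echo 2 > /sys/block/nvme0n1/queue/nomerges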
1429
1430            for disk in self.get_connected_nvme_list():
1431                sysfs = os.path.join("/sys/block", disk, "queue")
1432                for k, v in block_sysfs_settings.items():
1433                    sysfs_opt_path = os.path.join(sysfs, k)
1434                    try:
1435                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
1436                    except CalledProcessError as e:
1437                        self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
1438                    finally:
1439                        current_value = self.exec_cmd(["sudo", "cat", sysfs_opt_path])
1440                        self.log.info("%s=%s" % (sysfs_opt_path, current_value))
1441
1442    def init_disconnect(self):
1443        for subsystem in self.subsystem_info_list:
1444            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
1445            time.sleep(1)
1446
1447    def get_nvme_subsystem_numa(self, dev_name):
1448        # Drop the last two characters ("n1") to get the controller name from the namespace device name
1449        nvme_ctrl = os.path.basename(dev_name)[:-2]
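        # The controller's address file is assumed to look roughly like
        # "traddr=10.0.0.1,trsvcid=4420" (values illustrative); the regex
        # below extracts the IPv4 address from it.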
1450        remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
1451                                   self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl]))
1452        return self.get_route_nic_numa(remote_nvme_ip.group(0))
1453
1454    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
1455        self.available_cpus = self.get_numa_cpu_map()
1456        if len(threads) >= len(subsystems):
1457            threads = range(0, len(subsystems))
1458
1459        # Generate the connected NVMe device names and sort them by the NUMA node
1460        # of the NIC in use to allow better grouping when splitting into fio sections.
1461        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
1462        nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list]
1463        nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))]
1464
1465        filename_section = ""
1466        nvme_per_split = len(nvme_list) // len(threads)
1467        remainder = len(nvme_list) % len(threads)
1468        iterator = iter(nvme_list)
1469        result = []
1470        for i in range(len(threads)):
1471            result.append([])
1472            for _ in range(nvme_per_split):
1473                result[i].append(next(iterator))
1474            if remainder:
1475                result[i].append(next(iterator))
1476                remainder -= 1
1477        for i, r in enumerate(result):
1478            header = "[filename%s]" % i
1479            disks = "\n".join(["filename=%s" % x for x in r])
1480            job_section_qd = round((io_depth * len(r)) / num_jobs)
1481            if job_section_qd == 0:
1482                job_section_qd = 1
1483            iodepth = "iodepth=%s" % job_section_qd
1484
1485            offset_section = ""
1486            if offset:
1487                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)
1488
1489            numa_opts = self.gen_fio_numa_section(r, num_jobs)
1490
1491            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])
1492
1493        return filename_section
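    # Illustrative gen_fio_filename_conf() output for one section with two
    # devices, io_depth=128 and num_jobs=1 (device paths are examples; the
    # numa and offset lines come from the gen_fio_* helpers):
    #   [filename0]
    #   filename=/dev/nvme0n1
    #   filename=/dev/nvme1n1
    #   iodepth=256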
1494
1495
1496class SPDKInitiator(Initiator):
1497    def __init__(self, name, general_config, initiator_config):
1498        super().__init__(name, general_config, initiator_config)
1499
1500        if self.skip_spdk_install is False:
1501            self.install_spdk()
1502
1503        # Optional fields
1504        self.enable_data_digest = initiator_config.get('enable_data_digest', False)
1505        self.small_pool_count = initiator_config.get('small_pool_count', 32768)
1506        self.large_pool_count = initiator_config.get('large_pool_count', 16384)
1507        self.sock_impl = initiator_config.get('sock_impl', 'posix')
1508
1509        if "num_cores" in initiator_config:
1510            self.num_cores = initiator_config["num_cores"]
1511
1512        self.ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
1513        self.spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
1514
1515    def adq_set_busy_read(self, busy_read_val):
1516        return {"net.core.busy_read": busy_read_val}
1517
1518    def install_spdk(self):
1519        self.log.info("Using fio binary %s" % self.fio_bin)
1520        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
1521        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
1522        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma",
1523                       "--with-fio=%s" % os.path.dirname(self.fio_bin),
1524                       "--enable-lto", "--disable-unit-tests"])
1525        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
1526        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])
1527
1528        self.log.info("SPDK built")
1529        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])
1530
1531    def init_connect(self):
1532        # Not a real "connect" like "nvme connect", because SPDK's fio
1533        # bdev plugin initiates the connection just before starting IO traffic.
1534        # This is just an "init_connect" equivalent of the same method
1535        # in the KernelInitiator class.
1536        # Simply prepare the bdev.conf JSON file for later use and consider
1537        # that "making a connection".
1538        bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
1539        self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
1540
1541    def init_disconnect(self):
1542        # The SPDK initiator does not need to explicitly disconnect, as this
1543        # is done automatically after the fio bdev plugin finishes IO.
1544        return
1545
1546    def gen_spdk_bdev_conf(self, remote_subsystem_list):
1547        spdk_cfg_section = {
1548            "subsystems": [
1549                {
1550                    "subsystem": "bdev",
1551                    "config": []
1552                },
1553                {
1554                    "subsystem": "iobuf",
1555                    "config": [
1556                        {
1557                            "method": "iobuf_set_options",
1558                            "params": {
1559                                "small_pool_count": self.small_pool_count,
1560                                "large_pool_count": self.large_pool_count
1561                            }
1562                        }
1563                    ]
1564                },
1565                {
1566                    "subsystem": "sock",
1567                    "config": [
1568                        {
1569                            "method": "sock_set_default_impl",
1570                            "params": {
1571                                "impl_name": self.sock_impl
1572                            }
1573                        }
1574                    ]
1575                }
1576            ]
1577        }
1578
1579        for i, subsys in enumerate(remote_subsystem_list):
1580            sub_port, sub_nqn, sub_addr = map(str, subsys)
1581            nvme_ctrl = {
1582                "method": "bdev_nvme_attach_controller",
1583                "params": {
1584                    "name": "Nvme{}".format(i),
1585                    "trtype": self.transport,
1586                    "traddr": sub_addr,
1587                    "trsvcid": sub_port,
1588                    "subnqn": sub_nqn,
1589                    "adrfam": "IPv4"
1590                }
1591            }
1592
1593            if self.enable_adq:
1594                nvme_ctrl["params"].update({"priority": "1"})
1595
1596            if self.enable_data_digest:
1597                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})
1598
1599            spdk_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)
1600
1601        return json.dumps(spdk_cfg_section, indent=2)
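    # Hedged sketch of one "bdev" config entry emitted above for a subsystem
    # tuple ("4420", "nqn.2018-09.io.spdk:cnode0", "10.0.0.1") over TCP
    # (address illustrative):
    #   {"method": "bdev_nvme_attach_controller",
    #    "params": {"name": "Nvme0", "trtype": "tcp", "traddr": "10.0.0.1",
    #               "trsvcid": "4420", "subnqn": "nqn.2018-09.io.spdk:cnode0",
    #               "adrfam": "IPv4"}}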
1602
1603    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
1604        self.available_cpus = self.get_numa_cpu_map()
1605        filename_section = ""
1606        if len(threads) >= len(subsystems):
1607            threads = range(0, len(subsystems))
1608
1609        # Generate the expected NVMe bdev names and sort them by the NUMA node
1610        # of the NIC in use to allow better grouping when splitting into fio sections.
1611        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
1612        filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames]
1613        filenames = [x for _, x in sorted(zip(filename_numas, filenames))]
1614
1615        nvme_per_split = len(subsystems) // len(threads)
1616        remainder = len(subsystems) % len(threads)
1617        iterator = iter(filenames)
1618        result = []
1619        for i in range(len(threads)):
1620            result.append([])
1621            for _ in range(nvme_per_split):
1622                result[i].append(next(iterator))
1623            if remainder:
1624                result[i].append(next(iterator))
1625                remainder -= 1
1626        for i, r in enumerate(result):
1627            header = "[filename%s]" % i
1628            disks = "\n".join(["filename=%s" % x for x in r])
1629            job_section_qd = round((io_depth * len(r)) / num_jobs)
1630            if job_section_qd == 0:
1631                job_section_qd = 1
1632            iodepth = "iodepth=%s" % job_section_qd
1633
1634            offset_section = ""
1635            if offset:
1636                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)
1637
1638            numa_opts = self.gen_fio_numa_section(r, num_jobs)
1639
1640            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])
1641
1642        return filename_section
1643
1644    def get_nvme_subsystem_numa(self, bdev_name):
1645        bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir]))
1646        bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"]
1647
1648        # Drop the last two characters ("n1") to get the controller name from the bdev name
1649        nvme_ctrl = bdev_name[:-2]
1650        for bdev in bdev_conf_json_obj:
1651            if bdev["method"] == "bdev_nvme_attach_controller" and bdev["params"]["name"] == nvme_ctrl:
1652                return self.get_route_nic_numa(bdev["params"]["traddr"])
1653        return None
1654
1655
1656if __name__ == "__main__":
1657    exit_code = 0
1658
1659    script_full_dir = os.path.dirname(os.path.realpath(__file__))
1660    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))
1661
1662    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
1663    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
1664                        help='Configuration file.')
1665    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
1666                        help='Results directory.')
1667    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
1668                        help='CSV results filename.')
1669    parser.add_argument('-f', '--force', default=False, action='store_true',
1670                        dest='force', help="""Force script to continue and try to use all
1671                        available NVMe devices during test.
1672                        WARNING: Might result in data loss on used NVMe drives""")
1673
1674    args = parser.parse_args()
1675
1676    logging.basicConfig(level=logging.INFO,
1677                        format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s')
1678
1679    logging.info("Using config file: %s" % args.config)
1680    with open(args.config, "r") as config:
1681        data = json.load(config)
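    # Expected top-level layout of the config file, inferred from the parsing
    # below (section contents abbreviated; "initiator" and "fio" keys are
    # matched by substring):
    #   {"general": {...}, "target": {...}, "initiator1": {...}, "fio": {...}}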
1682
1683    initiators = []
1684    fio_cases = []
1685
1686    general_config = data["general"]
1687    target_config = data["target"]
1688    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]
1689
1690    if "null_block_devices" not in data["target"] and \
1691        (args.force is False and
1692            "allowlist" not in data["target"] and
1693            "blocklist" not in data["target"]):
1694        # TODO: Also check if allowlist or blocklist are not empty.
1695        logging.warning("""WARNING: This script requires an allowlist or a blocklist to be defined.
1696        You can choose to use all available NVMe drives on your system, which may potentially
1697        lead to data loss. If you wish to proceed with all attached NVMes, use the "-f" option.""")
1698        exit(1)
1699
1700    for k, v in data.items():
1701        if "target" in k:
1702            v.update({"results_dir": args.results})
1703            if data[k]["mode"] == "spdk":
1704                target_obj = SPDKTarget(k, data["general"], v)
1705            elif data[k]["mode"] == "kernel":
1706                target_obj = KernelTarget(k, data["general"], v)
1707        elif "initiator" in k:
1708            if data[k]["mode"] == "spdk":
1709                init_obj = SPDKInitiator(k, data["general"], v)
1710            elif data[k]["mode"] == "kernel":
1711                init_obj = KernelInitiator(k, data["general"], v)
1712            initiators.append(init_obj)
1713        elif "fio" in k:
1714            fio_workloads = itertools.product(data[k]["bs"],
1715                                              data[k]["qd"],
1716                                              data[k]["rw"])
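            # e.g. bs=["4k"], qd=[32, 128], rw=["randrw"] (values illustrative)
            # expands to the workloads ("4k", 32, "randrw") and ("4k", 128, "randrw")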
1717
1718            fio_run_time = data[k]["run_time"]
1719            fio_ramp_time = data[k]["ramp_time"]
1720            fio_rw_mix_read = data[k]["rwmixread"]
1721            fio_run_num = data[k].get("run_num", 1)
1722            fio_num_jobs = data[k].get("num_jobs")
1723
1724            fio_rate_iops = 0
1725            if "rate_iops" in data[k]:
1726                fio_rate_iops = data[k]["rate_iops"]
1727
1728            fio_offset = False
1729            if "offset" in data[k]:
1730                fio_offset = data[k]["offset"]
1731            fio_offset_inc = 0
1732            if "offset_inc" in data[k]:
1733                fio_offset_inc = data[k]["offset_inc"]
1734        else:
1735            continue
1736
1737    try:
1738        os.mkdir(args.results)
1739    except FileExistsError:
1740        pass
1741
1742    for i in initiators:
1743        target_obj.initiator_info.append(
1744            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
1745        )
1746
1747    # TODO: This try block is definitely too large. Need to break it up into separate
1748    # logical blocks to reduce its size.
1749    try:
1750        target_obj.tgt_start()
1751
1752        for i in initiators:
1753            i.match_subsystems(target_obj.subsystem_info_list)
1754            if i.enable_adq:
1755                i.adq_configure_tc()
1756
1757        # Poor man's threading
1758        # Run FIO tests
1759        for block_size, io_depth, rw in fio_workloads:
1760            configs = []
1761            for i in initiators:
1762                i.init_connect()
1763                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
1764                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops,
1765                                       fio_offset, fio_offset_inc)
1766                configs.append(cfg)
1767
1768            for run_no in range(1, fio_run_num+1):
1769                threads = []
1770                power_daemon = None
1771                measurements_prefix = "%s_%s_%s_m_%s_run_%s" % (block_size, io_depth, rw, fio_rw_mix_read, run_no)
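                # e.g. "4k_128_randrw_m_70_run_1" (values illustrative)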
1772
1773                for i, cfg in zip(initiators, configs):
1774                    t = threading.Thread(target=i.run_fio, args=(cfg, run_no))
1775                    threads.append(t)
1776                if target_obj.enable_sar:
1777                    sar_file_prefix = measurements_prefix + "_sar"
1778                    t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix, fio_ramp_time, fio_run_time))
1779                    threads.append(t)
1780
1781                if target_obj.enable_pcm:
1782                    pcm_fname = "%s_pcm_cpu.csv" % measurements_prefix
1783                    pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm,
1784                                                 args=(args.results, pcm_fname, fio_ramp_time, fio_run_time))
1785                    threads.append(pcm_cpu_t)
1786
1787                if target_obj.enable_bw:
1788                    bandwidth_file_name = measurements_prefix + "_bandwidth.csv"
1789                    t = threading.Thread(target=target_obj.measure_network_bandwidth,
1790                                         args=(args.results, bandwidth_file_name, fio_ramp_time, fio_run_time))
1791                    threads.append(t)
1792
1793                if target_obj.enable_dpdk_memory:
1794                    dpdk_mem_file_name = measurements_prefix + "_dpdk_mem.txt"
1795                    t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results, dpdk_mem_file_name, fio_ramp_time))
1796                    threads.append(t)
1797
1798                if target_obj.enable_pm:
1799                    power_daemon = threading.Thread(target=target_obj.measure_power,
1800                                                    args=(args.results, measurements_prefix, script_full_dir,
1801                                                          fio_ramp_time, fio_run_time))
1802                    threads.append(power_daemon)
1803
1804                for t in threads:
1805                    t.start()
1806                for t in threads:
1807                    t.join()
1808
1809            for i in initiators:
1810                i.init_disconnect()
1811                i.copy_result_files(args.results)
1812        try:
1813            parse_results(args.results, args.csv_filename)
1814        except Exception as err:
1815            exit_code = 1
1816            logging.error("There was an error with parsing the results")
1817            logging.error(err)
1818    finally:
1819        for i in initiators:
1820            try:
1821                i.stop()
1822            except Exception as err:
1823                logging.error("Failed to stop initiator %s: %s" % (i.name, err))
1824        target_obj.stop()
1825        sys.exit(exit_code)
1826