#!/usr/bin/env python3
#  SPDX-License-Identifier: BSD-3-Clause
#  Copyright (C) 2018 Intel Corporation
#  All rights reserved.
#

# Note: we use setattr to create instance attributes dynamically,
# which pylint's no-member check cannot follow.
# pylint: disable=no-member

import os
import re
import sys
import argparse
import json
import inspect
import logging
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid

import paramiko
import pandas as pd
from common import *
from subprocess import check_output, CalledProcessError
from abc import abstractmethod, ABC
from dataclasses import dataclass
from typing import Any

sys.path.append(os.path.dirname(__file__) + '/../../../python')

import spdk.rpc as rpc  # noqa
import spdk.rpc.client as rpc_client  # noqa


@dataclass
class ConfigField:
    name: str = None
    default: Any = None
    required: bool = False


class Server(ABC):

    RDMA_PROTOCOL_IWARP = 0
    RDMA_PROTOCOL_ROCE = 1
    RDMA_PROTOCOL_UNKNOWN = -1

    def __init__(self, name, general_config, server_config):
        self.name = name

        self.log = logging.getLogger(self.name)

        config_fields = [
            ConfigField(name='username', required=True),
            ConfigField(name='password', required=True),
            ConfigField(name='transport', required=True),
            ConfigField(name='skip_spdk_install', default=False),
            ConfigField(name='irdma_roce_enable', default=False),
            ConfigField(name='pause_frames', default=None)
        ]

        self.read_config(config_fields, general_config)
        self.transport = self.transport.lower()

        config_fields = [
            ConfigField(name='nic_ips', required=True),
            ConfigField(name='mode', required=True),
            ConfigField(name='irq_scripts_dir', default='/usr/src/local/mlnx-tools/ofed_scripts'),
            ConfigField(name='enable_arfs', default=False),
            ConfigField(name='tuned_profile', default='')
        ]
        self.read_config(config_fields, server_config)

        self.local_nic_info = []
        self._nics_json_obj = {}
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""

        self.enable_adq = server_config.get('adq_enable', False)
        self.adq_priority = 1 if self.enable_adq else None
        self.irq_settings = {'mode': 'default', **server_config.get('irq_settings', {})}

        if not re.match(r'^[A-Za-z0-9\-]+$', name):
            self.log.error("Please use a name which contains only letters, numbers or dashes")
            sys.exit(1)

    def read_config(self, config_fields, config):
        for config_field in config_fields:
            value = config.get(config_field.name, config_field.default)
            if config_field.required and value is None:
                raise ValueError('%s config key is required' % config_field.name)

            setattr(self, config_field.name, value)
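    # Illustrative example (values assumed, not from the original code): with
    #   config_fields = [ConfigField(name='transport', required=True),
    #                    ConfigField(name='pause_frames', default=None)]
    # and config = {'transport': 'tcp'}, read_config() sets
    # self.transport = 'tcp' and self.pause_frames = None; a missing
    # 'transport' key would raise ValueError.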

    @staticmethod
    def get_uncommented_lines(lines):
        return [line for line in lines if line and not line.startswith('#')]
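    # For example (illustrative): get_uncommented_lines(["x=1", "# note", ""])
    # returns ["x=1"] - blank lines and '#'-prefixed lines are dropped.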

    @staticmethod
    def get_core_list_from_mask(core_mask):
        # Generate list of individual cores from hex core mask
        # (e.g. '0xffff') or list containing:
        # - individual cores (e.g. '1, 2, 3')
        # - core ranges (e.g. '0-3')
        # - mix of both (e.g. '0, 1-4, 9, 11-13')

        core_list = []
        if "0x" in core_mask:
            core_mask_int = int(core_mask, 16)
            for i in range(core_mask_int.bit_length()):
                if (1 << i) & core_mask_int:
                    core_list.append(i)
            return core_list
        else:
            # Core list can be provided in .json config with square brackets
            # remove them first
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")

            for i in core_mask.split(","):
                if "-" in i:
                    start, end = i.split("-")
                    core_range = range(int(start), int(end) + 1)
                    core_list.extend(core_range)
                else:
                    core_list.append(int(i))
            return core_list
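    # Illustrative examples of the accepted formats (values assumed):
    #   get_core_list_from_mask("0x5")      -> [0, 2]
    #   get_core_list_from_mask("0,2-4")    -> [0, 2, 3, 4]
    #   get_core_list_from_mask("[1, 3-5]") -> [1, 3, 4, 5]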

    def get_nic_name_by_ip(self, ip):
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                if ip in addr["local"]:
                    return nic["ifname"]
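    # Illustrative (interface name and address assumed): if `ip -j address show`
    # reports {"ifname": "ens801f0", "addr_info": [{"local": "10.0.0.1", ...}]},
    # then get_nic_name_by_ip("10.0.0.1") returns "ens801f0".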

    @abstractmethod
    def set_local_nic_info_helper(self):
        pass

    def set_local_nic_info(self, pci_info):
        def extract_network_elements(json_obj):
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def get_nic_numa_node(self, nic_name):
        return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name]))

    def get_numa_cpu_map(self):
        numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"]))
        numa_cpu_json_map = {}

        for cpu in numa_cpu_json_obj["cpus"]:
            cpu_num = int(cpu["cpu"])
            numa_node = int(cpu["node"])
            numa_cpu_json_map.setdefault(numa_node, [])
            numa_cpu_json_map[numa_node].append(cpu_num)

        return numa_cpu_json_map
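    # The returned map has the form {numa_node: [cpu, ...]}; e.g. a 2-socket,
    # 8-CPU machine might yield {0: [0, 1, 2, 3], 1: [4, 5, 6, 7]} (illustrative).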

    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        return ""

    def configure_system(self):
        self.load_drivers()
        self.configure_services()
        self.configure_sysctl()
        self.configure_arfs()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity(**self.irq_settings)
        self.configure_pause_frames()

    def check_rdma_protocol(self):
        try:
            roce_ena = self.exec_cmd(["cat", "/sys/module/irdma/parameters/roce_ena"]).strip()
            return self.RDMA_PROTOCOL_IWARP if roce_ena == "0" else self.RDMA_PROTOCOL_ROCE
        except CalledProcessError as e:
            self.log.error("ERROR: failed to check RDMA protocol!")
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
            return self.RDMA_PROTOCOL_UNKNOWN

    def load_drivers(self):
        self.log.info("Loading drivers")
        self.exec_cmd(["sudo", "modprobe", "-a", f"nvme-{self.transport}", f"nvmet-{self.transport}"])

        if self.transport != "rdma":
            return

        current_mode = self.check_rdma_protocol()
        if current_mode == self.RDMA_PROTOCOL_UNKNOWN:
            self.log.error("ERROR: failed to check RDMA protocol mode")
            return

        if self.irdma_roce_enable:
            if current_mode == self.RDMA_PROTOCOL_IWARP:
                self.reload_driver("irdma", "roce_ena=1")
                self.log.info("Loaded irdma driver with RoCE enabled")
            else:
                self.log.info("Leaving irdma driver with RoCE enabled")
        else:
            self.reload_driver("irdma", "roce_ena=0")
            self.log.info("Loaded irdma driver with iWARP enabled")

    def set_pause_frames(self, rx_state, tx_state):
        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            self.exec_cmd(["sudo", "ethtool", "-A", nic_name, "rx", rx_state, "tx", tx_state])

    def configure_pause_frames(self):
        if self.pause_frames is None:
            self.log.info("Keeping NIC's default pause frames setting")
            return
        elif not self.pause_frames:
            self.log.info("Turning off pause frames")
            self.set_pause_frames("off", "off")
        else:
            self.log.info("Turning on pause frames")
            self.set_pause_frames("on", "on")

    def configure_arfs(self):
        rps_flow_cnt = 512 if self.enable_arfs else 0

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            rps_wildcard_path = f"/sys/class/net/{nic_name}/queues/rx-*/rps_flow_cnt"
            self.exec_cmd(["sudo", "ethtool", "-K", nic_name, "ntuple", "on"])
            self.log.info(f"Setting rps_flow_cnt={rps_flow_cnt} for {nic_name} rx queues")

            self.exec_cmd(["bash", "-c", f"echo {rps_flow_cnt} | sudo tee {rps_wildcard_path}"])

            set_values = list(set(self.exec_cmd(["sudo", "bash", "-c", f'cat {rps_wildcard_path}']).strip().split("\n")))
            if len(set_values) > 1:
                self.log.error(f"""Not all NIC {nic_name} queues had rps_flow_cnt set properly. Found rps_flow_cnt values: {set_values}""")
            elif set_values[0] != str(rps_flow_cnt):
                self.log.error(f"""Wrong rps_flow_cnt set on {nic_name} queues. Found {set_values[0]}, should be {rps_flow_cnt}""")
            else:
                self.log.info(f"Configuration of {nic_name} completed with confirmed rps_flow_cnt settings.")

        self.log.info("ARFS configuration completed.")

    def configure_adq(self):
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        self.log.info("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log.info("%s loaded!" % module)
            except CalledProcessError as e:
                self.log.error("ERROR: failed to load module %s" % module)
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        self.log.info("Configuring ADQ Traffic classes and filters...")

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            nic_ports = [x[0] for x in self.subsystem_info_list if nic_ip in x[2]]

            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
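            # Illustrative resulting command, assuming nic_name "ens801f0" and
            # num_cores == 16 (example values, not from this file):
            #   tc qdisc add dev ens801f0 root mqprio num_tc 2 map 0 1 \
            #     queues 2@0 16@2 hw 1 mode channel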
            self.log.info(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log.info(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            nic_bdf = os.path.basename(self.exec_cmd(["readlink", "-f", "/sys/class/net/%s/device" % nic_name]).strip())
            self.exec_cmd(["sudo", "devlink", "dev", "param", "set", "pci/%s" % nic_bdf,
                           "name", "tc1_inline_fd", "value", "true", "cmode", "runtime"])

            for port in nic_ports:
                tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                                 "protocol", "ip", "ingress", "prio", "1", "flower",
                                 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                                 "skip_sw", "hw_tc", "1"]
                self.log.info(" ".join(tc_filter_cmd))
                self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log.info("Show tc configuration for %s NIC..." % nic_name)
            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log.info("%s" % tc_disk_out)
            self.log.info("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log.info(xps_cmd)
            self.exec_cmd(xps_cmd)

    def reload_driver(self, driver, *modprobe_args):

        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver, *modprobe_args])
        except CalledProcessError as e:
            self.log.error("ERROR: failed to reload %s module!" % driver)
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_nic(self):
        self.log.info("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                nic_bdf = self.exec_cmd(["bash", "-c", "source /sys/class/net/%s/device/uevent; echo $PCI_SLOT_NAME" % nic])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-inspect-optimize", "off"])
                # Following are suggested in ADQ Configuration Guide to be enabled for large block sizes,
                # but can be used with 4k block sizes as well
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop", "on"])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop-cfg", "on"])
            except subprocess.CalledProcessError as e:
                self.log.error("ERROR: failed to configure NIC port using ethtool!")
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
                self.log.info("Please update your NIC driver and firmware versions and try again.")
            self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        self.log.info("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log.info("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log.info("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        self.log.info("Tuning sysctl settings...")

        busy_read = 0
        busy_poll = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1
            busy_poll = 1

        sysctl_opts = {
            "net.core.busy_poll": busy_poll,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
            "net.core.rps_sock_flow_entries": 0
        }

        if self.enable_adq:
            sysctl_opts.update(self.adq_set_busy_read(1))

        if self.enable_arfs:
            sysctl_opts.update({"net.core.rps_sock_flow_entries": 32768})

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        if not self.tuned_profile:
            self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        self.log.info("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
        self.log.info("Setting NIC irq affinity for NICs. Using %s mode" % mode)

        if mode not in ["default", "bynode", "cpulist"]:
            raise ValueError("%s irq affinity setting not supported" % mode)

        if mode == "cpulist" and not cpulist:
            raise ValueError("%s irq affinity setting set, but no cpulist provided" % mode)

        affinity_script = "set_irq_affinity.sh"
        if "default" not in mode:
            affinity_script = "set_irq_affinity_cpulist.sh"
            system_cpu_map = self.get_numa_cpu_map()
        irq_script_path = os.path.join(self.irq_scripts_dir, affinity_script)

        def cpu_list_to_string(cpulist):
            return ",".join(map(lambda x: str(x), cpulist))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            irq_cmd = ["sudo", irq_script_path]

            # Use only CPU cores matching NIC NUMA node.
            # Remove any CPU cores if they're on exclusion list.
            if mode == "bynode":
                irq_cpus = system_cpu_map[self.get_nic_numa_node(nic_name)]
                if cpulist and exclude_cpulist:
                    disallowed_cpus = self.get_core_list_from_mask(cpulist)
                    irq_cpus = list(set(irq_cpus) - set(disallowed_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            if mode == "cpulist":
                irq_cpus = self.get_core_list_from_mask(cpulist)
                if exclude_cpulist:
                    # Flatten system CPU list, we don't need NUMA awareness here
                    system_cpu_list = sorted({x for v in system_cpu_map.values() for x in v})
                    irq_cpus = list(set(system_cpu_list) - set(irq_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            irq_cmd.append(nic_name)
            self.log.info(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        self.log.info("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        self.log.info("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        self.log.info("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log.info("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        self.log.info("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log.info("Reverted CPU governor to %s." % self.governor_restore)

    def restore_settings(self):
        self.restore_governor()
        self.restore_tuned()
        self.restore_services()
        self.restore_sysctl()
        if self.enable_adq:
            self.reload_driver("ice")

    def check_for_throttling(self):
        patterns = [
            r"CPU\d+: Core temperature is above threshold, cpu clock is throttled",
            r"mlx5_core \S+: poll_health:\d+:\(pid \d+\): device's health compromised",
            r"mlx5_core \S+: print_health_info:\d+:\(pid \d+\): Health issue observed, High temperature",
            r"mlx5_core \S+: temp_warn:\d+:\(pid \d+\): High temperature on sensors"
        ]

        issue_found = False
        try:
            output = self.exec_cmd(["dmesg"])

            for line in output.split("\n"):
                for pattern in patterns:
                    if re.search(pattern, line):
                        self.log.warning("Throttling or temperature issue found")
                        issue_found = True
            return 1 if issue_found else 0
        except Exception as e:
            self.log.error("ERROR: failed to execute dmesg command")
            self.log.error(f"Error {e}")
            return 1

    def stop(self):
        if self.check_for_throttling():
            err_message = (f"Throttling or temperature issue found on {self.name} system! "
                           "Check system logs!")
            raise CpuThrottlingError(err_message)


class Target(Server):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Defaults
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []
        self.initiator_info = []

        config_fields = [
            ConfigField(name='mode', required=True),
            ConfigField(name='results_dir', required=True),
            ConfigField(name='enable_pm', default=True),
            ConfigField(name='enable_sar', default=True),
            ConfigField(name='enable_pcm', default=True),
            ConfigField(name='enable_dpdk_memory', default=True)
        ]
        self.read_config(config_fields, target_config)

        self.null_block = target_config.get('null_block_devices', 0)
        self.scheduler_name = target_config.get('scheduler_settings', 'static')
        self.enable_zcopy = target_config.get('zcopy_settings', False)
        self.enable_bw = target_config.get('enable_bandwidth', True)
        self.nvme_blocklist = target_config.get('blocklist', [])
        self.nvme_allowlist = target_config.get('allowlist', [])

        # Blocklist takes precedence, remove common elements from allowlist
        self.nvme_allowlist = list(set(self.nvme_allowlist) - set(self.nvme_blocklist))

        self.log.info("Items now on allowlist: %s" % self.nvme_allowlist)
        self.log.info("Items now on blocklist: %s" % self.nvme_blocklist)

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        if not self.skip_spdk_install:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
            self.configure_irq_affinity()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log.info("Changing directory to %s" % change_dir)

        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")

        if change_dir:
            os.chdir(old_cwd)
            self.log.info("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        self.log.info("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, _directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log.info("Done zipping")

    @staticmethod
    def _chunks(input_list, chunks_no):
        div, rem = divmod(len(input_list), chunks_no)
        for i in range(chunks_no):
            si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem)
            yield input_list[si:si + (div + 1 if i < rem else div)]
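    # Illustrative: _chunks(list(range(10)), 3) yields [0, 1, 2, 3], [4, 5, 6]
    # and [7, 8, 9] - the remainder is spread over the earliest chunks.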

    def spread_bdevs(self, req_disks):
        # Spread available block device indexes:
        # - evenly across available initiator systems
        # - evenly across available NIC interfaces for
        #   each initiator
        # Not NUMA aware.
        ip_bdev_map = []
        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))

        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
            self.initiator_info[i]["bdev_range"] = init_chunk
            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
                for c in nic_chunk:
                    ip_bdev_map.append((ip, c))
        return ip_bdev_map
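    # Illustrative (IPs assumed): with req_disks=8 and two initiators, each
    # exposing two target NIC IPs, the first initiator gets bdevs 0-3 and the
    # second gets 4-7; within an initiator the bdevs are split evenly between
    # its IPs, e.g. [(ip_a, 0), (ip_a, 1), (ip_b, 2), (ip_b, 3), (ip_c, 4), ...].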

    def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time):
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))

        self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Starting SAR measurements")

        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % 1, "%s" % run_time])
        with open(sar_output_file, "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log.info("Summary CPU utilization from SAR:")
                        self.log.info(line)
                    elif "all" in line:
                        self.log.info(line)
                    else:
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
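        # Illustrative: on an 8-CPU system where the per-CPU "Average" %idle
        # values sum to 650, the result is 8 * 100 - 650 = 150, i.e. the
        # equivalent of 1.5 fully busy cores.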
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum

        with open(sar_cpu_util_file, "w") as f:
            f.write("%0.2f" % sar_cpu_usage)

    def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time):
        time.sleep(ramp_time)
        self.log.info("Starting power measurements")
        self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir,
                      "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix,
                       "-x", "-c", "%s" % run_time, "-t", "%s" % 1, "-r"])

    def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        cmd = ["pcm", "1", "-i=%s" % run_time,
               "-csv=%s/%s" % (results_dir, pcm_file_name)]
        out = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

        if out.returncode:
            self.log.warning("PCM Power measurement finished with a non-zero return code.")
            self.log.warning(out.stdout.decode(encoding="utf-8"))

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name, ramp_time, run_time):
        self.log.info("Waiting %d seconds for ramp-up to finish before measuring bandwidth stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Starting network bandwidth measurements")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", "%s" % run_time])
        # TODO: The above command produces a .csv file containing measurements for all gathered samples.
        #       Improve this so that an additional file containing the measurements' average is generated too.
        # TODO: Monitor only those interfaces which are currently used to run the workload.

    def measure_dpdk_memory(self, results_dir, dump_file_name, ramp_time):
        self.log.info("Waiting %d seconds before generating DPDK memory usage dump" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Generating DPDK memory usage dump")
        tmp_dump_file = rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)["filename"]
        os.rename(tmp_dump_file, "%s/%s" % (results_dir, dump_file_name))

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(os.uname().release)
        self.log.info("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log.info("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log.info("====zcopy settings:====")
        self.log.info("zcopy enabled: %s" % (self.enable_zcopy))
        self.log.info("====Scheduler settings:====")
        self.log.info("SPDK scheduler: %s" % (self.scheduler_name))


class Initiator(Server):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # required fields and defaults
        config_fields = [
            ConfigField(name='mode', required=True),
            ConfigField(name='ip', required=True),
            ConfigField(name='target_nic_ips', required=True),
            ConfigField(name='cpus_allowed', default=None),
            ConfigField(name='cpus_allowed_policy', default='shared'),
            ConfigField(name='spdk_dir', default='/tmp/spdk'),
            ConfigField(name='fio_bin', default='/usr/src/fio/fio'),
            ConfigField(name='nvmecli_bin', default='nvme'),
            ConfigField(name='cpu_frequency', default=None),
            ConfigField(name='allow_cpu_sharing', default=True)
        ]

        self.read_config(config_fields, initiator_config)

        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.subsystem_info_list = []

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if not self.skip_spdk_install:
            self.copy_spdk("/tmp/spdk.zip")

        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        self.restore_settings()
        try:
            super().stop()
        finally:
            self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it again to prevent
        # expansion when sending to the remote system.
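        # Illustrative (example sysctl value, not from this file):
        #   ["sudo", "sysctl", "-w", "net.ipv4.tcp_rmem=4096 87380 134217728"]
        # becomes: sudo sysctl -w "net.ipv4.tcp_rmem=4096 87380 134217728"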
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout using the get_pty option if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        self.log.info("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log.info("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log.info("Sources unpacked")

    def copy_result_files(self, dest_dir):
        self.log.info("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log.info("Done copying results")

    def match_subsystems(self, target_subsystems):
        subsystems = [subsystem for subsystem in target_subsystems if subsystem[2] in self.target_nic_ips]
        if not subsystems:
            raise Exception("No matching subsystems found on target side!")
        subsystems.sort(key=lambda x: x[1])
        self.log.info("Found matching subsystems on target side:")
        for s in subsystems:
            self.log.info(s)
        self.subsystem_info_list = subsystems

    @abstractmethod
    def init_connect(self):
        pass

    @abstractmethod
    def init_disconnect(self):
        pass

    @abstractmethod
    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def get_route_nic_numa(self, remote_nvme_ip):
        local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip]))
        local_nvme_nic = local_nvme_nic[0]["dev"]
        return self.get_nic_numa_node(local_nvme_nic)

    @staticmethod
    def gen_fio_offset_section(offset_inc, num_jobs):
        offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc
        return "\n".join(["size=%s%%" % offset_inc,
                          "offset=0%",
                          "offset_increment=%s%%" % offset_inc])
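    # Illustrative: gen_fio_offset_section(0, 4) returns
    #   size=25%
    #   offset=0%
    #   offset_increment=25%
    # since offset_inc defaults to 100 // num_jobs when 0 is passed.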

    def gen_fio_numa_section(self, fio_filenames_list, num_jobs):
        numa_stats = {}
        allowed_cpus = []
        for nvme in fio_filenames_list:
            nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme))
            numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1

        # Use the most common NUMA node for this chunk to allocate memory and CPUs
        section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0]

        # Check if we have enough free CPUs to pop from the list before assigning them
        if len(self.available_cpus[section_local_numa]) < num_jobs:
            if self.allow_cpu_sharing:
                self.log.info("Regenerating available CPU list %s" % section_local_numa)
                # Remove still available CPUs from the regenerated list. We don't want to
                # regenerate it with duplicates.
                cpus_regen = set(self.get_numa_cpu_map()[section_local_numa]) - set(self.available_cpus[section_local_numa])
                self.available_cpus[section_local_numa].extend(cpus_regen)
                self.log.info(self.available_cpus[section_local_numa])
            else:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise IndexError

        for _ in range(num_jobs):
            try:
                allowed_cpus.append(str(self.available_cpus[section_local_numa].pop(0)))
            except IndexError:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise

        return "\n".join(["cpus_allowed=%s" % ",".join(allowed_cpus),
                          "numa_mem_policy=prefer:%s" % section_local_numa])

    def gen_fio_config(self, rw, block_size, io_depth, subsys_no, fio_settings):
        fio_conf_template = inspect.cleandoc("""
            [global]
            ioengine={ioengine}
            {spdk_conf}
            thread=1
            group_reporting=1
            direct=1
            percentile_list=50:90:99:99.5:99.9:99.99:99.999

            norandommap=1
            rw={rw}
            rwmixread={rwmixread}
            bs={block_size}
            time_based=1
            ramp_time={ramp_time}
            runtime={run_time}
            rate_iops={rate_iops}
        """)

        if self.cpus_allowed is not None:
            self.log.info("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            self.num_cores = len(self.get_core_list_from_mask(self.cpus_allowed))
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            self.num_cores = len(self.subsystem_info_list)
            threads = range(0, len(self.subsystem_info_list))

        filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, fio_settings["num_jobs"],
                                                      fio_settings["offset"], fio_settings["offset_inc"], fio_settings["numa_align"])

        fio_config = fio_conf_template.format(ioengine=self.ioengine, spdk_conf=self.spdk_conf,
                                              rw=rw, rwmixread=fio_settings["rw_mix_read"], block_size=block_size,
                                              ramp_time=fio_settings["ramp_time"], run_time=fio_settings["run_time"],
                                              rate_iops=fio_settings["rate_iops"])

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, init_connect() function
        if "io_uring" in self.ioengine:
            fio_config = "\n".join([fio_config, "fixedbufs=1", "registerfiles=1", "#hipri=1"])
        if fio_settings["num_jobs"]:
            fio_config = "\n".join([fio_config, f"numjobs={fio_settings['num_jobs']}"])
        if self.cpus_allowed is not None:
            fio_config = "\n".join([fio_config, f"cpus_allowed={self.cpus_allowed}", f"cpus_allowed_policy={self.cpus_allowed_policy}"])
        fio_config = "\n".join([fio_config, filename_section])

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, fio_settings["rw_mix_read"])
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"
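        # Illustrative: a 4k block size, queue depth 128, randrw workload with
        # a 70% read mix on a 16-core initiator yields "4k_128_randrw_m_70_16CPU.fio".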

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log.info("Created FIO Config:")
        self.log.info(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit(1)
        else:
            self.log.warning("WARNING: you have disabled intel_pstate and are using the default CPU governor.")

    def run_fio(self, fio_config_file, run_num=1):
        job_name, _ = os.path.splitext(fio_config_file)
        self.log.info("Starting FIO run for job: %s" % job_name)
        self.log.info("Using FIO: %s" % self.fio_bin)

        output_filename = job_name + "_run_" + str(run_num) + "_" + self.name + ".json"
        try:
            output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename, "--eta=never"], True)
            self.log.info(output)
            self.log.info("FIO run finished. Results in: %s" % output_filename)
        except subprocess.CalledProcessError as e:
            self.log.error("ERROR: Fio process failed!")
            self.log.error(e.stdout)

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(self.exec_cmd(["uname", "-r"]))
        self.log.info("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log.info("====sysctl conf:====")
        sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"])
        self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))


class KernelTarget(Target):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = target_config.get('nvmet_bin', 'nvmetcli')

    def load_drivers(self):
        self.log.info("Loading drivers")
        super().load_drivers()
        if self.null_block:
            self.exec_cmd(["sudo", "modprobe", "null_blk", "nr_devices=%s" % self.null_block])

    def configure_adq(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_configure_tc(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_set_busy_read(self, busy_read_val):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
        return {"net.core.busy_read": 0}

    def stop(self):
        self.nvmet_command(self.nvmet_bin, "clear")
        self.restore_settings()
        super().stop()

    def get_nvme_device_bdf(self, nvme_dev_path):
        nvme_name = os.path.basename(nvme_dev_path)
        return self.exec_cmd(["cat", "/sys/block/%s/device/address" % nvme_name]).strip()

    def get_nvme_devices(self):
        dev_list = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]).split("\n")
        nvme_list = []
        for dev in dev_list:
            if "nvme" not in dev:
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                nvme_list.append(dev)
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_allowlist:
                nvme_list.append(dev)
        return nvme_list

    def nvmet_command(self, nvmet_bin, command):
        return self.exec_cmd([nvmet_bin, *(command.split(" "))])

    def kernel_tgt_gen_subsystem_conf(self, nvme_list):

        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        for ip, bdev_num in self.spread_bdevs(len(nvme_list)):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = nvme_list[bdev_num]

            nvmet_cfg["subsystems"].append({
                "allowed_hosts": [],
                "attr": {
                    "allow_any_host": "1",
                    "serial": serial,
                    "version": "1.3"
                },
                "namespaces": [
                    {
                        "device": {
                            "path": bdev_name,
                            "uuid": "%s" % uuid.uuid4()
                        },
                        "enable": 1,
                        "nsid": port
                    }
                ],
                "nqn": nqn
            })

            nvmet_cfg["ports"].append({
                "addr": {
                    "adrfam": "ipv4",
                    "traddr": ip,
                    "trsvcid": port,
                    "trtype": self.transport
                },
                "portid": bdev_num,
                "referrals": [],
                "subsystems": [nqn]
            })

            self.subsystem_info_list.append((port, nqn, ip))
        self.subsys_no = len(self.subsystem_info_list)

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))
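    # Illustrative fragment of the generated kernel.conf (address and port
    # values assumed): {"ports": [{"addr": {"adrfam": "ipv4", "traddr":
    # "10.0.0.1", "trsvcid": "4420", "trtype": "tcp"}, "portid": 0,
    # "referrals": [], "subsystems": ["nqn.2018-09.io.spdk:cnode0"]}], ...}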

    def tgt_start(self):
        self.log.info("Configuring kernel NVMeOF Target")

        if self.null_block:
            self.log.info("Configuring with null block device.")
            nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
        else:
            self.log.info("Configuring with NVMe drives.")
            nvme_list = self.get_nvme_devices()

        self.kernel_tgt_gen_subsystem_conf(nvme_list)
        self.subsys_no = len(nvme_list)

        self.nvmet_command(self.nvmet_bin, "clear")
        self.nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log.info("Done configuring kernel NVMeOF Target")


class SPDKTarget(Target):
    def __init__(self, name, general_config, target_config):
        # IRQ affinity on the SPDK Target side takes the Target's core mask into consideration.
        # The method setting IRQ affinity runs as part of the parent class's init,
        # so we need to have self.core_mask set before changing IRQ affinity.
        self.core_mask = target_config["core_mask"]
        self.num_cores = len(self.get_core_list_from_mask(self.core_mask))

        super().__init__(name, general_config, target_config)

        # Format: property, default value
        config_fields = [
            ConfigField(name='dif_insert_strip', default=False),
            ConfigField(name='null_block_dif_type', default=0),
            ConfigField(name='num_shared_buffers', default=4096),
            ConfigField(name='max_queue_depth', default=128),
            ConfigField(name='bpf_scripts', default=[]),
            ConfigField(name='scheduler_core_limit', default=None),
            ConfigField(name='dsa_settings', default=False),
            ConfigField(name='iobuf_small_pool_count', default=32767),
            ConfigField(name='iobuf_large_pool_count', default=16383),
            ConfigField(name='num_cqe', default=4096),
            ConfigField(name='sock_impl', default='posix')
        ]

        self.read_config(config_fields, target_config)

        self.bpf_proc = None
        self.enable_dsa = False

        self.log.info("====DSA settings:====")
        self.log.info("DSA enabled: %s" % (self.enable_dsa))

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
        if mode not in ["default", "bynode", "cpulist",
                        "shared", "split", "split-bynode"]:
            raise ValueError("%s irq affinity setting not supported" % mode)

        # Create a core list from SPDK's mask and change it to a string.
        # This is the type configure_irq_affinity expects for its cpulist parameter.
        spdk_tgt_core_list = self.get_core_list_from_mask(self.core_mask)
        spdk_tgt_core_list = ",".join(map(lambda x: str(x), spdk_tgt_core_list))
        spdk_tgt_core_list = "[" + spdk_tgt_core_list + "]"
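        # Illustrative: a core_mask of "0x0F" becomes the string "[0,1,2,3]".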
1203
1204        if mode == "shared":
1205            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list)
1206        elif mode == "split":
1207            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
1208        elif mode == "split-bynode":
1209            super().configure_irq_affinity(mode="bynode", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
1210        else:
1211            super().configure_irq_affinity(mode=mode, cpulist=cpulist, exclude_cpulist=exclude_cpulist)
1212
1213    def adq_set_busy_read(self, busy_read_val):
1214        return {"net.core.busy_read": busy_read_val}
1215
1216    def get_nvme_devices_count(self):
1217        return len(self.get_nvme_devices())
1218
1219    def get_nvme_devices(self):
1220        bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")]))
1221        bdev_bdfs = []
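        # Blocklisted addresses are always skipped; an empty allowlist admits
        # every remaining device, otherwise only allowlisted addresses are kept.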
1222        for bdev in bdev_subsys_json_obj["config"]:
1223            bdev_traddr = bdev["params"]["traddr"]
1224            if bdev_traddr in self.nvme_blocklist:
1225                continue
            if not self.nvme_allowlist or bdev_traddr in self.nvme_allowlist:
                bdev_bdfs.append(bdev_traddr)
1230        return bdev_bdfs
1231
1232    def spdk_tgt_configure(self):
1233        self.log.info("Configuring SPDK NVMeOF target via RPC")
1234
1235        # Create transport layer
1236        nvmf_transport_params = {
1237            "client": self.client,
1238            "trtype": self.transport,
1239            "num_shared_buffers": self.num_shared_buffers,
1240            "max_queue_depth": self.max_queue_depth,
1241            "dif_insert_or_strip": self.dif_insert_strip,
1242            "sock_priority": self.adq_priority,
1243            "num_cqe": self.num_cqe
1244        }
1245
1246        if self.enable_adq:
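            # acceptor_poll_rate is the acceptor poll period (in microseconds),
            # so with ADQ new connections are polled for every 10 ms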
1247            nvmf_transport_params["acceptor_poll_rate"] = 10000
1248
1249        rpc.nvmf.nvmf_create_transport(**nvmf_transport_params)
1250        self.log.info("SPDK NVMeOF transport layer:")
1251        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))
1252
1253        if self.null_block:
1254            self.spdk_tgt_add_nullblock(self.null_block)
1255            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
1256        else:
1257            self.spdk_tgt_add_nvme_conf()
1258            self.spdk_tgt_add_subsystem_conf(self.nic_ips)
1259
1260        if self.enable_adq:
1261            self.adq_configure_tc()
1262
1263        self.log.info("Done configuring SPDK NVMeOF Target")
1264
1265    def spdk_tgt_add_nullblock(self, null_block_count):
1266        md_size = 0
1267        block_size = 4096
1268        if self.null_block_dif_type != 0:
1269            md_size = 128
1270
1271        self.log.info("Adding null block bdevices to config via RPC")
1272        for i in range(null_block_count):
1273            self.log.info("Setting bdev protection to :%s" % self.null_block_dif_type)
1274            rpc.bdev.bdev_null_create(self.client, 102400, block_size, "Nvme{}n1".format(i),
1275                                      dif_type=self.null_block_dif_type, md_size=md_size)
1276        self.log.info("SPDK Bdevs configuration:")
1277        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1278
1279    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
1280        self.log.info("Adding NVMe bdevs to config via RPC")
1281
1282        bdfs = self.get_nvme_devices()
1283        bdfs = [b.replace(":", ".") for b in bdfs]
1284
1285        if req_num_disks:
1286            if req_num_disks > len(bdfs):
1287                self.log.error("ERROR: Requested number of disks is more than available %s" % len(bdfs))
1288                sys.exit(1)
1289            else:
1290                bdfs = bdfs[0:req_num_disks]
1291
1292        for i, bdf in enumerate(bdfs):
1293            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)
1294
1295        self.log.info("SPDK Bdevs configuration:")
1296        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1297
1298    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
1299        self.log.info("Adding subsystems to config")
1300        if not req_num_disks:
1301            req_num_disks = self.get_nvme_devices_count()
1302
1303        for ip, bdev_num in self.spread_bdevs(req_num_disks):
1304            port = str(4420 + bdev_num)
1305            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
1306            serial = "SPDK00%s" % bdev_num
1307            bdev_name = "Nvme%sn1" % bdev_num
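            # e.g. bdev_num 0 -> listener port 4420, nqn.2018-09.io.spdk:cnode0,
            # serial SPDK000 and bdev Nvme0n1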
1308
1309            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
1310                                           allow_any_host=True, max_namespaces=8)
1311            rpc.nvmf.nvmf_subsystem_add_ns(client=self.client, nqn=nqn, bdev_name=bdev_name)
1312            for nqn_name in [nqn, "discovery"]:
1313                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
1314                                                     nqn=nqn_name,
1315                                                     trtype=self.transport,
1316                                                     traddr=ip,
1317                                                     trsvcid=port,
1318                                                     adrfam="ipv4")
1319            self.subsystem_info_list.append((port, nqn, ip))
1320        self.subsys_no = len(self.subsystem_info_list)
1321
1322        self.log.info("SPDK NVMeOF subsystem configuration:")
1323        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))
1324
1325    def bpf_start(self):
1326        self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts)
1327        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
1328        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
1329        results_path = os.path.join(self.results_dir, "bpf_traces.txt")
1330
1331        with open(self.pid, "r") as fh:
            nvmf_pid = fh.readline().strip()
1333
1334        cmd = [bpf_script, nvmf_pid, *bpf_traces]
1335        self.log.info(cmd)
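        # The bpftrace.sh wrapper is expected to pick up the BPF_OUTFILE
        # environment variable and redirect trace output to that file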
1336        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})
1337
1338    def tgt_start(self):
1339        if self.null_block:
1340            self.subsys_no = 1
1341        else:
1342            self.subsys_no = self.get_nvme_devices_count()
1343        self.log.info("Starting SPDK NVMeOF Target process")
1344        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
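        # --wait-for-rpc pauses subsystem initialization until the
        # framework_start_init RPC, so the pre-init settings below
        # (sock, iobuf, scheduler) can still be applied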
1345        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
1346        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")
1347
1348        with open(self.pid, "w") as fh:
1349            fh.write(str(proc.pid))
1350        self.nvmf_proc = proc
1351        self.log.info("SPDK NVMeOF Target PID=%s" % self.pid)
1352        self.log.info("Waiting for spdk to initialize...")
        # Poll until the app creates its default RPC socket
        while not os.path.exists("/var/tmp/spdk.sock"):
            time.sleep(1)
1357        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")
1358
1359        rpc.sock.sock_set_default_impl(self.client, impl_name=self.sock_impl)
1360        rpc.iobuf.iobuf_set_options(self.client,
1361                                    small_pool_count=self.iobuf_small_pool_count,
1362                                    large_pool_count=self.iobuf_large_pool_count,
1363                                    small_bufsize=None,
1364                                    large_bufsize=None)
1365
1366        if self.enable_zcopy:
1367            rpc.sock.sock_impl_set_options(self.client, impl_name=self.sock_impl,
1368                                           enable_zerocopy_send_server=True)
1369            self.log.info("Target socket options:")
1370            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name=self.sock_impl))
1371
1372        if self.enable_adq:
1373            rpc.sock.sock_impl_set_options(self.client, impl_name=self.sock_impl, enable_placement_id=1)
1374            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
1375                                           nvme_adminq_poll_period_us=100000, retry_count=4)
1376
1377        if self.enable_dsa:
1378            rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None)
1379            self.log.info("Target DSA accel module enabled")
1380
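        # Set the scheduler while the app is still paused, then let
        # framework_start_init complete the startup held back by --wait-for-rpc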
1381        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit)
1382        rpc.framework_start_init(self.client)
1383
1384        if self.bpf_scripts:
1385            self.bpf_start()
1386
1387        self.spdk_tgt_configure()
1388
1389    def stop(self):
1390        if self.bpf_proc:
1391            self.log.info("Stopping BPF Trace script")
1392            self.bpf_proc.terminate()
1393            self.bpf_proc.wait()
1394
1395        if hasattr(self, "nvmf_proc"):
1396            try:
1397                self.nvmf_proc.terminate()
1398                self.nvmf_proc.wait(timeout=30)
1399            except Exception as e:
1400                self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.")
1401                self.log.info(e)
1402                self.nvmf_proc.kill()
1403                self.nvmf_proc.communicate()
1404                # Try to clean up RPC socket files if they were not removed
1405                # because of using 'kill'
1406                try:
1407                    os.remove("/var/tmp/spdk.sock")
1408                    os.remove("/var/tmp/spdk.sock.lock")
1409                except FileNotFoundError:
1410                    pass
1411        self.restore_settings()
1412        super().stop()
1413
1414
1415class KernelInitiator(Initiator):
1416    def __init__(self, name, general_config, initiator_config):
1417        super().__init__(name, general_config, initiator_config)
1418
1419        # Defaults
1420        self.extra_params = initiator_config.get('extra_params', '')
1421
1422        self.ioengine = "libaio"
1423        self.spdk_conf = ""
1424
1425        if "num_cores" in initiator_config:
1426            self.num_cores = initiator_config["num_cores"]
1427
1428        if "kernel_engine" in initiator_config:
1429            self.ioengine = initiator_config["kernel_engine"]
1430            if "io_uring" in self.ioengine:
1431                self.extra_params += ' --nr-poll-queues=8'
1432
1433    def configure_adq(self):
1434        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
1435
1436    def adq_configure_tc(self):
1437        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
1438
1439    def adq_set_busy_read(self, busy_read_val):
1440        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
1441        return {"net.core.busy_read": 0}
1442
1443    def get_connected_nvme_list(self):
1444        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
1445        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
1446                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
1447        return nvme_list
1448
1449    def init_connect(self):
1450        self.log.info("Below connection attempts may result in error messages, this is expected!")
1451        for subsystem in self.subsystem_info_list:
1452            self.log.info("Trying to connect %s %s %s" % subsystem)
1453            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
1454                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
1455            time.sleep(2)
1456
1457        if "io_uring" in self.ioengine:
1458            self.log.info("Setting block layer settings for io_uring.")
1459
1460            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
1461            #       apparently it's not possible for connected subsystems.
1462            #       Results in "error: Invalid argument"
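            # iostats=0 disables per-queue I/O statistics, rq_affinity=0 stops
            # forcing completions onto the submitting CPU group and nomerges=2
            # disables all request merging; all three reduce per-IO overhead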
1463            block_sysfs_settings = {
1464                "iostats": "0",
1465                "rq_affinity": "0",
1466                "nomerges": "2"
1467            }
1468
1469            for disk in self.get_connected_nvme_list():
1470                sysfs = os.path.join("/sys/block", disk, "queue")
1471                for k, v in block_sysfs_settings.items():
1472                    sysfs_opt_path = os.path.join(sysfs, k)
1473                    try:
1474                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
1475                    except CalledProcessError as e:
1476                        self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
1477                    finally:
                        sysfs_value = self.exec_cmd(["sudo", "cat", sysfs_opt_path])
                        self.log.info("%s=%s" % (sysfs_opt_path, sysfs_value))
1480
1481    def init_disconnect(self):
1482        for subsystem in self.subsystem_info_list:
1483            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
1484            time.sleep(1)
1485
1486    def get_nvme_subsystem_numa(self, dev_name):
        # Drop the last two characters (the namespace suffix) to get the
        # controller name, e.g. "nvme0n1" -> "nvme0"
1488        nvme_ctrl = os.path.basename(dev_name)[:-2]
1489        remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
1490                                   self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl]))
1491        return self.get_route_nic_numa(remote_nvme_ip.group(0))
1492
1493    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0, numa_align=True):
1494        self.available_cpus = self.get_numa_cpu_map()
1495        if len(threads) >= len(subsystems):
1496            threads = range(0, len(subsystems))
1497
        # Generate the connected NVMe device names and sort them by the NUMA
        # node of the NIC used, to allow better grouping when splitting into
        # fio sections.
1500        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
1501        nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list]
1502        nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))]
1503
1504        filename_section = ""
1505        nvme_per_split = int(len(nvme_list) / len(threads))
1506        remainder = len(nvme_list) % len(threads)
1507        iterator = iter(nvme_list)
1508        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
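        # e.g. 5 devices split across 2 sections yields [3, 2]; the remainder
        # adds at most one extra device to each of the first sections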
1516        for i, r in enumerate(result):
1517            header = "[filename%s]" % i
1518            disks = "\n".join(["filename=%s" % x for x in r])
1519            job_section_qd = round((io_depth * len(r)) / num_jobs)
1520            if job_section_qd == 0:
1521                job_section_qd = 1
1522            iodepth = "iodepth=%s" % job_section_qd
1523
1524            offset_section = ""
1525            if offset:
1526                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)
1527
1528            numa_opts = ""
1529            if numa_align:
1530                numa_opts = self.gen_fio_numa_section(r, num_jobs)
1531
1532            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])
1533
1534        return filename_section
1535
1536
1537class SPDKInitiator(Initiator):
1538    def __init__(self, name, general_config, initiator_config):
1539        super().__init__(name, general_config, initiator_config)
1540
1541        if not self.skip_spdk_install:
1542            self.install_spdk()
1543
1544        # Optional fields
1545        self.enable_data_digest = initiator_config.get('enable_data_digest', False)
1546        self.small_pool_count = initiator_config.get('small_pool_count', 32768)
1547        self.large_pool_count = initiator_config.get('large_pool_count', 16384)
1548        self.sock_impl = initiator_config.get('sock_impl', 'posix')
1549
1550        if "num_cores" in initiator_config:
1551            self.num_cores = initiator_config["num_cores"]
1552
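        # fio consumes SPDK bdevs through the external spdk_bdev ioengine built
        # from the fio plugin; bdev.conf is generated later by init_connect()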
1553        self.ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
1554        self.spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
1555
1556    def adq_set_busy_read(self, busy_read_val):
1557        return {"net.core.busy_read": busy_read_val}
1558
1559    def install_spdk(self):
1560        self.log.info("Using fio binary %s" % self.fio_bin)
1561        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
1562        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
1563        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma",
1564                       "--with-fio=%s" % os.path.dirname(self.fio_bin),
1565                       "--enable-lto", "--disable-unit-tests"])
1566        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
1567        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])
1568
1569        self.log.info("SPDK built")
1570        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])
1571
1572    def init_connect(self):
1573        # Not a real "connect" like when doing "nvme connect" because SPDK's fio
1574        # bdev plugin initiates connection just before starting IO traffic.
1575        # This is just to have a "init_connect" equivalent of the same function
1576        # from KernelInitiator class.
1577        # Just prepare bdev.conf JSON file for later use and consider it
1578        # "making a connection".
1579        bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
1580        self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
1581
1582    def init_disconnect(self):
        # SPDK Initiator does not need to explicitly disconnect, as this is
        # done automatically once the fio bdev plugin finishes IO.
1585        return
1586
1587    def gen_spdk_bdev_conf(self, remote_subsystem_list):
1588        spdk_cfg_section = {
1589            "subsystems": [
1590                {
1591                    "subsystem": "bdev",
1592                    "config": []
1593                },
1594                {
1595                    "subsystem": "iobuf",
1596                    "config": [
1597                        {
1598                            "method": "iobuf_set_options",
1599                            "params": {
1600                                "small_pool_count": self.small_pool_count,
1601                                "large_pool_count": self.large_pool_count
1602                            }
1603                        }
1604                    ]
1605                },
1606                {
1607                    "subsystem": "sock",
1608                    "config": [
1609                        {
1610                            "method": "sock_set_default_impl",
1611                            "params": {
1612                                "impl_name": self.sock_impl
1613                            }
1614                        }
1615                    ]
1616                }
1617            ]
1618        }
1619
1620        for i, subsys in enumerate(remote_subsystem_list):
            sub_port, sub_nqn, sub_addr = map(str, subsys)
1622            nvme_ctrl = {
1623                "method": "bdev_nvme_attach_controller",
1624                "params": {
1625                    "name": "Nvme{}".format(i),
1626                    "trtype": self.transport,
1627                    "traddr": sub_addr,
1628                    "trsvcid": sub_port,
1629                    "subnqn": sub_nqn,
1630                    "adrfam": "IPv4"
1631                }
1632            }
1633
1634            if self.enable_adq:
1635                nvme_ctrl["params"].update({"priority": "1"})
1636
1637            if self.enable_data_digest:
1638                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})
1639
1640            spdk_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)
1641
1642        return json.dumps(spdk_cfg_section, indent=2)
1643
1644    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0, numa_align=True):
1645        self.available_cpus = self.get_numa_cpu_map()
1646        filename_section = ""
1647        if len(threads) >= len(subsystems):
1648            threads = range(0, len(subsystems))
1649
        # Generate the expected NVMe bdev names and sort them by the NUMA node
        # of the NIC used, to allow better grouping when splitting into fio
        # sections.
1652        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
1653        filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames]
1654        filenames = [x for _, x in sorted(zip(filename_numas, filenames))]
1655
1656        nvme_per_split = int(len(subsystems) / len(threads))
1657        remainder = len(subsystems) % len(threads)
1658        iterator = iter(filenames)
1659        result = []
1660        for i in range(len(threads)):
1661            result.append([])
1662            for _ in range(nvme_per_split):
1663                result[i].append(next(iterator))
1664            if remainder:
1665                result[i].append(next(iterator))
1666                remainder -= 1
1667        for i, r in enumerate(result):
1668            header = "[filename%s]" % i
1669            disks = "\n".join(["filename=%s" % x for x in r])
1670            job_section_qd = round((io_depth * len(r)) / num_jobs)
1671            if job_section_qd == 0:
1672                job_section_qd = 1
1673            iodepth = "iodepth=%s" % job_section_qd
1674
1675            offset_section = ""
1676            if offset:
1677                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)
1678
1679            numa_opts = ""
1680            if numa_align:
1681                numa_opts = self.gen_fio_numa_section(r, num_jobs)
1682
1683            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])
1684
1685        return filename_section
1686
1687    def get_nvme_subsystem_numa(self, bdev_name):
1688        bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir]))
1689        bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"]
1690
        # Drop the last two characters (the namespace suffix) to get the
        # controller name, e.g. "Nvme0n1" -> "Nvme0"
1692        nvme_ctrl = bdev_name[:-2]
1693        for bdev in bdev_conf_json_obj:
1694            if bdev["method"] == "bdev_nvme_attach_controller" and bdev["params"]["name"] == nvme_ctrl:
1695                return self.get_route_nic_numa(bdev["params"]["traddr"])
1696        return None
1697
1698
1699def initiators_match_subsystems(initiators, target_obj):
1700    for i in initiators:
1701        i.match_subsystems(target_obj.subsystem_info_list)
1702        if i.enable_adq:
1703            i.adq_configure_tc()
1704
1705
1706def run_single_fio_test(args, initiators, configs, target_obj, block_size, io_depth, rw, fio_settings):
1707
1708    for run_no in range(1, fio_settings["run_num"]+1):
1709        threads = []
1710        power_daemon = None
1711        measurements_prefix = "%s_%s_%s_m_%s_run_%s" % (block_size, io_depth, rw, fio_settings["rw_mix_read"], run_no)
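        # e.g. "4k_128_randrw_m_70_run_1" for 4k blocks, QD 128, randrw, 70% reads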
1712
1713        for i, cfg in zip(initiators, configs):
1714            t = threading.Thread(target=i.run_fio, args=(cfg, run_no))
1715            threads.append(t)
1716        if target_obj.enable_sar:
1717            sar_file_prefix = measurements_prefix + "_sar"
1718            t = threading.Thread(target=target_obj.measure_sar,
1719                                 args=(args.results, sar_file_prefix, fio_settings["ramp_time"], fio_settings["run_time"]))
1720            threads.append(t)
1721
1722        if target_obj.enable_pcm:
            pcm_file_name = measurements_prefix + "_pcm_cpu.csv"
            pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm,
                                         args=(args.results, pcm_file_name, fio_settings["ramp_time"], fio_settings["run_time"]))
            threads.append(pcm_cpu_t)
1727
1728        if target_obj.enable_bw:
1729            bandwidth_file_name = measurements_prefix + "_bandwidth.csv"
1730            t = threading.Thread(target=target_obj.measure_network_bandwidth,
1731                                 args=(args.results, bandwidth_file_name, fio_settings["ramp_time"], fio_settings["run_time"]))
1732            threads.append(t)
1733
1734        if target_obj.enable_dpdk_memory:
1735            dpdk_mem_file_name = measurements_prefix + "_dpdk_mem.txt"
1736            t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results, dpdk_mem_file_name, fio_settings["ramp_time"]))
1737            threads.append(t)
1738
1739        if target_obj.enable_pm:
1740            power_daemon = threading.Thread(target=target_obj.measure_power,
1741                                            args=(args.results, measurements_prefix, script_full_dir,
1742                                                  fio_settings["ramp_time"], fio_settings["run_time"]))
1743            threads.append(power_daemon)
1744
1745        for t in threads:
1746            t.start()
1747        for t in threads:
1748            t.join()
1749
1750
1751def run_fio_tests(args, initiators, target_obj, fio_settings):
1752
1753    for block_size, io_depth, rw in fio_settings["workloads"]:
1754        configs = []
1755        for i in initiators:
1756            i.init_connect()
1757            cfg = i.gen_fio_config(rw, block_size, io_depth, target_obj.subsys_no, fio_settings)
1758            configs.append(cfg)
1759
1760        run_single_fio_test(args, initiators, configs, target_obj, block_size, io_depth, rw, fio_settings)
1761
1762        for i in initiators:
1763            i.init_disconnect()
1764            i.copy_result_files(args.results)
1765
1766    try:
1767        parse_results(args.results, args.csv_filename)
    except Exception:
        logging.error("There was an error while parsing the results")
        raise
1771
1772
1773def validate_allowlist_settings(data, is_forced_used):
1774    if data["target"].get("null_block_devices"):
1775        logging.info("Null block devices used for testing. Allow and block lists will not be checked.")
1776        return True
1777
1778    if is_forced_used:
1779        logging.warning("""Force mode used. All available NVMe drives on your system will be used,
1780                     which may potentially lead to data loss""")
1781        return True
1782
1783    if not (data["target"].get("allowlist") or data["target"].get("blocklist")):
1784        logging.error("""This script requires allowlist or blocklist to be defined.
1785                      You can choose to use all available NVMe drives on your system (-f option),
1786                      which may potentially lead to data loss.""")
1787        return False
1788
1789    return True
1790
1791
1792class CpuThrottlingError(Exception):
1793    def __init__(self, message):
1794        super().__init__(message)
1795
1796
1797if __name__ == "__main__":
1798    exit_code = 0
1799
1800    script_full_dir = os.path.dirname(os.path.realpath(__file__))
1801    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))
1802
1803    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
1804    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
1805                        help='Configuration file.')
1806    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
1807                        help='Results directory.')
1808    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
1809                        help='CSV results filename.')
1810    parser.add_argument('-f', '--force', default=False, action='store_true',
1811                        dest='force', help="""Force script to continue and try to use all
1812                        available NVMe devices during test.
1813                        WARNING: Might result in data loss on used NVMe drives""")
1814
1815    args = parser.parse_args()
1816
1817    logging.basicConfig(level=logging.INFO,
1818                        format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s')
1819
1820    logging.info("Using config file: %s" % args.config)
1821    with open(args.config, "r") as config:
1822        data = json.load(config)
1823
    initiators = []
1828
1829    if not validate_allowlist_settings(data, args.force):
        sys.exit(1)
1831
1832    for k, v in data.items():
1833        if "target" in k:
1834            v.update({"results_dir": args.results})
1835            if data[k]["mode"] == "spdk":
1836                target_obj = SPDKTarget(k, data["general"], v)
1837            elif data[k]["mode"] == "kernel":
1838                target_obj = KernelTarget(k, data["general"], v)
1839        elif "initiator" in k:
1840            if data[k]["mode"] == "spdk":
1841                init_obj = SPDKInitiator(k, data["general"], v)
1842            elif data[k]["mode"] == "kernel":
1843                init_obj = KernelInitiator(k, data["general"], v)
1844            initiators.append(init_obj)
1845        elif "fio" in k:
1846            fio_settings = {
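                # "workloads" is the cartesian product of the block sizes, queue
                # depths and readwrite modes from the fio config section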
1847                "workloads": itertools.product(data[k]["bs"], data[k]["qd"],  data[k]["rw"]),
1848                "run_time": data[k].get("run_time", 10),
1849                "ramp_time": data[k].get("ramp_time", 0),
1850                "rw_mix_read": data[k].get("rwmixread", 70),
1851                "run_num": data[k].get("run_num", None),
1852                "num_jobs": data[k].get("num_jobs", None),
1853                "rate_iops": data[k].get("rate_iops", 0),
1854                "offset": data[k].get("offset", False),
1855                "offset_inc": data[k].get("offset_inc", 0),
1856                "numa_align": data[k].get("numa_align", 1)
1857            }
1858        else:
1859            continue
1860
1861    try:
1862        os.mkdir(args.results)
1863    except FileExistsError:
1864        pass
1865
1866    for i in initiators:
1867        target_obj.initiator_info.append(
1868            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
1869        )
1870
1871    try:
1872        target_obj.tgt_start()
1873        initiators_match_subsystems(initiators, target_obj)
1874
        # Poor man's threading
1876        # Run FIO tests
1877        run_fio_tests(args, initiators, target_obj, fio_settings)
1878
1879    except Exception as e:
1880        logging.error("Exception occurred while running FIO tests")
1881        logging.error(e)
1882        exit_code = 1
1883
1884    finally:
1885        for i in [*initiators, target_obj]:
1886            try:
1887                i.stop()
1888            except CpuThrottlingError as err:
1889                logging.error(err)
1890                exit_code = 1
            except Exception:
                # Ignore any other errors raised during cleanup
                pass
1893        sys.exit(exit_code)
1894