#!/usr/bin/env python3
#  SPDX-License-Identifier: BSD-3-Clause
#  Copyright (C) 2018 Intel Corporation
#  All rights reserved.
#

# Note: we use setattr
# pylint: disable=no-member

import os
import re
import sys
import argparse
import json
import logging
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid

import paramiko
import pandas as pd
from common import *
from subprocess import CalledProcessError
from abc import abstractmethod, ABC
from dataclasses import dataclass
from typing import Any

sys.path.append(os.path.dirname(__file__) + '/../../../python')

import spdk.rpc as rpc  # noqa
import spdk.rpc.client as rpc_client  # noqa


@dataclass
class ConfigField:
    name: str = None
    default: Any = None
    required: bool = False

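# Example (hypothetical config values): read_config() below resolves each field as
#   {'username': 'root'}            -> self.username = 'root'
#   missing required field          -> ValueError ('%s config key is required')
#   missing optional field          -> its default, e.g. self.skip_spdk_install = False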

class Server(ABC):
    def __init__(self, name, general_config, server_config):
        self.name = name

        self.log = logging.getLogger(self.name)

        config_fields = [
            ConfigField(name='username', required=True),
            ConfigField(name='password', required=True),
            ConfigField(name='transport', required=True),
            ConfigField(name='skip_spdk_install', default=False),
            ConfigField(name='irdma_roce_enable', default=False),
            ConfigField(name='pause_frames', default=None)
        ]

        self.read_config(config_fields, general_config)
        self.transport = self.transport.lower()

        config_fields = [
            ConfigField(name='nic_ips', required=True),
            ConfigField(name='mode', required=True),
            ConfigField(name='irq_scripts_dir', default='/usr/src/local/mlnx-tools/ofed_scripts'),
            ConfigField(name='enable_arfs', default=False),
            ConfigField(name='tuned_profile', default='')
        ]
        self.read_config(config_fields, server_config)

        self.local_nic_info = []
        self._nics_json_obj = {}
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""

        self.enable_adq = server_config.get('adq_enable', False)
        self.adq_priority = 1 if self.enable_adq else None
        self.irq_settings = {'mode': 'default', **server_config.get('irq_settings', {})}

        if not re.match(r'^[A-Za-z0-9\-]+$', name):
            self.log.error("Please use a name which contains only letters, numbers or dashes")
            sys.exit(1)

    def read_config(self, config_fields, config):
        for config_field in config_fields:
            value = config.get(config_field.name, config_field.default)
            if config_field.required is True and value is None:
                raise ValueError('%s config key is required' % config_field.name)

            setattr(self, config_field.name, value)

    @staticmethod
    def get_uncommented_lines(lines):
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                if ip in addr["local"]:
                    return nic["ifname"]

    @abstractmethod
    def set_local_nic_info_helper(self):
        pass

    def set_local_nic_info(self, pci_info):
        def extract_network_elements(json_obj):
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def get_nic_numa_node(self, nic_name):
        return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name]))

    def get_numa_cpu_map(self):
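        # Build a {numa_node: [cpu, ...]} map from "lscpu -b -e=NODE,CPU -J",
        # whose JSON output is a list of {"node": ..., "cpu": ...} entries.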
        numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"]))
        numa_cpu_json_map = {}

        for cpu in numa_cpu_json_obj["cpus"]:
            cpu_num = int(cpu["cpu"])
            numa_node = int(cpu["node"])
            numa_cpu_json_map.setdefault(numa_node, [])
            numa_cpu_json_map[numa_node].append(cpu_num)

        return numa_cpu_json_map

    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        return ""

    def configure_system(self):
        self.load_drivers()
        self.configure_services()
        self.configure_sysctl()
        self.configure_arfs()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity(**self.irq_settings)
        self.configure_pause_frames()

    RDMA_PROTOCOL_IWARP = 0
    RDMA_PROTOCOL_ROCE = 1
    RDMA_PROTOCOL_UNKNOWN = -1

    def check_rdma_protocol(self):
        try:
            roce_ena = self.exec_cmd(["cat", "/sys/module/irdma/parameters/roce_ena"])
            roce_ena = roce_ena.strip()
            if roce_ena == "0":
                return self.RDMA_PROTOCOL_IWARP
            else:
                return self.RDMA_PROTOCOL_ROCE
        except CalledProcessError as e:
            self.log.error("ERROR: failed to check RDMA protocol!")
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
            return self.RDMA_PROTOCOL_UNKNOWN

    def load_drivers(self):
        self.log.info("Loading drivers")
        self.exec_cmd(["sudo", "modprobe", "-a",
                       "nvme-%s" % self.transport,
                       "nvmet-%s" % self.transport])
        current_mode = self.check_rdma_protocol()
        if current_mode == self.RDMA_PROTOCOL_UNKNOWN:
            self.log.error("ERROR: failed to check RDMA protocol mode")
            return
        if self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_IWARP:
            self.reload_driver("irdma", "roce_ena=1")
            self.log.info("Loaded irdma driver with RoCE enabled")
        elif self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_ROCE:
            self.log.info("Leaving irdma driver with RoCE enabled")
        else:
            self.reload_driver("irdma", "roce_ena=0")
            self.log.info("Loaded irdma driver with iWARP enabled")

    def set_pause_frames(self, rx_state, tx_state):
        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            self.exec_cmd(["sudo", "ethtool", "-A", nic_name, "rx", rx_state, "tx", tx_state])

    def configure_pause_frames(self):
        if self.pause_frames is None:
            self.log.info("Keeping NIC's default pause frames setting")
            return
        elif not self.pause_frames:
            self.log.info("Turning off pause frames")
            self.set_pause_frames("off", "off")
        else:
            self.log.info("Turning on pause frames")
            self.set_pause_frames("on", "on")

    def configure_arfs(self):
        rps_flow_cnt = 512
        if not self.enable_arfs:
            rps_flow_cnt = 0

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            self.exec_cmd(["sudo", "ethtool", "-K", nic_name, "ntuple", "on"])
            self.log.info(f"Setting rps_flow_cnt={rps_flow_cnt} for {nic_name}")
            queue_files = self.exec_cmd(["ls", f"/sys/class/net/{nic_name}/queues/"]).strip().split("\n")
            queue_files = filter(lambda x: x.startswith("rx-"), queue_files)

            for qf in queue_files:
                self.exec_cmd(["sudo", "bash", "-c", f"echo {rps_flow_cnt} > /sys/class/net/{nic_name}/queues/{qf}/rps_flow_cnt"])
                set_value = self.exec_cmd(["cat", f"/sys/class/net/{nic_name}/queues/{qf}/rps_flow_cnt"]).strip()
                self.log.info(f"Confirmed rps_flow_cnt set to {set_value} in /sys/class/net/{nic_name}/queues/{qf}")

            self.log.info(f"Configuration of {nic_name} completed with confirmed rps_flow_cnt settings.")

        self.log.info("ARFS configuration completed.")

    def configure_adq(self):
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        self.log.info("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log.info("%s loaded!" % module)
            except CalledProcessError as e:
                self.log.error("ERROR: failed to load module %s" % module)
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        self.log.info("Configuring ADQ Traffic classes and filters...")

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            nic_ports = [x[0] for x in self.subsystem_info_list if nic_ip in x[2]]

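            # mqprio with 2 traffic classes: priorities 0/1 map to TC0/TC1;
            # TC0 gets num_queues_tc0 queues starting at queue 0, TC1 gets
            # num_queues_tc1 queues starting right after them.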
            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log.info(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log.info(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            nic_bdf = os.path.basename(self.exec_cmd(["readlink", "-f", "/sys/class/net/%s/device" % nic_name]).strip())
            self.exec_cmd(["sudo", "devlink", "dev", "param", "set", "pci/%s" % nic_bdf,
                           "name", "tc1_inline_fd", "value", "true", "cmode", "runtime"])

            for port in nic_ports:
                tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                                 "protocol", "ip", "ingress", "prio", "1", "flower",
                                 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                                 "skip_sw", "hw_tc", "1"]
                self.log.info(" ".join(tc_filter_cmd))
                self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log.info("Show tc configuration for %s NIC..." % nic_name)
            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log.info("%s" % tc_disk_out)
            self.log.info("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log.info(xps_cmd)
            self.exec_cmd(xps_cmd)

    def reload_driver(self, driver, *modprobe_args):

        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver, *modprobe_args])
        except CalledProcessError as e:
            self.log.error("ERROR: failed to reload %s module!" % driver)
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_nic(self):
        self.log.info("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                nic_bdf = self.exec_cmd(["bash", "-c", "source /sys/class/net/%s/device/uevent; echo $PCI_SLOT_NAME" % nic])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-inspect-optimize", "off"])
                # The following flags are suggested in the ADQ Configuration Guide for large
                # block sizes, but can be used with 4k block sizes as well
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop", "on"])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop-cfg", "on"])
            except subprocess.CalledProcessError as e:
                self.log.error("ERROR: failed to configure NIC port using ethtool!")
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
                self.log.info("Please update your NIC driver and firmware versions and try again.")
            self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        self.log.info("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log.info("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log.info("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        self.log.info("Tuning sysctl settings...")

        busy_read = 0
        busy_poll = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1
            busy_poll = 1

        sysctl_opts = {
            "net.core.busy_poll": busy_poll,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
            "net.core.rps_sock_flow_entries": 0
        }

        if self.enable_adq:
            sysctl_opts.update(self.adq_set_busy_read(1))

        if self.enable_arfs:
            sysctl_opts.update({"net.core.rps_sock_flow_entries": 32768})

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        if not self.tuned_profile:
            self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        self.log.info("Setting CPU governor to performance...")

        # This assumes the same CPU scaling governor is set on every CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def get_core_list_from_mask(self, core_mask):
        # Generate list of individual cores from hex core mask
        # (e.g. '0xffff') or list containing:
        # - individual cores (e.g. '1, 2, 3')
        # - core ranges (e.g. '0-3')
        # - mix of both (e.g. '0, 1-4, 9, 11-13')

        core_list = []
        if "0x" in core_mask:
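            # Hex mask: every set bit selects a core, e.g. 0x5 -> [0, 2]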
            core_mask_int = int(core_mask, 16)
            for i in range(core_mask_int.bit_length()):
                if (1 << i) & core_mask_int:
                    core_list.append(i)
            return core_list
        else:
            # Core list can be provided in .json config with square brackets;
            # remove them first
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")

            for i in core_mask.split(","):
                if "-" in i:
                    start, end = i.split("-")
                    core_range = range(int(start), int(end) + 1)
                    core_list.extend(core_range)
                else:
                    core_list.append(int(i))
            return core_list

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
        self.log.info("Setting IRQ affinity for NICs, using %s mode" % mode)

        if mode not in ["default", "bynode", "cpulist"]:
            raise ValueError("%s irq affinity setting not supported" % mode)

        if mode == "cpulist" and not cpulist:
            raise ValueError("%s irq affinity setting set, but no cpulist provided" % mode)

        affinity_script = "set_irq_affinity.sh"
        if "default" not in mode:
            affinity_script = "set_irq_affinity_cpulist.sh"
            system_cpu_map = self.get_numa_cpu_map()
        irq_script_path = os.path.join(self.irq_scripts_dir, affinity_script)

        def cpu_list_to_string(cpulist):
            return ",".join(map(lambda x: str(x), cpulist))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            irq_cmd = ["sudo", irq_script_path]

            # Use only CPU cores matching NIC NUMA node.
            # Remove any CPU cores if they're on exclusion list.
            if mode == "bynode":
                irq_cpus = system_cpu_map[self.get_nic_numa_node(nic_name)]
                if cpulist and exclude_cpulist:
                    disallowed_cpus = self.get_core_list_from_mask(cpulist)
                    irq_cpus = list(set(irq_cpus) - set(disallowed_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            if mode == "cpulist":
                irq_cpus = self.get_core_list_from_mask(cpulist)
                if exclude_cpulist:
                    # Flatten system CPU list, we don't need NUMA awareness here
                    system_cpu_list = sorted({x for v in system_cpu_map.values() for x in v})
                    irq_cpus = list(set(system_cpu_list) - set(irq_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            irq_cmd.append(nic_name)
            self.log.info(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        self.log.info("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        self.log.info("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        self.log.info("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log.info("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        self.log.info("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log.info("Reverted CPU governor to %s." % self.governor_restore)

    def restore_settings(self):
        self.restore_governor()
        self.restore_tuned()
        self.restore_services()
        self.restore_sysctl()
        if self.enable_adq:
            self.reload_driver("ice")


class Target(Server):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Defaults
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []
        self.initiator_info = []

        config_fields = [
            ConfigField(name='mode', required=True),
            ConfigField(name='results_dir', required=True),
            ConfigField(name='enable_pm', default=True),
            ConfigField(name='enable_sar', default=True),
            ConfigField(name='enable_pcm', default=True),
            ConfigField(name='enable_dpdk_memory', default=True)
        ]
        self.read_config(config_fields, target_config)

        self.null_block = target_config.get('null_block_devices', 0)
        self.scheduler_name = target_config.get('scheduler_settings', 'static')
        self.enable_zcopy = target_config.get('zcopy_settings', False)
        self.enable_bw = target_config.get('enable_bandwidth', True)
        self.nvme_blocklist = target_config.get('blocklist', [])
        self.nvme_allowlist = target_config.get('allowlist', [])

        # Blocklist takes precedence, remove common elements from allowlist
        self.nvme_allowlist = list(set(self.nvme_allowlist) - set(self.nvme_blocklist))

        self.log.info("Items now on allowlist: %s" % self.nvme_allowlist)
        self.log.info("Items now on blocklist: %s" % self.nvme_blocklist)

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        if self.skip_spdk_install is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
            self.configure_irq_affinity()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log.info("Changing directory to %s" % change_dir)

        out = subprocess.check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")

        if change_dir:
            os.chdir(old_cwd)
            self.log.info("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        self.log.info("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, _directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log.info("Done zipping")

    @staticmethod
    def _chunks(input_list, chunks_no):
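        # Split input_list into chunks_no chunks whose lengths differ by at
        # most 1; e.g. a 10-element list split into 3 chunks yields chunks
        # of lengths 4, 3 and 3.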
        div, rem = divmod(len(input_list), chunks_no)
        for i in range(chunks_no):
            si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem)
            yield input_list[si:si + (div + 1 if i < rem else div)]

    def spread_bdevs(self, req_disks):
        # Spread available block devices indexes:
        # - evenly across available initiator systems
        # - evenly across available NIC interfaces for
        #   each initiator
        # Not NUMA aware.
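        # e.g. 4 disks and 2 initiators with 1 NIC each (hypothetical values)
        # give [(ip_a, 0), (ip_a, 1), (ip_b, 2), (ip_b, 3)]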
        ip_bdev_map = []
        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))

        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
            self.initiator_info[i]["bdev_range"] = init_chunk
            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
                for c in nic_chunk:
                    ip_bdev_map.append((ip, c))
        return ip_bdev_map

    def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time):
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))

        self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Starting SAR measurements")

        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % 1, "%s" % run_time])
        with open(os.path.join(results_dir, sar_output_file), "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log.info("Summary CPU utilization from SAR:")
                        self.log.info(line)
                    elif "all" in line:
                        self.log.info(line)
                    else:
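                        # Assumes sar's default per-CPU "Average:" layout,
                        # where field index 7 is the %idle column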
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum

        with open(os.path.join(results_dir, sar_cpu_util_file), "w") as f:
            f.write("%0.2f" % sar_cpu_usage)

    def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time):
        time.sleep(ramp_time)
        self.log.info("Starting power measurements")
        self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir,
                      "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix,
                       "-x", "-c", "%s" % run_time, "-t", "%s" % 1, "-r"])

    def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        cmd = ["pcm", "1", "-i=%s" % run_time,
               "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
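        # pcm writes a CSV with a two-row header; strip the unnamed header
        # entries and keep only the per-socket UPI utilization columns.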
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name, ramp_time, run_time):
        self.log.info("Waiting %d seconds for ramp-up to finish before measuring bandwidth stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("INFO: starting network bandwidth measurement")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", "%s" % run_time])
        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
        #       Improve this so that additional file containing measurements average is generated too.
        # TODO: Monitor only these interfaces which are currently used to run the workload.

    def measure_dpdk_memory(self, results_dir, dump_file_name, ramp_time):
        self.log.info("INFO: waiting to generate DPDK memory usage")
        time.sleep(ramp_time)
        self.log.info("INFO: generating DPDK memory usage")
        tmp_dump_file = rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)["filename"]
        os.rename(tmp_dump_file, "%s/%s" % (results_dir, dump_file_name))

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(os.uname().release)
        self.log.info("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log.info("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log.info("====zcopy settings:====")
        self.log.info("zcopy enabled: %s" % (self.enable_zcopy))
        self.log.info("====Scheduler settings:====")
        self.log.info("SPDK scheduler: %s" % (self.scheduler_name))


class Initiator(Server):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # required fields and defaults
        config_fields = [
            ConfigField(name='mode', required=True),
            ConfigField(name='ip', required=True),
            ConfigField(name='target_nic_ips', required=True),
            ConfigField(name='cpus_allowed', default=None),
            ConfigField(name='cpus_allowed_policy', default='shared'),
            ConfigField(name='spdk_dir', default='/tmp/spdk'),
            ConfigField(name='fio_bin', default='/usr/src/fio/fio'),
            ConfigField(name='nvmecli_bin', default='nvme'),
            ConfigField(name='cpu_frequency', default=None),
            ConfigField(name='allow_cpu_sharing', default=True)
        ]

        self.read_config(config_fields, initiator_config)

        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.subsystem_info_list = []

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if self.skip_spdk_install is False:
            self.copy_spdk("/tmp/spdk.zip")

        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        self.restore_settings()
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # If a command element contains whitespace and is not already quoted
        # (e.g. a sysctl key="value with spaces" pair), quote it again so it
        # is not split by the remote shell.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout by requesting a pty if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        self.log.info("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log.info("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log.info("Sources unpacked")

    def copy_result_files(self, dest_dir):
        self.log.info("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log.info("Done copying results")

    def match_subsystems(self, target_subsystems):
        subsystems = [subsystem for subsystem in target_subsystems if subsystem[2] in self.target_nic_ips]
        if not subsystems:
            raise Exception("No matching subsystems found on target side!")
        subsystems.sort(key=lambda x: x[1])
        self.log.info("Found matching subsystems on target side:")
        for s in subsystems:
            self.log.info(s)
        self.subsystem_info_list = subsystems

    @abstractmethod
    def init_connect(self):
        pass

    @abstractmethod
    def init_disconnect(self):
        pass

    @abstractmethod
    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def get_route_nic_numa(self, remote_nvme_ip):
        local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip]))
        local_nvme_nic = local_nvme_nic[0]["dev"]
        return self.get_nic_numa_node(local_nvme_nic)

    @staticmethod
    def gen_fio_offset_section(offset_inc, num_jobs):
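        # Give each job its own slice of the device, e.g. with 4 jobs and
        # offset_inc=0 every job gets size=25% and offset_increment=25%.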
        offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc
        return "\n".join(["size=%s%%" % offset_inc,
                          "offset=0%",
                          "offset_increment=%s%%" % offset_inc])

    def gen_fio_numa_section(self, fio_filenames_list, num_jobs):
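        # Pin this batch of fio jobs (CPUs and memory policy) to the NUMA
        # node on which the majority of its NVMe devices reside.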
        numa_stats = {}
        allowed_cpus = []
        for nvme in fio_filenames_list:
            nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme))
            numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1

        # Use the most common NUMA node for this chunk to allocate memory and CPUs
        section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0]

        # Check if we have enough free CPUs to pop from the list before assigning them
        if len(self.available_cpus[section_local_numa]) < num_jobs:
            if self.allow_cpu_sharing:
                self.log.info("Regenerating available CPU list %s" % section_local_numa)
                # Remove still available CPUs from the regenerated list. We don't want to
                # regenerate it with duplicates.
                cpus_regen = set(self.get_numa_cpu_map()[section_local_numa]) - set(self.available_cpus[section_local_numa])
                self.available_cpus[section_local_numa].extend(cpus_regen)
                self.log.info(self.available_cpus[section_local_numa])
            else:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise IndexError

        for _ in range(num_jobs):
            try:
                allowed_cpus.append(str(self.available_cpus[section_local_numa].pop(0)))
            except IndexError:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise

        return "\n".join(["cpus_allowed=%s" % ",".join(allowed_cpus),
                          "numa_mem_policy=prefer:%s" % section_local_numa])

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no,
                       num_jobs=None, ramp_time=0, run_time=10, rate_iops=0,
                       offset=False, offset_inc=0):
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""

        if self.cpus_allowed is not None:
            self.log.info("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    # "a-b" is an inclusive range, e.g. "0-3" spans 4 CPUs
                    cpus_num += len(range(a, b + 1))
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            self.num_cores = len(self.subsystem_info_list)
            threads = range(0, len(self.subsystem_info_list))

        filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs,
                                                      offset, offset_inc)

        fio_config = fio_conf_template.format(ioengine=self.ioengine, spdk_conf=self.spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, init_connect() function
        if "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log.info("Created FIO Config:")
        self.log.info(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit(1)
        else:
            self.log.warning("WARNING: you have disabled intel_pstate and are using the default cpu governor.")

    def run_fio(self, fio_config_file, run_num=1):
        job_name, _ = os.path.splitext(fio_config_file)
        self.log.info("Starting FIO run for job: %s" % job_name)
        self.log.info("Using FIO: %s" % self.fio_bin)

        output_filename = job_name + "_run_" + str(run_num) + "_" + self.name + ".json"
        try:
            output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename, "--eta=never"], True)
            self.log.info(output)
            self.log.info("FIO run finished. Results in: %s" % output_filename)
        except subprocess.CalledProcessError as e:
            self.log.error("ERROR: Fio process failed!")
            self.log.error(e.stdout)

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(self.exec_cmd(["uname", "-r"]))
        self.log.info("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log.info("====sysctl conf:====")
        sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"])
        self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))


class KernelTarget(Target):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = target_config.get('nvmet_bin', 'nvmetcli')

    def load_drivers(self):
        self.log.info("Loading drivers")
        super().load_drivers()
        if self.null_block:
            self.exec_cmd(["sudo", "modprobe", "null_blk", "nr_devices=%s" % self.null_block])

    def configure_adq(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_configure_tc(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_set_busy_read(self, busy_read_val):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
        return {"net.core.busy_read": 0}

    def stop(self):
        self.nvmet_command(self.nvmet_bin, "clear")
        self.restore_settings()

    def get_nvme_device_bdf(self, nvme_dev_path):
        nvme_name = os.path.basename(nvme_dev_path)
        return self.exec_cmd(["cat", "/sys/block/%s/device/address" % nvme_name]).strip()

    def get_nvme_devices(self):
        dev_list = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]).split("\n")
        nvme_list = []
        for dev in dev_list:
            if "nvme" not in dev:
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                nvme_list.append(dev)
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_allowlist:
                nvme_list.append(dev)
        return nvme_list

    def nvmet_command(self, nvmet_bin, command):
        return self.exec_cmd([nvmet_bin, *(command.split(" "))])

    def kernel_tgt_gen_subsystem_conf(self, nvme_list):

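        # Build a config dict in nvmetcli's JSON format; it is written to
        # kernel.conf and loaded later via the "restore kernel.conf" command.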
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        for ip, bdev_num in self.spread_bdevs(len(nvme_list)):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = nvme_list[bdev_num]

            nvmet_cfg["subsystems"].append({
                "allowed_hosts": [],
                "attr": {
                    "allow_any_host": "1",
                    "serial": serial,
                    "version": "1.3"
                },
                "namespaces": [
                    {
                        "device": {
                            "path": bdev_name,
                            "uuid": "%s" % uuid.uuid4()
                        },
                        "enable": 1,
                        "nsid": port
                    }
                ],
                "nqn": nqn
            })

            nvmet_cfg["ports"].append({
                "addr": {
                    "adrfam": "ipv4",
                    "traddr": ip,
                    "trsvcid": port,
                    "trtype": self.transport
                },
                "portid": bdev_num,
                "referrals": [],
                "subsystems": [nqn]
            })

            self.subsystem_info_list.append((port, nqn, ip))
        self.subsys_no = len(self.subsystem_info_list)

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        self.log.info("Configuring kernel NVMeOF Target")

        if self.null_block:
            self.log.info("Configuring with null block device.")
            nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
        else:
            self.log.info("Configuring with NVMe drives.")
            nvme_list = self.get_nvme_devices()

        self.kernel_tgt_gen_subsystem_conf(nvme_list)
        self.subsys_no = len(nvme_list)

        self.nvmet_command(self.nvmet_bin, "clear")
        self.nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log.info("Done configuring kernel NVMeOF Target")


class SPDKTarget(Target):
    def __init__(self, name, general_config, target_config):
        # IRQ affinity on SPDK Target side takes Target's core mask into consideration.
        # Method setting IRQ affinity is run as part of parent classes init,
        # so we need to have self.core_mask set before changing IRQ affinity.
        self.core_mask = target_config["core_mask"]
        self.num_cores = len(self.get_core_list_from_mask(self.core_mask))

        super().__init__(name, general_config, target_config)

        # Format: property, default value
        config_fields = [
            ConfigField(name='dif_insert_strip', default=False),
            ConfigField(name='null_block_dif_type', default=0),
            ConfigField(name='num_shared_buffers', default=4096),
            ConfigField(name='max_queue_depth', default=128),
            ConfigField(name='bpf_scripts', default=[]),
            ConfigField(name='scheduler_core_limit', default=None),
            ConfigField(name='dsa_settings', default=False),
            ConfigField(name='iobuf_small_pool_count', default=32767),
            ConfigField(name='iobuf_large_pool_count', default=16383),
            ConfigField(name='num_cqe', default=4096),
            ConfigField(name='sock_impl', default='posix')
        ]

        self.read_config(config_fields, target_config)

        self.bpf_proc = None
        self.enable_dsa = False

        self.log.info("====DSA settings:====")
        self.log.info("DSA enabled: %s" % (self.enable_dsa))

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
        if mode not in ["default", "bynode", "cpulist",
                        "shared", "split", "split-bynode"]:
            self.log.error("%s irq affinity setting not supported" % mode)
            raise Exception

        # Create core list from SPDK's mask and change it to string.
        # This is the type configure_irq_affinity expects for cpulist parameter.
        spdk_tgt_core_list = self.get_core_list_from_mask(self.core_mask)
        spdk_tgt_core_list = ",".join(map(lambda x: str(x), spdk_tgt_core_list))
        spdk_tgt_core_list = "[" + spdk_tgt_core_list + "]"

        if mode == "shared":
            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list)
        elif mode == "split":
            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
        elif mode == "split-bynode":
            super().configure_irq_affinity(mode="bynode", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
        else:
            super().configure_irq_affinity(mode=mode, cpulist=cpulist, exclude_cpulist=exclude_cpulist)

    def adq_set_busy_read(self, busy_read_val):
        return {"net.core.busy_read": busy_read_val}

    def get_nvme_devices_count(self):
        return len(self.get_nvme_devices())

    def get_nvme_devices(self):
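        # gen_nvme.sh prints an SPDK bdev config JSON; collect the PCIe
        # addresses (traddr) of controllers that pass block/allow filtering.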
        bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")]))
        bdev_bdfs = []
        for bdev in bdev_subsys_json_obj["config"]:
            bdev_traddr = bdev["params"]["traddr"]
            if bdev_traddr in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                bdev_bdfs.append(bdev_traddr)
            if bdev_traddr in self.nvme_allowlist:
                bdev_bdfs.append(bdev_traddr)
        return bdev_bdfs

    def spdk_tgt_configure(self):
        self.log.info("Configuring SPDK NVMeOF target via RPC")

        # Create transport layer
        nvmf_transport_params = {
            "client": self.client,
            "trtype": self.transport,
            "num_shared_buffers": self.num_shared_buffers,
            "max_queue_depth": self.max_queue_depth,
            "dif_insert_or_strip": self.dif_insert_strip,
            "sock_priority": self.adq_priority,
            "num_cqe": self.num_cqe
        }

        if self.enable_adq:
            nvmf_transport_params["acceptor_poll_rate"] = 10000

        rpc.nvmf.nvmf_create_transport(**nvmf_transport_params)
        self.log.info("SPDK NVMeOF transport layer:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

        if self.enable_adq:
            self.adq_configure_tc()

        self.log.info("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self, null_block_count):
        md_size = 0
        block_size = 4096
        if self.null_block_dif_type != 0:
            md_size = 128

        self.log.info("Adding null block bdevices to config via RPC")
        for i in range(null_block_count):
1247            self.log.info("Setting bdev protection to: %s" % self.null_block_dif_type)
1248            rpc.bdev.bdev_null_create(self.client, 102400, block_size, "Nvme{}n1".format(i),
1249                                      dif_type=self.null_block_dif_type, md_size=md_size)
1250        self.log.info("SPDK Bdevs configuration:")
1251        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1252
1253    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
1254        self.log.info("Adding NVMe bdevs to config via RPC")
1255
1256        bdfs = self.get_nvme_devices()
1257        bdfs = [b.replace(":", ".") for b in bdfs]
1258
1259        if req_num_disks:
1260            if req_num_disks > len(bdfs):
1261                self.log.error("ERROR: Requested number of disks exceeds the %s available" % len(bdfs))
1262                sys.exit(1)
1263            else:
1264                bdfs = bdfs[0:req_num_disks]
1265
1266        for i, bdf in enumerate(bdfs):
1267            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)
1268
1269        self.log.info("SPDK Bdevs configuration:")
1270        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1271
1272    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
1273        self.log.info("Adding subsystems to config")
1274        if not req_num_disks:
1275            req_num_disks = self.get_nvme_devices_count()
1276
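        # One subsystem and one listener per bdev; e.g. bdev 0 is exposed as
        # nqn.2018-09.io.spdk:cnode0 on port 4420, bdev 1 as cnode1 on 4421.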
1277        for ip, bdev_num in self.spread_bdevs(req_num_disks):
1278            port = str(4420 + bdev_num)
1279            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
1280            serial = "SPDK00%s" % bdev_num
1281            bdev_name = "Nvme%sn1" % bdev_num
1282
1283            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
1284                                           allow_any_host=True, max_namespaces=8)
1285            rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
1286            for nqn_name in [nqn, "discovery"]:
1287                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
1288                                                     nqn=nqn_name,
1289                                                     trtype=self.transport,
1290                                                     traddr=ip,
1291                                                     trsvcid=port,
1292                                                     adrfam="ipv4")
1293            self.subsystem_info_list.append((port, nqn, ip))
1294        self.subsys_no = len(self.subsystem_info_list)
1295
1296        self.log.info("SPDK NVMeOF subsystem configuration:")
1297        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))
1298
1299    def bpf_start(self):
1300        self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts)
1301        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
1302        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
1303        results_path = os.path.join(self.results_dir, "bpf_traces.txt")
1304
1305        with open(self.pid, "r") as fh:
1306            nvmf_pid = fh.readline().strip()
1307
1308        cmd = [bpf_script, nvmf_pid, *bpf_traces]
1309        self.log.info(cmd)
1310        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})
1311
1312    def tgt_start(self):
1313        if self.null_block:
1314            self.subsys_no = 1
1315        else:
1316            self.subsys_no = self.get_nvme_devices_count()
1317        self.log.info("Starting SPDK NVMeOF Target process")
1318        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
1319        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
1320        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")
1321
1322        with open(self.pid, "w") as fh:
1323            fh.write(str(proc.pid))
1324        self.nvmf_proc = proc
1325        self.log.info("SPDK NVMeOF Target PID=%s" % proc.pid)
1326        self.log.info("Waiting for spdk to initialize...")
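        # nvmf_tgt was started with --wait-for-rpc, so poll until it creates
        # the default RPC socket before attaching a JSON-RPC client to it.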
1327        while True:
1328            if os.path.exists("/var/tmp/spdk.sock"):
1329                break
1330            time.sleep(1)
1331        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")
1332
1333        rpc.sock.sock_set_default_impl(self.client, impl_name=self.sock_impl)
1334        rpc.iobuf.iobuf_set_options(self.client,
1335                                    small_pool_count=self.iobuf_small_pool_count,
1336                                    large_pool_count=self.iobuf_large_pool_count,
1337                                    small_bufsize=None,
1338                                    large_bufsize=None)
1339
1340        if self.enable_zcopy:
1341            rpc.sock.sock_impl_set_options(self.client, impl_name=self.sock_impl,
1342                                           enable_zerocopy_send_server=True)
1343            self.log.info("Target socket options:")
1344            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name=self.sock_impl))
1345
1346        if self.enable_adq:
1347            rpc.sock.sock_impl_set_options(self.client, impl_name=self.sock_impl, enable_placement_id=1)
1348            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
1349                                           nvme_adminq_poll_period_us=100000, retry_count=4)
1350
1351        if self.enable_dsa:
1352            rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None)
1353            self.log.info("Target DSA accel module enabled")
1354
1355        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit)
1356        rpc.framework_start_init(self.client)
1357
1358        if self.bpf_scripts:
1359            self.bpf_start()
1360
1361        self.spdk_tgt_configure()
1362
1363    def stop(self):
1364        if self.bpf_proc:
1365            self.log.info("Stopping BPF Trace script")
1366            self.bpf_proc.terminate()
1367            self.bpf_proc.wait()
1368
1369        if hasattr(self, "nvmf_proc"):
1370            try:
1371                self.nvmf_proc.terminate()
1372                self.nvmf_proc.wait(timeout=30)
1373            except Exception as e:
1374                self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.")
1375                self.log.info(e)
1376                self.nvmf_proc.kill()
1377                self.nvmf_proc.communicate()
1378                # Try to clean up RPC socket files if they were not removed
1379                # because of using 'kill'
1380                try:
1381                    os.remove("/var/tmp/spdk.sock")
1382                    os.remove("/var/tmp/spdk.sock.lock")
1383                except FileNotFoundError:
1384                    pass
1385        self.restore_settings()
1386
1387
1388class KernelInitiator(Initiator):
1389    def __init__(self, name, general_config, initiator_config):
1390        super().__init__(name, general_config, initiator_config)
1391
1392        # Defaults
1393        self.extra_params = initiator_config.get('extra_params', '')
1394
1395        self.ioengine = "libaio"
1396        self.spdk_conf = ""
1397
1398        if "num_cores" in initiator_config:
1399            self.num_cores = initiator_config["num_cores"]
1400
1401        if "kernel_engine" in initiator_config:
1402            self.ioengine = initiator_config["kernel_engine"]
1403            if "io_uring" in self.ioengine:
1404                self.extra_params += ' --nr-poll-queues=8'
1405
1406    def configure_adq(self):
1407        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
1408
1409    def adq_configure_tc(self):
1410        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
1411
1412    def adq_set_busy_read(self, busy_read_val):
1413        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
1414        return {"net.core.busy_read": 0}
1415
1416    def get_connected_nvme_list(self):
1417        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
1418        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
1419                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
1420        return nvme_list
1421
1422    def init_connect(self):
1423        self.log.info("The connection attempts below may produce error messages; this is expected!")
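        # Assembles one "nvme connect" call per subsystem, e.g.:
        #   nvme connect -t tcp -s 4420 -n nqn.2018-09.io.spdk:cnode0 -a 10.0.0.1
        # (values illustrative; they come from the target's subsystem list)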
1424        for subsystem in self.subsystem_info_list:
1425            self.log.info("Trying to connect %s %s %s" % subsystem)
1426            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
1427                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
1428            time.sleep(2)
1429
1430        if "io_uring" in self.ioengine:
1431            self.log.info("Setting block layer settings for io_uring.")
1432
1433            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
1434            #       apparently it's not possible for connected subsystems.
1435            #       Results in "error: Invalid argument"
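            # iostats=0 disables per-queue I/O accounting, rq_affinity=0 stops
            # steering completions back to the submitting CPU, and nomerges=2
            # disables all request merging, all to lower per-IO overhead.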
1436            block_sysfs_settings = {
1437                "iostats": "0",
1438                "rq_affinity": "0",
1439                "nomerges": "2"
1440            }
1441
1442            for disk in self.get_connected_nvme_list():
1443                sysfs = os.path.join("/sys/block", disk, "queue")
1444                for k, v in block_sysfs_settings.items():
1445                    sysfs_opt_path = os.path.join(sysfs, k)
1446                    try:
1447                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
1448                    except CalledProcessError as e:
1449                        self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
1450                    finally:
1451                        current_value = self.exec_cmd(["sudo", "cat", sysfs_opt_path])
1452                        self.log.info("%s=%s" % (sysfs_opt_path, current_value))
1453
1454    def init_disconnect(self):
1455        for subsystem in self.subsystem_info_list:
1456            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
1457            time.sleep(1)
1458
1459    def get_nvme_subsystem_numa(self, dev_name):
1460        # Strip the trailing namespace suffix (e.g. "nvme0n1" -> "nvme0") to get the controller name
1461        nvme_ctrl = os.path.basename(dev_name)[:-2]
1462        remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
1463                                   self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl]))
1464        return self.get_route_nic_numa(remote_nvme_ip.group(0))
1465
1466    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
1467        self.available_cpus = self.get_numa_cpu_map()
1468        if len(threads) >= len(subsystems):
1469            threads = range(0, len(subsystems))
1470
1471        # Generate connected nvme devices names and sort them by used NIC numa node
1472        # to allow better grouping when splitting into fio sections.
1473        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
1474        nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list]
1475        nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))]
1476
1477        filename_section = ""
1478        nvme_per_split = len(nvme_list) // len(threads)
1479        remainder = len(nvme_list) % len(threads)
1480        iterator = iter(nvme_list)
1481        result = []
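        # Spread devices across fio sections as evenly as possible; e.g. 5
        # devices over 2 sections yields groups of 3 and 2.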
1482        for i in range(len(threads)):
1483            result.append([])
1484            for _ in range(nvme_per_split):
1485                result[i].append(next(iterator))
1486            if remainder:
1487                result[i].append(next(iterator))
1488                remainder -= 1
1489        for i, r in enumerate(result):
1490            header = "[filename%s]" % i
1491            disks = "\n".join(["filename=%s" % x for x in r])
1492            job_section_qd = round((io_depth * len(r)) / num_jobs)
1493            if job_section_qd == 0:
1494                job_section_qd = 1
1495            iodepth = "iodepth=%s" % job_section_qd
1496
1497            offset_section = ""
1498            if offset:
1499                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)
1500
1501            numa_opts = self.gen_fio_numa_section(r, num_jobs)
1502
1503            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])
1504
1505        return filename_section
1506
1507
1508class SPDKInitiator(Initiator):
1509    def __init__(self, name, general_config, initiator_config):
1510        super().__init__(name, general_config, initiator_config)
1511
1512        if self.skip_spdk_install is False:
1513            self.install_spdk()
1514
1515        # Optional fields
1516        self.enable_data_digest = initiator_config.get('enable_data_digest', False)
1517        self.small_pool_count = initiator_config.get('small_pool_count', 32768)
1518        self.large_pool_count = initiator_config.get('large_pool_count', 16384)
1519        self.sock_impl = initiator_config.get('sock_impl', 'posix')
1520
1521        if "num_cores" in initiator_config:
1522            self.num_cores = initiator_config["num_cores"]
1523
1524        self.ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
1525        self.spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
1526
1527    def adq_set_busy_read(self, busy_read_val):
1528        return {"net.core.busy_read": busy_read_val}
1529
1530    def install_spdk(self):
1531        self.log.info("Using fio binary %s" % self.fio_bin)
1532        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
1533        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
1534        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma",
1535                       "--with-fio=%s" % os.path.dirname(self.fio_bin),
1536                       "--enable-lto", "--disable-unit-tests"])
1537        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
1538        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])
1539
1540        self.log.info("SPDK built")
1541        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])
1542
1543    def init_connect(self):
1544        # Not a real "connect" like "nvme connect", because SPDK's fio bdev
1545        # plugin initiates the connection just before starting IO traffic.
1546        # This is just an "init_connect" equivalent of the same function
1547        # from the KernelInitiator class.
1548        # Prepare the bdev.conf JSON file for later use and consider that
1549        # "making a connection".
1550        bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
1551        self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
1552
1553    def init_disconnect(self):
1554        # SPDK Initiator does not need to explicitly disconnect, as this is
1555        # done automatically after the fio bdev plugin finishes IO.
1556        return
1557
1558    def gen_spdk_bdev_conf(self, remote_subsystem_list):
1559        spdk_cfg_section = {
1560            "subsystems": [
1561                {
1562                    "subsystem": "bdev",
1563                    "config": []
1564                },
1565                {
1566                    "subsystem": "iobuf",
1567                    "config": [
1568                        {
1569                            "method": "iobuf_set_options",
1570                            "params": {
1571                                "small_pool_count": self.small_pool_count,
1572                                "large_pool_count": self.large_pool_count
1573                            }
1574                        }
1575                    ]
1576                },
1577                {
1578                    "subsystem": "sock",
1579                    "config": [
1580                        {
1581                            "method": "sock_set_default_impl",
1582                            "params": {
1583                                "impl_name": self.sock_impl
1584                            }
1585                        }
1586                    ]
1587                }
1588            ]
1589        }
1590
1591        for i, subsys in enumerate(remote_subsystem_list):
1592            sub_port, sub_nqn, sub_addr = map(str, subsys)
1593            nvme_ctrl = {
1594                "method": "bdev_nvme_attach_controller",
1595                "params": {
1596                    "name": "Nvme{}".format(i),
1597                    "trtype": self.transport,
1598                    "traddr": sub_addr,
1599                    "trsvcid": sub_port,
1600                    "subnqn": sub_nqn,
1601                    "adrfam": "IPv4"
1602                }
1603            }
1604
1605            if self.enable_adq:
1606                nvme_ctrl["params"].update({"priority": "1"})
1607
1608            if self.enable_data_digest:
1609                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})
1610
1611            spdk_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)
1612
1613        return json.dumps(spdk_cfg_section, indent=2)
1614
1615    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
1616        self.available_cpus = self.get_numa_cpu_map()
1617        filename_section = ""
1618        if len(threads) >= len(subsystems):
1619            threads = range(0, len(subsystems))
1620
1621        # Generate expected NVMe Bdev names and sort them by used NIC numa node
1622        # to allow better grouping when splitting into fio sections.
1623        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
1624        filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames]
1625        filenames = [x for _, x in sorted(zip(filename_numas, filenames))]
1626
1627        nvme_per_split = len(subsystems) // len(threads)
1628        remainder = len(subsystems) % len(threads)
1629        iterator = iter(filenames)
1630        result = []
1631        for i in range(len(threads)):
1632            result.append([])
1633            for _ in range(nvme_per_split):
1634                result[i].append(next(iterator))
1635            if remainder:
1636                result[i].append(next(iterator))
1637                remainder -= 1
1638        for i, r in enumerate(result):
1639            header = "[filename%s]" % i
1640            disks = "\n".join(["filename=%s" % x for x in r])
1641            job_section_qd = round((io_depth * len(r)) / num_jobs)
1642            if job_section_qd == 0:
1643                job_section_qd = 1
1644            iodepth = "iodepth=%s" % job_section_qd
1645
1646            offset_section = ""
1647            if offset:
1648                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)
1649
1650            numa_opts = self.gen_fio_numa_section(r, num_jobs)
1651
1652            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])
1653
1654        return filename_section
1655
1656    def get_nvme_subsystem_numa(self, bdev_name):
1657        bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir]))
1658        bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"]
1659
1660        # Strip the trailing namespace suffix (e.g. "Nvme0n1" -> "Nvme0") to get the controller name
1661        nvme_ctrl = bdev_name[:-2]
1662        for bdev in bdev_conf_json_obj:
1663            if bdev["method"] == "bdev_nvme_attach_controller" and bdev["params"]["name"] == nvme_ctrl:
1664                return self.get_route_nic_numa(bdev["params"]["traddr"])
1665        return None
1666
1667
1668def initiators_match_subsystems(initiators, target_obj):
1669    for i in initiators:
1670        i.match_subsystems(target_obj.subsystem_info_list)
1671        if i.enable_adq:
1672            i.adq_configure_tc()
1673
1674
1675def run_single_fio_test(args,
1676                        initiators,
1677                        configs,
1678                        target_obj,
1679                        block_size,
1680                        io_depth,
1681                        rw,
1682                        fio_rw_mix_read,
1683                        fio_run_num,
1684                        fio_ramp_time,
1685                        fio_run_time):
1686
1687    for run_no in range(1, fio_run_num+1):
1688        threads = []
1689        power_daemon = None
1690        measurements_prefix = "%s_%s_%s_m_%s_run_%s" % (block_size, io_depth, rw, fio_rw_mix_read, run_no)
1691
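        # One fio thread per initiator, plus optional measurement threads on
        # the target (sar, pcm, bandwidth, DPDK memory, power); all are
        # started together and joined below.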
1692        for i, cfg in zip(initiators, configs):
1693            t = threading.Thread(target=i.run_fio, args=(cfg, run_no))
1694            threads.append(t)
1695        if target_obj.enable_sar:
1696            sar_file_prefix = measurements_prefix + "_sar"
1697            t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix, fio_ramp_time, fio_run_time))
1698            threads.append(t)
1699
1700        if target_obj.enable_pcm:
1701            pcm_fnames = ["%s_%s.csv" % (measurements_prefix, x) for x in ["pcm_cpu"]]
1702            pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm,
1703                                         args=(args.results, pcm_fnames[0], fio_ramp_time, fio_run_time))
1704            threads.append(pcm_cpu_t)
1705
1706        if target_obj.enable_bw:
1707            bandwidth_file_name = measurements_prefix + "_bandwidth.csv"
1708            t = threading.Thread(target=target_obj.measure_network_bandwidth,
1709                                 args=(args.results, bandwidth_file_name, fio_ramp_time, fio_run_time))
1710            threads.append(t)
1711
1712        if target_obj.enable_dpdk_memory:
1713            dpdk_mem_file_name = measurements_prefix + "_dpdk_mem.txt"
1714            t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results, dpdk_mem_file_name, fio_ramp_time))
1715            threads.append(t)
1716
1717        if target_obj.enable_pm:
1718            power_daemon = threading.Thread(target=target_obj.measure_power,
1719                                            args=(args.results, measurements_prefix, script_full_dir,
1720                                                  fio_ramp_time, fio_run_time))
1721            threads.append(power_daemon)
1722
1723        for t in threads:
1724            t.start()
1725        for t in threads:
1726            t.join()
1727
1728
1729def run_fio_tests(args, initiators, target_obj,
1730                  fio_workloads, fio_rw_mix_read,
1731                  fio_run_num, fio_ramp_time, fio_run_time,
1732                  fio_rate_iops, fio_offset, fio_offset_inc):
1733
1734    for block_size, io_depth, rw in fio_workloads:
1735        configs = []
1736        for i in initiators:
1737            i.init_connect()
1738            cfg = i.gen_fio_config(rw,
1739                                   fio_rw_mix_read,
1740                                   block_size,
1741                                   io_depth,
1742                                   target_obj.subsys_no,
1743                                   fio_num_jobs,
1744                                   fio_ramp_time,
1745                                   fio_run_time,
1746                                   fio_rate_iops,
1747                                   fio_offset,
1748                                   fio_offset_inc)
1749            configs.append(cfg)
1750
1751        run_single_fio_test(args,
1752                            initiators,
1753                            configs,
1754                            target_obj,
1755                            block_size,
1756                            io_depth,
1757                            rw,
1758                            fio_rw_mix_read,
1759                            fio_run_num,
1760                            fio_ramp_time,
1761                            fio_run_time)
1762
1763        for i in initiators:
1764            i.init_disconnect()
1765            i.copy_result_files(args.results)
1766
1767    try:
1768        parse_results(args.results, args.csv_filename)
1769    except Exception as err:
1770        logging.error("There was an error while parsing the results")
1771        raise
1772
1773
1774if __name__ == "__main__":
1775    exit_code = 0
1776
1777    script_full_dir = os.path.dirname(os.path.realpath(__file__))
1778    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))
1779
1780    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
1781    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
1782                        help='Configuration file.')
1783    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
1784                        help='Results directory.')
1785    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
1786                        help='CSV results filename.')
1787    parser.add_argument('-f', '--force', default=False, action='store_true',
1788                        dest='force', help="""Force script to continue and try to use all
1789                        available NVMe devices during test.
1790                        WARNING: Might result in data loss on used NVMe drives""")
1791
1792    args = parser.parse_args()
1793
1794    logging.basicConfig(level=logging.INFO,
1795                        format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s')
1796
1797    logging.info("Using config file: %s" % args.config)
1798    with open(args.config, "r") as config:
1799        data = json.load(config)
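    # Expected config layout (sketch; key values are illustrative):
    # {
    #   "general":    {"username": "...", "password": "...", "transport": "tcp"},
    #   "target":     {"mode": "spdk", "nic_ips": [...], ...},
    #   "initiator1": {"mode": "spdk", "nic_ips": [...], ...},
    #   "fio":        {"bs": ["4k"], "qd": [128], "rw": ["randread"],
    #                  "run_time": 30, "ramp_time": 10, "rwmixread": 70}
    # }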
1800
1801    initiators = []
1802    general_config = data["general"]
1803    target_config = data["target"]
1804    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]
1805
1806    if "null_block_devices" not in data["target"] and \
1807        (args.force is False and
1808            "allowlist" not in data["target"] and
1809            "blocklist" not in data["target"]):
1810        # TODO: Also check if allowlist or blocklist are not empty.
1811        logging.warning("""WARNING: This script requires an allowlist or a blocklist to be defined.
1812        You can choose to use all available NVMe drives on your system, which may potentially
1813        lead to data loss. If you wish to proceed with all attached NVMes, use "-f" option.""")
1814        exit(1)
1815
1816    for k, v in data.items():
1817        if "target" in k:
1818            v.update({"results_dir": args.results})
1819            if data[k]["mode"] == "spdk":
1820                target_obj = SPDKTarget(k, data["general"], v)
1821            elif data[k]["mode"] == "kernel":
1822                target_obj = KernelTarget(k, data["general"], v)
1823        elif "initiator" in k:
1824            if data[k]["mode"] == "spdk":
1825                init_obj = SPDKInitiator(k, data["general"], v)
1826            elif data[k]["mode"] == "kernel":
1827                init_obj = KernelInitiator(k, data["general"], v)
1828            initiators.append(init_obj)
1829        elif "fio" in k:
1830            fio_workloads = itertools.product(data[k]["bs"],
1831                                              data[k]["qd"],
1832                                              data[k]["rw"])
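            # Cartesian product; e.g. bs=["4k"], qd=[64, 128], rw=["randread"]
            # expands to ("4k", 64, "randread") and ("4k", 128, "randread").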
1833
1834            fio_run_time = data[k]["run_time"]
1835            fio_ramp_time = data[k]["ramp_time"]
1836            fio_rw_mix_read = data[k]["rwmixread"]
1837            fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None
1838            fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None
1839
1840            fio_rate_iops = 0
1841            if "rate_iops" in data[k]:
1842                fio_rate_iops = data[k]["rate_iops"]
1843
1844            fio_offset = False
1845            if "offset" in data[k]:
1846                fio_offset = data[k]["offset"]
1847            fio_offset_inc = 0
1848            if "offset_inc" in data[k]:
1849                fio_offset_inc = data[k]["offset_inc"]
1850        else:
1851            continue
1852
1853    try:
1854        os.mkdir(args.results)
1855    except FileExistsError:
1856        pass
1857
1858    for i in initiators:
1859        target_obj.initiator_info.append(
1860            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
1861        )
1862
1863    try:
1864        target_obj.tgt_start()
1865        initiators_match_subsystems(initiators, target_obj)
1866
1867        # Poor man's threading
1868        # Run FIO tests
1869        run_fio_tests(args, initiators, target_obj, fio_workloads, fio_rw_mix_read,
1870                      fio_run_num, fio_ramp_time, fio_run_time, fio_rate_iops,
1871                      fio_offset, fio_offset_inc)
1872
1873    except Exception as e:
1874        logging.error("Exception occurred while running FIO tests")
1875        logging.error(e)
1876        exit_code = 1
1877
1878    finally:
1879        for i in initiators:
1880            try:
1881                i.stop()
1882            except Exception as err:
1883                logging.error("Failed to stop initiator %s: %s" % (i.name, err))
1884        target_obj.stop()
1885        sys.exit(exit_code)
1886