#!/usr/bin/env python3
#  SPDX-License-Identifier: BSD-3-Clause
#  Copyright (C) 2018 Intel Corporation
#  All rights reserved.
#

# Note: we use setattr
# pylint: disable=no-member

import os
import re
import sys
import argparse
import json
import logging
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid

import paramiko
import pandas as pd
from common import *
from subprocess import CalledProcessError
from abc import abstractmethod, ABC
from dataclasses import dataclass
from typing import Any

sys.path.append(os.path.dirname(__file__) + '/../../../python')

import spdk.rpc as rpc  # noqa
import spdk.rpc.client as rpc_client  # noqa


@dataclass
class ConfigField:
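    """Declarative description of a single config key: its name, default
    value, and whether the key must be present in the config file."""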
    name: str = None
    default: Any = None
    required: bool = False


class Server(ABC):
    def __init__(self, name, general_config, server_config):
        self.name = name

        self.log = logging.getLogger(self.name)

        config_fields = [
            ConfigField(name='username', required=True),
            ConfigField(name='password', required=True),
            ConfigField(name='transport', required=True),
            ConfigField(name='skip_spdk_install', default=False),
            ConfigField(name='irdma_roce_enable', default=False),
            ConfigField(name='pause_frames', default=None)
        ]

        self.read_config(config_fields, general_config)
        self.transport = self.transport.lower()

        config_fields = [
            ConfigField(name='nic_ips', required=True),
            ConfigField(name='mode', required=True),
            ConfigField(name='irq_scripts_dir', default='/usr/src/local/mlnx-tools/ofed_scripts'),
            ConfigField(name='enable_arfs', default=False),
            ConfigField(name='tuned_profile', default='')
        ]
        self.read_config(config_fields, server_config)

        self.local_nic_info = []
        self._nics_json_obj = {}
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""

        self.enable_adq = server_config.get('adq_enable', False)
        self.adq_priority = 1 if self.enable_adq else None
        self.irq_settings = {'mode': 'default', **server_config.get('irq_settings', {})}

        if not re.match(r'^[A-Za-z0-9\-]+$', name):
            self.log.error("Please use a name which contains only letters, numbers or dashes")
            sys.exit(1)

    def read_config(self, config_fields, config):
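        # Read each declared field from config (falling back to its default),
        # fail fast if a required key is missing, and expose the value as an
        # attribute on self (hence the module-level "no-member" pylint disable).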
        for config_field in config_fields:
            value = config.get(config_field.name, config_field.default)
            if config_field.required is True and value is None:
                raise ValueError('%s config key is required' % config_field.name)

            setattr(self, config_field.name, value)

    @staticmethod
    def get_uncommented_lines(lines):
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
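        # Note: the match below is a substring check ("ip in addr"), so make
        # sure configured IPs are not prefixes of other local addresses.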
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                if ip in addr["local"]:
                    return nic["ifname"]

    @abstractmethod
    def set_local_nic_info_helper(self):
        pass

    def set_local_nic_info(self, pci_info):
        def extract_network_elements(json_obj):
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def get_nic_numa_node(self, nic_name):
        return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name]))

    def get_numa_cpu_map(self):
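        # Returns a {numa_node: [cpu, cpu, ...]} map built from lscpu output,
        # e.g. {0: [0, 1, 2, 3], 1: [4, 5, 6, 7]}.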
        numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"]))
        numa_cpu_json_map = {}

        for cpu in numa_cpu_json_obj["cpus"]:
            cpu_num = int(cpu["cpu"])
            numa_node = int(cpu["node"])
            numa_cpu_json_map.setdefault(numa_node, [])
            numa_cpu_json_map[numa_node].append(cpu_num)

        return numa_cpu_json_map

    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
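        # Stub; overridden by Target (local subprocess) and Initiator (SSH).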
        return ""

    def configure_system(self):
        self.load_drivers()
        self.configure_services()
        self.configure_sysctl()
        self.configure_arfs()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity(**self.irq_settings)
        self.configure_pause_frames()

    RDMA_PROTOCOL_IWARP = 0
    RDMA_PROTOCOL_ROCE = 1
    RDMA_PROTOCOL_UNKNOWN = -1

    def check_rdma_protocol(self):
        try:
            roce_ena = self.exec_cmd(["cat", "/sys/module/irdma/parameters/roce_ena"])
            roce_ena = roce_ena.strip()
            if roce_ena == "0":
                return self.RDMA_PROTOCOL_IWARP
            else:
                return self.RDMA_PROTOCOL_ROCE
        except CalledProcessError as e:
            self.log.error("ERROR: failed to check RDMA protocol!")
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
            return self.RDMA_PROTOCOL_UNKNOWN

    def load_drivers(self):
        self.log.info("Loading drivers")
        self.exec_cmd(["sudo", "modprobe", "-a",
                       "nvme-%s" % self.transport,
                       "nvmet-%s" % self.transport])
        current_mode = self.check_rdma_protocol()
        if current_mode == self.RDMA_PROTOCOL_UNKNOWN:
            self.log.error("ERROR: failed to check RDMA protocol mode")
            return
        if self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_IWARP:
            self.reload_driver("irdma", "roce_ena=1")
            self.log.info("Loaded irdma driver with RoCE enabled")
        elif self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_ROCE:
            self.log.info("Leaving irdma driver with RoCE enabled")
        else:
            self.reload_driver("irdma", "roce_ena=0")
            self.log.info("Loaded irdma driver with iWARP enabled")

    def set_pause_frames(self, rx_state, tx_state):
        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            self.exec_cmd(["sudo", "ethtool", "-A", nic_name, "rx", rx_state, "tx", tx_state])

    def configure_pause_frames(self):
        if self.pause_frames is None:
            self.log.info("Keeping NIC's default pause frames setting")
            return
        elif not self.pause_frames:
            self.log.info("Turning off pause frames")
            self.set_pause_frames("off", "off")
        else:
            self.log.info("Turning on pause frames")
            self.set_pause_frames("on", "on")

    def configure_arfs(self):
        rps_flow_cnt = 512
        if not self.enable_arfs:
            rps_flow_cnt = 0

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            self.exec_cmd(["sudo", "ethtool", "-K", nic_name, "ntuple", "on"])
            self.log.info(f"Setting rps_flow_cnt={rps_flow_cnt} for {nic_name}")
            queue_files = self.exec_cmd(["ls", f"/sys/class/net/{nic_name}/queues/"]).strip().split("\n")
            queue_files = filter(lambda x: x.startswith("rx-"), queue_files)

            for qf in queue_files:
                self.exec_cmd(["sudo", "bash", "-c", f"echo {rps_flow_cnt} > /sys/class/net/{nic_name}/queues/{qf}/rps_flow_cnt"])

    def configure_adq(self):
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        self.log.info("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log.info("%s loaded!" % module)
            except CalledProcessError as e:
                self.log.error("ERROR: failed to load module %s" % module)
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        self.log.info("Configuring ADQ Traffic classes and filters...")

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
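        # On the target we match packets arriving at the NVMe-oF listener
        # (dst_port); on initiators the same port is the packet source
        # (src_port).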
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            nic_ports = [x[0] for x in self.subsystem_info_list if nic_ip in x[2]]

            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log.info(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log.info(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            nic_bdf = os.path.basename(self.exec_cmd(["readlink", "-f", "/sys/class/net/%s/device" % nic_name]).strip())
            self.exec_cmd(["sudo", "devlink", "dev", "param", "set", "pci/%s" % nic_bdf,
                           "name", "tc1_inline_fd", "value", "true", "cmode", "runtime"])

            for port in nic_ports:
                tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                                 "protocol", "ip", "ingress", "prio", "1", "flower",
                                 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                                 "skip_sw", "hw_tc", "1"]
                self.log.info(" ".join(tc_filter_cmd))
                self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log.info("Show tc configuration for %s NIC..." % nic_name)
            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log.info("%s" % tc_disk_out)
            self.log.info("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log.info(xps_cmd)
            self.exec_cmd(xps_cmd)

    def reload_driver(self, driver, *modprobe_args):
        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver, *modprobe_args])
        except CalledProcessError as e:
            self.log.error("ERROR: failed to reload %s module!" % driver)
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_nic(self):
        self.log.info("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                nic_bdf = self.exec_cmd(["bash", "-c", "source /sys/class/net/%s/device/uevent; echo $PCI_SLOT_NAME" % nic])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-inspect-optimize", "off"])
                # Following are suggested in ADQ Configuration Guide to be enabled for large block sizes,
                # but can be used with 4k block sizes as well
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop", "on"])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop-cfg", "on"])
            except subprocess.CalledProcessError as e:
                self.log.error("ERROR: failed to configure NIC port using ethtool!")
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
                self.log.info("Please update your NIC driver and firmware versions and try again.")
            self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        self.log.info("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log.info("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log.info("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        self.log.info("Tuning sysctl settings...")

        busy_read = 0
        busy_poll = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1
            busy_poll = 1

        sysctl_opts = {
            "net.core.busy_poll": busy_poll,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
            "net.core.rps_sock_flow_entries": 0
        }

        if self.enable_adq:
            sysctl_opts.update(self.adq_set_busy_read(1))

        if self.enable_arfs:
            sysctl_opts.update({"net.core.rps_sock_flow_entries": 32768})

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        if not self.tuned_profile:
            self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip())

    def configure_cpu_governor(self):
        self.log.info("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def get_core_list_from_mask(self, core_mask):
        # Generate list of individual cores from hex core mask
        # (e.g. '0xffff') or list containing:
        # - individual cores (e.g. '1, 2, 3')
        # - core ranges (e.g. '0-3')
        # - mix of both (e.g. '0, 1-4, 9, 11-13')
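        # Worked example: '0xf3' (0b11110011) -> [0, 1, 4, 5, 6, 7];
        # '0, 1-4, 9' -> [0, 1, 2, 3, 4, 9]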

        core_list = []
        if "0x" in core_mask:
            core_mask_int = int(core_mask, 16)
            for i in range(core_mask_int.bit_length()):
                if (1 << i) & core_mask_int:
                    core_list.append(i)
            return core_list
        else:
            # Core list can be provided in .json config with square brackets
            # remove them first
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")

            for i in core_mask.split(","):
                if "-" in i:
                    start, end = i.split("-")
                    core_range = range(int(start), int(end) + 1)
                    core_list.extend(core_range)
                else:
                    core_list.append(int(i))
            return core_list

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
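        # Supported modes:
        #   default - let the NIC's set_irq_affinity.sh spread IRQs over all cores
        #   bynode  - restrict IRQs to cores on the NIC's local NUMA node
        #   cpulist - restrict IRQs to an explicit core list (hex mask, list or
        #             ranges; see get_core_list_from_mask); with exclude_cpulist
        #             the list is inverted and treated as cores to avoid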
        self.log.info("Setting NIC irq affinity for NICs. Using %s mode" % mode)

        if mode not in ["default", "bynode", "cpulist"]:
            raise ValueError("%s irq affinity setting not supported" % mode)

        if mode == "cpulist" and not cpulist:
            raise ValueError("%s irq affinity setting set, but no cpulist provided" % mode)

        affinity_script = "set_irq_affinity.sh"
        if "default" not in mode:
            affinity_script = "set_irq_affinity_cpulist.sh"
            system_cpu_map = self.get_numa_cpu_map()
        irq_script_path = os.path.join(self.irq_scripts_dir, affinity_script)

        def cpu_list_to_string(cpulist):
            return ",".join(map(lambda x: str(x), cpulist))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            irq_cmd = ["sudo", irq_script_path]

            # Use only CPU cores matching NIC NUMA node.
            # Remove any CPU cores if they're on exclusion list.
            if mode == "bynode":
                irq_cpus = system_cpu_map[self.get_nic_numa_node(nic_name)]
                if cpulist and exclude_cpulist:
                    disallowed_cpus = self.get_core_list_from_mask(cpulist)
                    irq_cpus = list(set(irq_cpus) - set(disallowed_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            if mode == "cpulist":
                irq_cpus = self.get_core_list_from_mask(cpulist)
                if exclude_cpulist:
                    # Flatten system CPU list, we don't need NUMA awareness here
                    system_cpu_list = sorted({x for v in system_cpu_map.values() for x in v})
                    irq_cpus = list(set(system_cpu_list) - set(irq_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            irq_cmd.append(nic_name)
            self.log.info(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        self.log.info("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        self.log.info("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        self.log.info("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log.info("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        self.log.info("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log.info("Reverted CPU governor to %s." % self.governor_restore)

    def restore_settings(self):
        self.restore_governor()
        self.restore_tuned()
        self.restore_services()
        self.restore_sysctl()
        if self.enable_adq:
            self.reload_driver("ice")


class Target(Server):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Defaults
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []
        self.initiator_info = []

        config_fields = [
            ConfigField(name='mode', required=True),
            ConfigField(name='results_dir', required=True),
            ConfigField(name='enable_pm', default=True),
            ConfigField(name='enable_sar', default=True),
            ConfigField(name='enable_pcm', default=True),
            ConfigField(name='enable_dpdk_memory', default=True)
        ]
        self.read_config(config_fields, target_config)

        self.null_block = target_config.get('null_block_devices', 0)
        self.scheduler_name = target_config.get('scheduler_settings', 'static')
        self.enable_zcopy = target_config.get('zcopy_settings', False)
        self.enable_bw = target_config.get('enable_bandwidth', True)
        self.nvme_blocklist = target_config.get('blocklist', [])
        self.nvme_allowlist = target_config.get('allowlist', [])

        # Blocklist takes precedence, remove common elements from allowlist
        self.nvme_allowlist = list(set(self.nvme_allowlist) - set(self.nvme_blocklist))

        self.log.info("Items now on allowlist: %s" % self.nvme_allowlist)
        self.log.info("Items now on blocklist: %s" % self.nvme_blocklist)

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        if self.skip_spdk_install is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
            self.configure_irq_affinity()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log.info("Changing directory to %s" % change_dir)

        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")

        if change_dir:
            os.chdir(old_cwd)
            self.log.info("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        self.log.info("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, _directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log.info("Done zipping")

    @staticmethod
    def _chunks(input_list, chunks_no):
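        # Split input_list into chunks_no contiguous chunks whose lengths
        # differ by at most one; the first (len % chunks_no) chunks get the
        # extra element. E.g. _chunks(list(range(7)), 3) yields
        # [0, 1, 2], [3, 4], [5, 6].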
        div, rem = divmod(len(input_list), chunks_no)
        for i in range(chunks_no):
            si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem)
            yield input_list[si:si + (div + 1 if i < rem else div)]

    def spread_bdevs(self, req_disks):
        # Spread available block devices indexes:
        # - evenly across available initiator systems
        # - evenly across available NIC interfaces for
        #   each initiator
        # Not NUMA aware.
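        # Example: req_disks=4 and two initiators with one NIC each
        # -> [(ip_a, 0), (ip_a, 1), (ip_b, 2), (ip_b, 3)]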
        ip_bdev_map = []
        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))

        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
            self.initiator_info[i]["bdev_range"] = init_chunk
            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
                for c in nic_chunk:
                    ip_bdev_map.append((ip, c))
        return ip_bdev_map

    def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time):
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
        sar_cpu_util_file = os.path.join(results_dir, sar_file_prefix + "cpu_util.txt")

        self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Starting SAR measurements")

        out = self.exec_cmd(["sar", "-P", "ALL", "1", "%s" % run_time])
        with open(sar_output_file, "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log.info("Summary CPU utilization from SAR:")
                        self.log.info(line)
                    elif "all" in line:
                        self.log.info(line)
                    else:
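                        # Field 7 (%idle) of sar's per-CPU "Average:" rows;
                        # assumes the default sar output format.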
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum

        with open(sar_cpu_util_file, "w") as f:
            f.write("%0.2f" % sar_cpu_usage)

    def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time):
        time.sleep(ramp_time)
        self.log.info("Starting power measurements")
        self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir,
                       "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix,
                       "-x", "-c", "%s" % run_time, "-t", "1", "-r"])

    def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        cmd = ["pcm", "1", "-i=%s" % run_time,
               "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name, ramp_time, run_time):
        self.log.info("Waiting %d seconds for ramp-up to finish before measuring bandwidth stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Starting network bandwidth measurements")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", "%s" % run_time])
        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
        #       Improve this so that an additional file containing the measurements' average is generated too.
        # TODO: Monitor only those interfaces which are currently used to run the workload.

    def measure_dpdk_memory(self, results_dir, dump_file_name, ramp_time):
        self.log.info("Waiting %d seconds before generating DPDK memory usage dump" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Generating DPDK memory usage dump")
        tmp_dump_file = rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)["filename"]
        os.rename(tmp_dump_file, "%s/%s" % (results_dir, dump_file_name))

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(os.uname().release)
        self.log.info("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log.info("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log.info("====zcopy settings:====")
        self.log.info("zcopy enabled: %s" % (self.enable_zcopy))
        self.log.info("====Scheduler settings:====")
        self.log.info("SPDK scheduler: %s" % (self.scheduler_name))


class Initiator(Server):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # required fields and defaults
        config_fields = [
            ConfigField(name='mode', required=True),
            ConfigField(name='ip', required=True),
            ConfigField(name='target_nic_ips', required=True),
            ConfigField(name='cpus_allowed', default=None),
            ConfigField(name='cpus_allowed_policy', default='shared'),
            ConfigField(name='spdk_dir', default='/tmp/spdk'),
            ConfigField(name='fio_bin', default='/usr/src/fio/fio'),
            ConfigField(name='nvmecli_bin', default='nvme'),
            ConfigField(name='cpu_frequency', default=None),
            ConfigField(name='allow_cpu_sharing', default=True)
        ]

        self.read_config(config_fields, initiator_config)

        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.subsystem_info_list = []

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if self.skip_spdk_install is False:
            self.copy_spdk("/tmp/spdk.zip")

        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        self.restore_settings()
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it again to prevent
        # expansion when sending it to the remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout using the get_pty option if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        self.log.info("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log.info("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log.info("Sources unpacked")

    def copy_result_files(self, dest_dir):
        self.log.info("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log.info("Done copying results")

    def match_subsystems(self, target_subsystems):
        subsystems = [subsystem for subsystem in target_subsystems if subsystem[2] in self.target_nic_ips]
        if not subsystems:
            raise Exception("No matching subsystems found on target side!")
        subsystems.sort(key=lambda x: x[1])
        self.log.info("Found matching subsystems on target side:")
        for s in subsystems:
            self.log.info(s)
        self.subsystem_info_list = subsystems

    @abstractmethod
    def init_connect(self):
        pass

    @abstractmethod
    def init_disconnect(self):
        pass

    @abstractmethod
    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def get_route_nic_numa(self, remote_nvme_ip):
        local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip]))
        local_nvme_nic = local_nvme_nic[0]["dev"]
        return self.get_nic_numa_node(local_nvme_nic)

    @staticmethod
    def gen_fio_offset_section(offset_inc, num_jobs):
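        # Give each of num_jobs fio jobs a disjoint slice of the device.
        # E.g. num_jobs=4, offset_inc=0 -> every job gets size=25%, with fio's
        # offset_increment placing the jobs at 0%, 25%, 50% and 75%.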
        offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc
        return "\n".join(["size=%s%%" % offset_inc,
                          "offset=0%",
                          "offset_increment=%s%%" % offset_inc])

    def gen_fio_numa_section(self, fio_filenames_list, num_jobs):
        numa_stats = {}
        allowed_cpus = []
        for nvme in fio_filenames_list:
            nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme))
            numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1

        # Use the most common NUMA node for this chunk to allocate memory and CPUs
        section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0]

        # Check if we have enough free CPUs to pop from the list before assigning them
        if len(self.available_cpus[section_local_numa]) < num_jobs:
            if self.allow_cpu_sharing:
                self.log.info("Regenerating available CPU list %s" % section_local_numa)
                # Remove still available CPUs from the regenerated list. We don't want to
                # regenerate it with duplicates.
                cpus_regen = set(self.get_numa_cpu_map()[section_local_numa]) - set(self.available_cpus[section_local_numa])
                self.available_cpus[section_local_numa].extend(cpus_regen)
                self.log.info(self.available_cpus[section_local_numa])
            else:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise IndexError

        for _ in range(num_jobs):
            try:
                allowed_cpus.append(str(self.available_cpus[section_local_numa].pop(0)))
            except IndexError:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise

        return "\n".join(["cpus_allowed=%s" % ",".join(allowed_cpus),
                          "numa_mem_policy=prefer:%s" % section_local_numa])

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no,
                       num_jobs=None, ramp_time=0, run_time=10, rate_iops=0,
                       offset=False, offset_inc=0):
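        # Build an fio job file: the [global] section below plus per-job
        # filename sections generated by the subclass gen_fio_filename_conf().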
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""

        if self.cpus_allowed is not None:
            self.log.info("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    cpus_num += len(range(a, b + 1))
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            self.num_cores = len(self.subsystem_info_list)
            threads = range(0, len(self.subsystem_info_list))

        filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs,
                                                      offset, offset_inc)

        fio_config = fio_conf_template.format(ioengine=self.ioengine, spdk_conf=self.spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, init_connect() function
        if "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log.info("Created FIO Config:")
        self.log.info(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit(1)
        else:
            self.log.warning("WARNING: cpu_frequency not set; keeping the default CPU frequency governor.")

    def run_fio(self, fio_config_file, run_num=1):
        job_name, _ = os.path.splitext(fio_config_file)
        self.log.info("Starting FIO run for job: %s" % job_name)
        self.log.info("Using FIO: %s" % self.fio_bin)

        output_filename = job_name + "_run_" + str(run_num) + "_" + self.name + ".json"
        try:
            output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename, "--eta=never"], True)
            self.log.info(output)
            self.log.info("FIO run finished. Results in: %s" % output_filename)
        except subprocess.CalledProcessError as e:
            self.log.error("ERROR: Fio process failed!")
            self.log.error(e.stdout)

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(self.exec_cmd(["uname", "-r"]))
        self.log.info("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log.info("====sysctl conf:====")
        sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"])
        self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))


class KernelTarget(Target):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = target_config.get('nvmet_bin', 'nvmetcli')

    def load_drivers(self):
        self.log.info("Loading drivers")
        super().load_drivers()
        if self.null_block:
            self.exec_cmd(["sudo", "modprobe", "null_blk", "nr_devices=%s" % self.null_block])

    def configure_adq(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_configure_tc(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_set_busy_read(self, busy_read_val):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
        return {"net.core.busy_read": 0}

    def stop(self):
        self.nvmet_command(self.nvmet_bin, "clear")
        self.restore_settings()

    def get_nvme_device_bdf(self, nvme_dev_path):
        nvme_name = os.path.basename(nvme_dev_path)
        return self.exec_cmd(["cat", "/sys/block/%s/device/address" % nvme_name]).strip()

    def get_nvme_devices(self):
        dev_list = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]).split("\n")
        nvme_list = []
        for dev in dev_list:
            if "nvme" not in dev:
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                nvme_list.append(dev)
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_allowlist:
                nvme_list.append(dev)
        return nvme_list

    def nvmet_command(self, nvmet_bin, command):
        return self.exec_cmd([nvmet_bin, *(command.split(" "))])

    def kernel_tgt_gen_subsystem_conf(self, nvme_list):
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        for ip, bdev_num in self.spread_bdevs(len(nvme_list)):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = nvme_list[bdev_num]

            nvmet_cfg["subsystems"].append({
                "allowed_hosts": [],
                "attr": {
                    "allow_any_host": "1",
                    "serial": serial,
                    "version": "1.3"
                },
                "namespaces": [
                    {
                        "device": {
                            "path": bdev_name,
                            "uuid": "%s" % uuid.uuid4()
                        },
                        "enable": 1,
                        "nsid": port
                    }
                ],
                "nqn": nqn
            })

            nvmet_cfg["ports"].append({
                "addr": {
                    "adrfam": "ipv4",
                    "traddr": ip,
                    "trsvcid": port,
                    "trtype": self.transport
                },
                "portid": bdev_num,
                "referrals": [],
                "subsystems": [nqn]
            })

            self.subsystem_info_list.append((port, nqn, ip))
        self.subsys_no = len(self.subsystem_info_list)

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        self.log.info("Configuring kernel NVMeOF Target")

        if self.null_block:
            self.log.info("Configuring with null block device.")
            nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
        else:
            self.log.info("Configuring with NVMe drives.")
            nvme_list = self.get_nvme_devices()

        self.kernel_tgt_gen_subsystem_conf(nvme_list)
        self.subsys_no = len(nvme_list)

        self.nvmet_command(self.nvmet_bin, "clear")
        self.nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log.info("Done configuring kernel NVMeOF Target")


class SPDKTarget(Target):
    def __init__(self, name, general_config, target_config):
        # IRQ affinity on SPDK Target side takes Target's core mask into consideration.
        # Method setting IRQ affinity is run as part of parent classes init,
        # so we need to have self.core_mask set before changing IRQ affinity.
        self.core_mask = target_config["core_mask"]
        self.num_cores = len(self.get_core_list_from_mask(self.core_mask))

        super().__init__(name, general_config, target_config)

        # Optional config fields and their defaults
        config_fields = [
            ConfigField(name='dif_insert_strip', default=False),
            ConfigField(name='null_block_dif_type', default=0),
            ConfigField(name='num_shared_buffers', default=4096),
            ConfigField(name='max_queue_depth', default=128),
            ConfigField(name='bpf_scripts', default=[]),
            ConfigField(name='scheduler_core_limit', default=None),
            ConfigField(name='dsa_settings', default=False),
            ConfigField(name='iobuf_small_pool_count', default=32767),
            ConfigField(name='iobuf_large_pool_count', default=16383),
            ConfigField(name='num_cqe', default=4096),
            ConfigField(name='sock_impl', default='posix')
        ]

        self.read_config(config_fields, target_config)

        self.bpf_proc = None
        self.enable_dsa = False

        self.log.info("====DSA settings:====")
        self.log.info("DSA enabled: %s" % (self.enable_dsa))

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
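        # Extends the base modes with SPDK-specific ones:
        #   shared       - pin NIC IRQs to the same cores the SPDK target uses
        #   split        - pin NIC IRQs to cores *not* used by the SPDK target
        #   split-bynode - like split, but limited to the NIC's NUMA node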
        if mode not in ["default", "bynode", "cpulist",
                        "shared", "split", "split-bynode"]:
            raise ValueError("%s irq affinity setting not supported" % mode)

        # Create core list from SPDK's mask and change it to string.
        # This is the type configure_irq_affinity expects for cpulist parameter.
        spdk_tgt_core_list = self.get_core_list_from_mask(self.core_mask)
        spdk_tgt_core_list = ",".join(map(lambda x: str(x), spdk_tgt_core_list))
        spdk_tgt_core_list = "[" + spdk_tgt_core_list + "]"

        if mode == "shared":
            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list)
        elif mode == "split":
            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
        elif mode == "split-bynode":
            super().configure_irq_affinity(mode="bynode", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
        else:
            super().configure_irq_affinity(mode=mode, cpulist=cpulist, exclude_cpulist=exclude_cpulist)

    def adq_set_busy_read(self, busy_read_val):
        return {"net.core.busy_read": busy_read_val}

    def get_nvme_devices_count(self):
        return len(self.get_nvme_devices())

    def get_nvme_devices(self):
        bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")]))
        bdev_bdfs = []
        for bdev in bdev_subsys_json_obj["config"]:
            bdev_traddr = bdev["params"]["traddr"]
            if bdev_traddr in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                bdev_bdfs.append(bdev_traddr)
            if bdev_traddr in self.nvme_allowlist:
                bdev_bdfs.append(bdev_traddr)
        return bdev_bdfs

    def spdk_tgt_configure(self):
        self.log.info("Configuring SPDK NVMeOF target via RPC")

        # Create transport layer
        nvmf_transport_params = {
            "client": self.client,
            "trtype": self.transport,
            "num_shared_buffers": self.num_shared_buffers,
            "max_queue_depth": self.max_queue_depth,
            "dif_insert_or_strip": self.dif_insert_strip,
            "sock_priority": self.adq_priority,
            "num_cqe": self.num_cqe
        }

        if self.enable_adq:
            nvmf_transport_params["acceptor_poll_rate"] = 10000

        rpc.nvmf.nvmf_create_transport(**nvmf_transport_params)
        self.log.info("SPDK NVMeOF transport layer:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

        if self.enable_adq:
            self.adq_configure_tc()

        self.log.info("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self, null_block_count):
        md_size = 0
        block_size = 4096
        if self.null_block_dif_type != 0:
            md_size = 128

        self.log.info("Adding null block bdevices to config via RPC")
        for i in range(null_block_count):
1241            self.log.info("Setting bdev protection to :%s" % self.null_block_dif_type)
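            # 102400 blocks at the 4096 B block size gives a 400 MiB null bdev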
            rpc.bdev.bdev_null_create(self.client, 102400, block_size, "Nvme{}n1".format(i),
                                      dif_type=self.null_block_dif_type, md_size=md_size)
        self.log.info("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        self.log.info("Adding NVMe bdevs to config via RPC")

        bdfs = self.get_nvme_devices()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log.error("ERROR: Requested number of disks (%s) exceeds the number available (%s)" % (req_num_disks, len(bdfs)))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log.info("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        self.log.info("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = self.get_nvme_devices_count()

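        # One subsystem per bdev: each gets its own NQN, serial number and
        # listener port (4420 + bdev number), plus a listener on the
        # discovery subsystem at the same IP/port.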
        for ip, bdev_num in self.spread_bdevs(req_num_disks):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = "Nvme%sn1" % bdev_num

            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                           allow_any_host=True, max_namespaces=8)
            rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
            for nqn_name in [nqn, "discovery"]:
                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
                                                     nqn=nqn_name,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid=port,
                                                     adrfam="ipv4")
            self.subsystem_info_list.append((port, nqn, ip))
        self.subsys_no = len(self.subsystem_info_list)

        self.log.info("SPDK NVMeOF subsystem configuration:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def bpf_start(self):
        self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts)
        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
        results_path = os.path.join(self.results_dir, "bpf_traces.txt")

        with open(self.pid, "r") as fh:
            nvmf_pid = fh.readline().strip()

        cmd = [bpf_script, nvmf_pid, *bpf_traces]
        self.log.info(cmd)
        # Extend (rather than replace) the environment so bpftrace.sh still
        # sees PATH etc.; only BPF_OUTFILE is added on top of it.
        self.bpf_proc = subprocess.Popen(cmd, env={**os.environ, "BPF_OUTFILE": results_path})

    def tgt_start(self):
        if self.null_block:
            self.subsys_no = 1
        else:
            self.subsys_no = self.get_nvme_devices_count()
        self.log.info("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        self.log.info("SPDK NVMeOF Target PID=%s" % proc.pid)
        self.log.info("Waiting for spdk to initialize...")
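        # Block until the target application creates its RPC socket. There is
        # no timeout here, so a target that fails to start will hang the
        # script at this point.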
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")

        rpc.sock.sock_set_default_impl(self.client, impl_name=self.sock_impl)
        rpc.iobuf.iobuf_set_options(self.client,
                                    small_pool_count=self.iobuf_small_pool_count,
                                    large_pool_count=self.iobuf_large_pool_count,
                                    small_bufsize=None,
                                    large_bufsize=None)

        if self.enable_zcopy:
            rpc.sock.sock_impl_set_options(self.client, impl_name=self.sock_impl,
                                           enable_zerocopy_send_server=True)
            self.log.info("Target socket options:")
            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name=self.sock_impl))

        if self.enable_adq:
            rpc.sock.sock_impl_set_options(self.client, impl_name=self.sock_impl, enable_placement_id=1)
            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
                                           nvme_adminq_poll_period_us=100000, retry_count=4)

        if self.enable_dsa:
            rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None)
            self.log.info("Target DSA accel module enabled")

        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit)
        rpc.framework_start_init(self.client)

        if self.bpf_scripts:
            self.bpf_start()

        self.spdk_tgt_configure()

    def stop(self):
        if self.bpf_proc:
            self.log.info("Stopping BPF Trace script")
            self.bpf_proc.terminate()
            self.bpf_proc.wait()

        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait(timeout=30)
            except Exception as e:
                self.log.warning("Failed to terminate SPDK Target process. Sending SIGKILL.")
                self.log.warning(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
                # Try to clean up RPC socket files if they were not removed
                # because of using 'kill'
                try:
                    os.remove("/var/tmp/spdk.sock")
                    os.remove("/var/tmp/spdk.sock.lock")
                except FileNotFoundError:
                    pass
        self.restore_settings()


class KernelInitiator(Initiator):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Defaults
        self.extra_params = initiator_config.get('extra_params', '')

        self.ioengine = "libaio"
        self.spdk_conf = ""

        if "num_cores" in initiator_config:
            self.num_cores = initiator_config["num_cores"]

        if "kernel_engine" in initiator_config:
            self.ioengine = initiator_config["kernel_engine"]
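            # For io_uring, ask nvme-cli to allocate poll queues at connect
            # time; 8 is the queue count this script picks.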
            if "io_uring" in self.ioengine:
                self.extra_params += ' --nr-poll-queues=8'

    def configure_adq(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_configure_tc(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_set_busy_read(self, busy_read_val):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
        return {"net.core.busy_read": 0}

    def get_connected_nvme_list(self):
        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
        return nvme_list

    def init_connect(self):
        self.log.info("The connection attempts below may print error messages; this is expected!")
        for subsystem in self.subsystem_info_list:
            self.log.info("Trying to connect %s %s %s" % subsystem)
            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
            time.sleep(2)

        if "io_uring" in self.ioengine:
            self.log.info("Setting block layer settings for io_uring.")

            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
            #       apparently it's not possible for connected subsystems.
            #       Results in "error: Invalid argument"
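            # iostats=0 disables per-queue I/O accounting, rq_affinity=0 stops
            # steering completions back to the submitting CPU, and nomerges=2
            # disables all request merging; all three reduce block-layer
            # overhead during polling benchmarks.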
            block_sysfs_settings = {
                "iostats": "0",
                "rq_affinity": "0",
                "nomerges": "2"
            }

            for disk in self.get_connected_nvme_list():
                sysfs = os.path.join("/sys/block", disk, "queue")
                for k, v in block_sysfs_settings.items():
                    sysfs_opt_path = os.path.join(sysfs, k)
                    try:
                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
                    except CalledProcessError as e:
                        self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, sysfs_opt_path))
                    finally:
                        sysfs_value = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)])
                        self.log.info("%s=%s" % (sysfs_opt_path, sysfs_value))

    def init_disconnect(self):
        for subsystem in self.subsystem_info_list:
            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
            time.sleep(1)

    def get_nvme_subsystem_numa(self, dev_name):
        # Strip the last two characters (the namespace suffix, e.g. "n1") to
        # get the controller name instead of the namespace device name
        nvme_ctrl = os.path.basename(dev_name)[:-2]
        remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
                                   self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl]))
        return self.get_route_nic_numa(remote_nvme_ip.group(0))

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
        self.available_cpus = self.get_numa_cpu_map()
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        # Generate connected NVMe device names and sort them by the NUMA node
        # of the NIC used to reach them, to allow better grouping when
        # splitting into fio sections.
        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
        nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list]
        nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))]

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            # Distribute the remainder across groups, one extra device per
            # group (same scheme as SPDKInitiator.gen_fio_filename_conf),
            # instead of stacking all leftover devices on the first group.
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = self.gen_fio_numa_section(r, num_jobs)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section


class SPDKInitiator(Initiator):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        if self.skip_spdk_install is False:
            self.install_spdk()

        # Optional fields
        self.enable_data_digest = initiator_config.get('enable_data_digest', False)
        self.small_pool_count = initiator_config.get('small_pool_count', 32768)
        self.large_pool_count = initiator_config.get('large_pool_count', 16384)
        self.sock_impl = initiator_config.get('sock_impl', 'posix')

        if "num_cores" in initiator_config:
            self.num_cores = initiator_config["num_cores"]

        self.ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
        self.spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir

    def adq_set_busy_read(self, busy_read_val):
        return {"net.core.busy_read": busy_read_val}

    def install_spdk(self):
        self.log.info("Using fio binary %s" % self.fio_bin)
        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma",
                       "--with-fio=%s" % os.path.dirname(self.fio_bin),
                       "--enable-lto", "--disable-unit-tests"])
        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])

        self.log.info("SPDK built")
        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])

    def init_connect(self):
        # Not a real "connect" like the "nvme connect" done by KernelInitiator:
        # SPDK's fio bdev plugin initiates the connection just before starting
        # IO traffic. This method only prepares the bdev.conf JSON file for
        # later use, so both initiator classes expose the same interface.
        bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
        self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])

    def init_disconnect(self):
        # The SPDK initiator does not need to explicitly disconnect, as this
        # is done automatically once the fio bdev plugin finishes IO.
        return

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
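        """Build the bdev.conf JSON consumed by the fio bdev plugin.

        Illustrative shape of the generated config (values are examples and
        depend on the subsystem list and transport; the "iobuf" and "sock"
        sections are elided here):

            {
              "subsystems": [
                {
                  "subsystem": "bdev",
                  "config": [
                    {
                      "method": "bdev_nvme_attach_controller",
                      "params": {
                        "name": "Nvme0",
                        "trtype": "tcp",
                        "traddr": "192.0.2.1",
                        "trsvcid": "4420",
                        "subnqn": "nqn.2018-09.io.spdk:cnode0",
                        "adrfam": "IPv4"
                      }
                    }
                  ]
                }
              ]
            }
        """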
        spdk_cfg_section = {
            "subsystems": [
                {
                    "subsystem": "bdev",
                    "config": []
                },
                {
                    "subsystem": "iobuf",
                    "config": [
                        {
                            "method": "iobuf_set_options",
                            "params": {
                                "small_pool_count": self.small_pool_count,
                                "large_pool_count": self.large_pool_count
                            }
                        }
                    ]
                },
                {
                    "subsystem": "sock",
                    "config": [
                        {
                            "method": "sock_set_default_impl",
                            "params": {
                                "impl_name": self.sock_impl
                            }
                        }
                    ]
                }
            ]
        }

        for i, subsys in enumerate(remote_subsystem_list):
            sub_port, sub_nqn, sub_addr = map(str, subsys)
            nvme_ctrl = {
                "method": "bdev_nvme_attach_controller",
                "params": {
                    "name": "Nvme{}".format(i),
                    "trtype": self.transport,
                    "traddr": sub_addr,
                    "trsvcid": sub_port,
                    "subnqn": sub_nqn,
                    "adrfam": "IPv4"
                }
            }

            if self.enable_adq:
                nvme_ctrl["params"].update({"priority": "1"})

            if self.enable_data_digest:
                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})

            spdk_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)

        return json.dumps(spdk_cfg_section, indent=2)

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
        self.available_cpus = self.get_numa_cpu_map()
        filename_section = ""
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        # Generate the expected NVMe bdev names and sort them by the NUMA node
        # of the NIC used to reach them, to allow better grouping when
        # splitting into fio sections.
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames]
        filenames = [x for _, x in sorted(zip(filename_numas, filenames))]

        nvme_per_split = int(len(subsystems) / len(threads))
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = self.gen_fio_numa_section(r, num_jobs)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section

    def get_nvme_subsystem_numa(self, bdev_name):
        bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir]))
        bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"]

        # Strip the last two characters (the namespace suffix, e.g. "n1") to
        # get the controller name instead of the namespace name
        nvme_ctrl = bdev_name[:-2]
        for bdev in bdev_conf_json_obj:
            if bdev["method"] == "bdev_nvme_attach_controller" and bdev["params"]["name"] == nvme_ctrl:
                return self.get_route_nic_numa(bdev["params"]["traddr"])
        return None


def initiators_match_subsystems(initiators, target_obj):
    for i in initiators:
        i.match_subsystems(target_obj.subsystem_info_list)
        if i.enable_adq:
            i.adq_configure_tc()


def run_single_fio_test(args,
                        initiators,
                        configs,
                        target_obj,
                        block_size,
                        io_depth,
                        rw,
                        fio_rw_mix_read,
                        fio_run_num,
                        fio_ramp_time,
                        fio_run_time):

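    # Each run spawns one fio thread per initiator plus optional measurement
    # threads (sar, pcm, network bandwidth, DPDK memory, power); all threads
    # are started together and joined before the next repetition.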
    for run_no in range(1, fio_run_num+1):
        threads = []
        measurements_prefix = "%s_%s_%s_m_%s_run_%s" % (block_size, io_depth, rw, fio_rw_mix_read, run_no)

        for i, cfg in zip(initiators, configs):
            t = threading.Thread(target=i.run_fio, args=(cfg, run_no))
            threads.append(t)
        if target_obj.enable_sar:
            sar_file_prefix = measurements_prefix + "_sar"
            t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix, fio_ramp_time, fio_run_time))
            threads.append(t)

        if target_obj.enable_pcm:
            pcm_file_name = "%s_pcm_cpu.csv" % measurements_prefix
            pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm,
                                         args=(args.results, pcm_file_name, fio_ramp_time, fio_run_time))
            threads.append(pcm_cpu_t)

        if target_obj.enable_bw:
            bandwidth_file_name = measurements_prefix + "_bandwidth.csv"
            t = threading.Thread(target=target_obj.measure_network_bandwidth,
                                 args=(args.results, bandwidth_file_name, fio_ramp_time, fio_run_time))
            threads.append(t)

        if target_obj.enable_dpdk_memory:
            dpdk_mem_file_name = measurements_prefix + "_dpdk_mem.txt"
            t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results, dpdk_mem_file_name, fio_ramp_time))
            threads.append(t)

        if target_obj.enable_pm:
            power_daemon = threading.Thread(target=target_obj.measure_power,
                                            args=(args.results, measurements_prefix, script_full_dir,
                                                  fio_ramp_time, fio_run_time))
            threads.append(power_daemon)

        for t in threads:
            t.start()
        for t in threads:
            t.join()


def run_fio_tests(args, initiators, target_obj,
                  fio_workloads, fio_rw_mix_read, fio_num_jobs,
                  fio_run_num, fio_ramp_time, fio_run_time,
                  fio_rate_iops, fio_offset, fio_offset_inc):

    for block_size, io_depth, rw in fio_workloads:
        configs = []
        for i in initiators:
            i.init_connect()
            cfg = i.gen_fio_config(rw,
                                   fio_rw_mix_read,
                                   block_size,
                                   io_depth,
                                   target_obj.subsys_no,
                                   fio_num_jobs,
                                   fio_ramp_time,
                                   fio_run_time,
                                   fio_rate_iops,
                                   fio_offset,
                                   fio_offset_inc)
            configs.append(cfg)

        run_single_fio_test(args,
                            initiators,
                            configs,
                            target_obj,
                            block_size,
                            io_depth,
                            rw,
                            fio_rw_mix_read,
                            fio_run_num,
                            fio_ramp_time,
                            fio_run_time)

        for i in initiators:
            i.init_disconnect()
            i.copy_result_files(args.results)

    try:
        parse_results(args.results, args.csv_filename)
    except Exception:
        logging.error("There was an error with parsing the results")
        raise


if __name__ == "__main__":
    exit_code = 0

    script_full_dir = os.path.dirname(os.path.realpath(__file__))
    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))

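    # Illustrative invocation (paths are examples):
    #   ./run_nvmf.py -c ./config.json -r /tmp/results -s nvmf_results.csv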
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
                        help='Configuration file.')
    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
                        help='Results directory.')
    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
                        help='CSV results filename.')
    parser.add_argument('-f', '--force', default=False, action='store_true',
                        dest='force', help="""Force script to continue and try to use all
                        available NVMe devices during test.
                        WARNING: Might result in data loss on used NVMe drives""")

    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO,
                        format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s')

    logging.info("Using config file: %s" % args.config)
    with open(args.config, "r") as config:
        data = json.load(config)

    initiators = []
    general_config = data["general"]
    target_config = data["target"]
    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]

    if "null_block_devices" not in data["target"] and \
        (args.force is False and
            "allowlist" not in data["target"] and
            "blocklist" not in data["target"]):
        # TODO: Also check if allowlist or blocklist are not empty.
        logging.warning("""WARNING: This script requires an allowlist or a blocklist to be defined.
        Otherwise it would use all available NVMe drives on your system, which may potentially
        lead to data loss. If you wish to proceed with all attached NVMes, use the "-f" option.""")
        sys.exit(1)

    for k, v in data.items():
        if "target" in k:
            v.update({"results_dir": args.results})
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(k, data["general"], v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(k, data["general"], v)
            initiators.append(init_obj)
        elif "fio" in k:
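            # Illustrative "fio" section (values are examples; bs/qd/rw are
            # lists expanded into a full test matrix via itertools.product):
            #   "fio": {"bs": ["4k"], "qd": [32, 128], "rw": ["randread"],
            #           "rwmixread": 100, "run_time": 300, "ramp_time": 30,
            #           "run_num": 3, "num_jobs": 2}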
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k].get("run_num")
            fio_num_jobs = data[k].get("num_jobs")

            fio_rate_iops = data[k].get("rate_iops", 0)
            fio_offset = data[k].get("offset", False)
            fio_offset_inc = data[k].get("offset_inc", 0)
        else:
            continue

    try:
        os.mkdir(args.results)
    except FileExistsError:
        pass

    for i in initiators:
        target_obj.initiator_info.append(
            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
        )

    try:
        target_obj.tgt_start()
        initiators_match_subsystems(initiators, target_obj)

        # Run FIO tests; fio and the measurement tools are run in parallel
        # threads inside run_single_fio_test()
        run_fio_tests(args, initiators, target_obj, fio_workloads, fio_rw_mix_read,
                      fio_num_jobs, fio_run_num, fio_ramp_time, fio_run_time,
                      fio_rate_iops, fio_offset, fio_offset_inc)

    except Exception as e:
        logging.error("Exception occurred while running FIO tests")
        logging.error(e)
        exit_code = 1

    finally:
        for i in initiators:
            try:
                i.stop()
            except Exception as err:
                logging.warning("Failed to stop initiator %s: %s" % (i.name, err))
        target_obj.stop()
        sys.exit(exit_code)
