#!/usr/bin/env python3
#  SPDX-License-Identifier: BSD-3-Clause
#  Copyright (C) 2018 Intel Corporation
#  All rights reserved.
#

# Note: we use setattr
# pylint: disable=no-member

import os
import re
import sys
import argparse
import json
import logging
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid

import paramiko
import pandas as pd
from common import *
from subprocess import CalledProcessError
from abc import abstractmethod, ABC
from dataclasses import dataclass
from typing import Any

sys.path.append(os.path.dirname(__file__) + '/../../../python')

import spdk.rpc as rpc  # noqa
import spdk.rpc.client as rpc_client  # noqa


@dataclass
class ConfigField:
    name: str = None
    default: Any = None
    required: bool = False

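# Each Server subclass declares the config keys it understands as ConfigField
# entries; read_config() then copies them onto the instance via setattr (hence
# the pylint no-member disable above). For example, reading
# ConfigField(name='username', required=True) against {'username': 'root'}
# results in self.username == 'root', while a missing required key raises
# ValueError.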

class Server(ABC):
    def __init__(self, name, general_config, server_config):
        self.name = name

        self.log = logging.getLogger(self.name)

        config_fields = [
            ConfigField(name='username', required=True),
            ConfigField(name='password', required=True),
            ConfigField(name='transport', required=True),
            ConfigField(name='skip_spdk_install', default=False),
            ConfigField(name='irdma_roce_enable', default=False),
            ConfigField(name='pause_frames', default=False)
        ]
        self.read_config(config_fields, general_config)
        self.transport = self.transport.lower()

        config_fields = [
            ConfigField(name='nic_ips', required=True),
            ConfigField(name='mode', required=True),
            ConfigField(name='irq_scripts_dir', default='/usr/src/local/mlnx-tools/ofed_scripts'),
            ConfigField(name='enable_arfs', default=False),
            ConfigField(name='tuned_profile', default='')
        ]
        self.read_config(config_fields, server_config)

        self.local_nic_info = []
        self._nics_json_obj = {}
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""

        self.enable_adq = server_config.get('adq_enable', False)
        self.adq_priority = 1 if self.enable_adq else None
        self.irq_settings = {'mode': 'default', **server_config.get('irq_settings', {})}

        if not re.match(r'^[A-Za-z0-9\-]+$', name):
            self.log.error("Please use a name which contains only letters, numbers or dashes")
            sys.exit(1)

    def read_config(self, config_fields, config):
        for config_field in config_fields:
            value = config.get(config_field.name, config_field.default)
            if config_field.required and value is None:
                raise ValueError('%s config key is required' % config_field.name)

            setattr(self, config_field.name, value)

    @staticmethod
    def get_uncommented_lines(lines):
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                # Compare exact addresses; a substring match would confuse
                # e.g. 10.0.0.1 with 10.0.0.10
                if ip == addr["local"]:
                    return nic["ifname"]

    @abstractmethod
    def set_local_nic_info_helper(self):
        pass

    def set_local_nic_info(self, pci_info):
        def extract_network_elements(json_obj):
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def get_nic_numa_node(self, nic_name):
        return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name]))

    def get_numa_cpu_map(self):
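        # "lscpu -b -e=NODE,CPU -J" typically returns JSON shaped like
        # {"cpus": [{"node": 0, "cpu": 0}, {"node": 0, "cpu": 1}, ...]},
        # which is folded here into a {numa_node: [cpu, cpu, ...]} map.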
        numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"]))
        numa_cpu_json_map = {}

        for cpu in numa_cpu_json_obj["cpus"]:
            cpu_num = int(cpu["cpu"])
            numa_node = int(cpu["node"])
            numa_cpu_json_map.setdefault(numa_node, []).append(cpu_num)

        return numa_cpu_json_map

    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        return ""

    def configure_system(self):
        self.load_drivers()
        self.configure_services()
        self.configure_sysctl()
        self.configure_arfs()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity(**self.irq_settings)
        self.configure_pause_frames()

    RDMA_PROTOCOL_IWARP = 0
    RDMA_PROTOCOL_ROCE = 1
    RDMA_PROTOCOL_UNKNOWN = -1

    def check_rdma_protocol(self):
        try:
            roce_ena = self.exec_cmd(["cat", "/sys/module/irdma/parameters/roce_ena"])
            roce_ena = roce_ena.strip()
            if roce_ena == "0":
                return self.RDMA_PROTOCOL_IWARP
            else:
                return self.RDMA_PROTOCOL_ROCE
        except CalledProcessError as e:
            self.log.error("ERROR: failed to check RDMA protocol!")
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
            return self.RDMA_PROTOCOL_UNKNOWN

    def load_drivers(self):
        self.log.info("Loading drivers")
        self.exec_cmd(["sudo", "modprobe", "-a",
                       "nvme-%s" % self.transport,
                       "nvmet-%s" % self.transport])
        current_mode = self.check_rdma_protocol()
        if current_mode == self.RDMA_PROTOCOL_UNKNOWN:
            self.log.error("ERROR: failed to check RDMA protocol mode")
            return
        if self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_IWARP:
            self.reload_driver("irdma", "roce_ena=1")
            self.log.info("Loaded irdma driver with RoCE enabled")
        elif self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_ROCE:
            self.log.info("Leaving irdma driver with RoCE enabled")
        else:
            self.reload_driver("irdma", "roce_ena=0")
            self.log.info("Loaded irdma driver with iWARP enabled")

    def set_pause_frames(self, rx_state, tx_state):
        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            self.exec_cmd(["sudo", "ethtool", "-A", nic_name, "rx", rx_state, "tx", tx_state])

    def configure_pause_frames(self):
        if not self.pause_frames:
            self.log.info("Turning off pause frames")
            self.set_pause_frames("off", "off")
            return
        self.log.info("Configuring pause frames")
        self.set_pause_frames("on", "on")

    def configure_arfs(self):
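        # Accelerated RFS needs ntuple filters enabled on the NIC and a
        # non-zero rps_flow_cnt on each RX queue; writing 0 effectively
        # disables aRFS again. 512 flows per queue is this script's default.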
        rps_flow_cnt = 512
        if not self.enable_arfs:
            rps_flow_cnt = 0

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            self.exec_cmd(["sudo", "ethtool", "-K", nic_name, "ntuple", "on"])
            self.log.info(f"Setting rps_flow_cnt={rps_flow_cnt} for {nic_name}")
            queue_files = self.exec_cmd(["ls", f"/sys/class/net/{nic_name}/queues/"]).strip().split("\n")
            queue_files = filter(lambda x: x.startswith("rx-"), queue_files)

            for qf in queue_files:
                self.exec_cmd(["sudo", "bash", "-c", f"echo {rps_flow_cnt} > /sys/class/net/{nic_name}/queues/{qf}/rps_flow_cnt"])

    def configure_adq(self):
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        self.log.info("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log.info("%s loaded!" % module)
            except CalledProcessError as e:
                self.log.error("ERROR: failed to load module %s" % module)
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

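    # ADQ splits the NIC queues into two traffic classes: TC0 (2 queues,
    # default traffic) and TC1 (one queue per core, NVMe-oF traffic), with
    # flower filters steering packets into TC1 by IP and TCP port. The qdisc
    # built below is roughly (illustrative values):
    #   tc qdisc add dev <nic> root mqprio num_tc 2 map 0 1 \
    #      queues 2@0 <num_cores>@2 hw 1 mode channel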
    def adq_configure_tc(self):
        self.log.info("Configuring ADQ Traffic classes and filters...")

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            nic_ports = [x[0] for x in self.subsystem_info_list if nic_ip == x[2]]

            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log.info(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log.info(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            nic_bdf = os.path.basename(self.exec_cmd(["readlink", "-f", "/sys/class/net/%s/device" % nic_name]).strip())
            self.exec_cmd(["sudo", "devlink", "dev", "param", "set", "pci/%s" % nic_bdf,
                           "name", "tc1_inline_fd", "value", "true", "cmode", "runtime"])

            for port in nic_ports:
                tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                                 "protocol", "ip", "ingress", "prio", "1", "flower",
                                 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                                 "skip_sw", "hw_tc", "1"]
                self.log.info(" ".join(tc_filter_cmd))
                self.exec_cmd(tc_filter_cmd)

            # Show tc configuration
            self.log.info("Show tc configuration for %s NIC..." % nic_name)
            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log.info("%s" % tc_disk_out)
            self.log.info("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log.info(xps_cmd)
            self.exec_cmd(xps_cmd)

    def reload_driver(self, driver, *modprobe_args):
        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver, *modprobe_args])
        except CalledProcessError as e:
            self.log.error("ERROR: failed to reload %s module!" % driver)
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_nic(self):
        self.log.info("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are reset.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                nic_bdf = self.exec_cmd(["bash", "-c", "source /sys/class/net/%s/device/uevent; echo $PCI_SLOT_NAME" % nic])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-inspect-optimize", "off"])
                # The following are suggested in the ADQ Configuration Guide for large block sizes,
                # but can be used with 4k block sizes as well
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop", "on"])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop-cfg", "on"])
            except subprocess.CalledProcessError as e:
                self.log.error("ERROR: failed to configure NIC port using ethtool!")
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
                self.log.info("Please update your NIC driver and firmware versions and try again.")
            self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        self.log.info("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # The list below is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log.info("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log.info("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        self.log.info("Tuning sysctl settings...")

        busy_read = 0
        busy_poll = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1
            busy_poll = 1

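        # The defaults below favor deep queues and large TCP windows;
        # busy_read/busy_poll stay at 0 unless ADQ is used with the SPDK
        # target, since ADQ relies on sockets busy-polling their queues.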
        sysctl_opts = {
            "net.core.busy_poll": busy_poll,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
            "net.core.rps_sock_flow_entries": 0
        }

        if self.enable_adq:
            sysctl_opts.update(self.adq_set_busy_read(1))

        if self.enable_arfs:
            sysctl_opts.update({"net.core.rps_sock_flow_entries": 32768})

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        if not self.tuned_profile:
            self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        self.log.info("Setting CPU governor to performance...")

        # This assumes all CPUs use the same scaling governor
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])
    def get_core_list_from_mask(self, core_mask):
        # Generate list of individual cores from hex core mask
        # (e.g. '0xffff') or list containing:
        # - individual cores (e.g. '1, 2, 3')
        # - core ranges (e.g. '0-3')
        # - mix of both (e.g. '0, 1-4, 9, 11-13')
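        # Examples: '0xff' -> [0, 1, 2, 3, 4, 5, 6, 7]
        #           '0, 1-4, 9' -> [0, 1, 2, 3, 4, 9]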

        core_list = []
        if "0x" in core_mask:
            core_mask_int = int(core_mask, 16)
            for i in range(core_mask_int.bit_length()):
                if (1 << i) & core_mask_int:
                    core_list.append(i)
            return core_list
        else:
            # Core list can be provided in .json config with square brackets
            # remove them first
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")

            for i in core_mask.split(","):
                if "-" in i:
                    start, end = i.split("-")
                    core_range = range(int(start), int(end) + 1)
                    core_list.extend(core_range)
                else:
                    core_list.append(int(i))
            return core_list

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
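        # Supported modes:
        # - "default": spread IRQs across all CPUs (set_irq_affinity.sh)
        # - "bynode":  use only CPUs local to the NIC's NUMA node
        # - "cpulist": use an explicit CPU list; with exclude_cpulist=True
        #              the list is inverted and treated as forbidden CPUs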
        self.log.info("Setting NIC irq affinity for NICs. Using %s mode" % mode)

        if mode not in ["default", "bynode", "cpulist"]:
            raise ValueError("%s irq affinity setting not supported" % mode)

        if mode == "cpulist" and not cpulist:
            raise ValueError("%s irq affinity setting set, but no cpulist provided" % mode)

        affinity_script = "set_irq_affinity.sh"
        if mode != "default":
            affinity_script = "set_irq_affinity_cpulist.sh"
            system_cpu_map = self.get_numa_cpu_map()
        irq_script_path = os.path.join(self.irq_scripts_dir, affinity_script)

        def cpu_list_to_string(cpulist):
            return ",".join(map(str, cpulist))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            irq_cmd = ["sudo", irq_script_path]

            # Use only CPU cores matching NIC NUMA node.
            # Remove any CPU cores if they're on exclusion list.
            if mode == "bynode":
                irq_cpus = system_cpu_map[self.get_nic_numa_node(nic_name)]
                if cpulist and exclude_cpulist:
                    disallowed_cpus = self.get_core_list_from_mask(cpulist)
                    irq_cpus = list(set(irq_cpus) - set(disallowed_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            if mode == "cpulist":
                irq_cpus = self.get_core_list_from_mask(cpulist)
                if exclude_cpulist:
                    # Flatten system CPU list, we don't need NUMA awareness here
                    system_cpu_list = sorted({x for v in system_cpu_map.values() for x in v})
                    irq_cpus = list(set(system_cpu_list) - set(irq_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            irq_cmd.append(nic_name)
            self.log.info(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        self.log.info("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        self.log.info("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        self.log.info("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log.info("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        self.log.info("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log.info("Reverted CPU governor to %s." % self.governor_restore)

    def restore_settings(self):
        self.restore_governor()
        self.restore_tuned()
        self.restore_services()
        self.restore_sysctl()
        if self.enable_adq:
            self.reload_driver("ice")


class Target(Server):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Defaults
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []
        self.initiator_info = []

        config_fields = [
            ConfigField(name='mode', required=True),
            ConfigField(name='results_dir', required=True),
            ConfigField(name='enable_pm', default=True),
            ConfigField(name='enable_sar', default=True),
            ConfigField(name='enable_pcm', default=True),
            ConfigField(name='enable_dpdk_memory', default=True)
        ]
        self.read_config(config_fields, target_config)

        self.null_block = target_config.get('null_block_devices', 0)
        self.scheduler_name = target_config.get('scheduler_settings', 'static')
        self.enable_zcopy = target_config.get('zcopy_settings', False)
        self.enable_bw = target_config.get('enable_bandwidth', True)
        self.nvme_blocklist = target_config.get('blocklist', [])
        self.nvme_allowlist = target_config.get('allowlist', [])

        # Blocklist takes precedence, remove common elements from allowlist
        self.nvme_allowlist = list(set(self.nvme_allowlist) - set(self.nvme_blocklist))

        self.log.info("Items now on allowlist: %s" % self.nvme_allowlist)
        self.log.info("Items now on blocklist: %s" % self.nvme_blocklist)

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        if not self.skip_spdk_install:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
            self.configure_irq_affinity()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log.info("Changing directory to %s" % change_dir)

        out = subprocess.check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")

        if change_dir:
            os.chdir(old_cwd)
            self.log.info("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        self.log.info("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, _directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log.info("Done zipping")

    @staticmethod
    def _chunks(input_list, chunks_no):
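        # Yield chunks_no contiguous slices whose lengths differ by at most 1,
        # e.g. _chunks(list(range(10)), 3) yields [0..3], [4..6] and [7..9].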
        div, rem = divmod(len(input_list), chunks_no)
        for i in range(chunks_no):
            si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem)
            yield input_list[si:si + (div + 1 if i < rem else div)]

    def spread_bdevs(self, req_disks):
        # Spread available block device indexes:
        # - evenly across available initiator systems
        # - evenly across available NIC interfaces for
        #   each initiator
        # Not NUMA aware.
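        # Example: 4 bdevs and 2 initiators with 1 target NIC each results in
        # [(ip_A, 0), (ip_A, 1), (ip_B, 2), (ip_B, 3)].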
        ip_bdev_map = []
        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))

        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
            self.initiator_info[i]["bdev_range"] = init_chunk
            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
                for c in nic_chunk:
                    ip_bdev_map.append((ip, c))
        return ip_bdev_map

    def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time):
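        # CPU utilization is derived from sar's per-CPU "Average" rows: the
        # %idle column (8th field) is summed over all CPUs and subtracted
        # from the theoretical maximum of cpu_count * 100.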
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))

        self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Starting SAR measurements")

        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % 1, "%s" % run_time])
        with open(sar_output_file, "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log.info("Summary CPU utilization from SAR:")
                        self.log.info(line)
                    elif "all" in line:
                        self.log.info(line)
                    else:
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum

        with open(sar_cpu_util_file, "w") as f:
            f.write("%0.2f" % sar_cpu_usage)

    def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time):
        time.sleep(ramp_time)
        self.log.info("Starting power measurements")
        self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir,
                       "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix,
                       "-x", "-c", "%s" % run_time, "-t", "%s" % 1, "-r"])

    def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time):
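        # pcm emits a CSV with a two-row header (socket, metric); after the
        # auto-generated "Unnamed:*" labels are blanked out, only the UPI
        # link columns are extracted into a separate "skt_"-prefixed file.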
        time.sleep(ramp_time)
        cmd = ["pcm", "1", "-i=%s" % run_time,
               "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name, ramp_time, run_time):
        self.log.info("Waiting %d seconds for ramp-up to finish before measuring bandwidth stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Starting network bandwidth measurements")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", "%s" % run_time])
        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
        #       Improve this so that an additional file containing the measurements' average is generated too.
        # TODO: Monitor only those interfaces which are currently used to run the workload.

    def measure_dpdk_memory(self, results_dir, dump_file_name, ramp_time):
        self.log.info("Waiting %d seconds for ramp-up to finish before generating DPDK memory usage dump" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Generating DPDK memory usage dump")
        tmp_dump_file = rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)["filename"]
        os.rename(tmp_dump_file, "%s/%s" % (results_dir, dump_file_name))

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(os.uname().release)
        self.log.info("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log.info("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log.info("====zcopy settings:====")
        self.log.info("zcopy enabled: %s" % (self.enable_zcopy))
        self.log.info("====Scheduler settings:====")
        self.log.info("SPDK scheduler: %s" % (self.scheduler_name))


class Initiator(Server):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Required fields and defaults
        config_fields = [
            ConfigField(name='mode', required=True),
            ConfigField(name='ip', required=True),
            ConfigField(name='target_nic_ips', required=True),
            ConfigField(name='cpus_allowed', default=None),
            ConfigField(name='cpus_allowed_policy', default='shared'),
            ConfigField(name='spdk_dir', default='/tmp/spdk'),
            ConfigField(name='fio_bin', default='/usr/src/fio/fio'),
            ConfigField(name='nvmecli_bin', default='nvme'),
            ConfigField(name='cpu_frequency', default=None),
            ConfigField(name='allow_cpu_sharing', default=True)
        ]

        self.read_config(config_fields, initiator_config)

        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.subsystem_info_list = []

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if not self.skip_spdk_install:
            self.copy_spdk("/tmp/spdk.zip")

        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        self.restore_settings()
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it to prevent
        # expansion when sending it to the remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout by using the get_pty option if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        self.log.info("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log.info("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log.info("Sources unpacked")

    def copy_result_files(self, dest_dir):
        self.log.info("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log.info("Done copying results")

    def match_subsystems(self, target_subsystems):
        subsystems = [subsystem for subsystem in target_subsystems if subsystem[2] in self.target_nic_ips]
        if not subsystems:
            raise Exception("No matching subsystems found on target side!")
        subsystems.sort(key=lambda x: x[1])
        self.log.info("Found matching subsystems on target side:")
        for s in subsystems:
            self.log.info(s)
        self.subsystem_info_list = subsystems

    @abstractmethod
    def init_connect(self):
        pass

    @abstractmethod
    def init_disconnect(self):
        pass

    @abstractmethod
    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def get_route_nic_numa(self, remote_nvme_ip):
        local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip]))
        local_nvme_nic = local_nvme_nic[0]["dev"]
        return self.get_nic_numa_node(local_nvme_nic)

    @staticmethod
    def gen_fio_offset_section(offset_inc, num_jobs):
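        # e.g. offset_inc=0 and num_jobs=4 give each job a 25% slice:
        # size=25%, offset=0%, offset_increment=25%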
        offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc
        return "\n".join(["size=%s%%" % offset_inc,
                          "offset=0%",
                          "offset_increment=%s%%" % offset_inc])

    def gen_fio_numa_section(self, fio_filenames_list, num_jobs):
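        # Find the NUMA node most of this chunk's NVMe devices live on, then
        # pin num_jobs CPUs from that node and prefer its memory, so fio jobs
        # stay NUMA-local to their drives.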
        numa_stats = {}
        allowed_cpus = []
        for nvme in fio_filenames_list:
            nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme))
            numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1

        # Use the most common NUMA node for this chunk to allocate memory and CPUs
        section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0]

        # Check if we have enough free CPUs to pop from the list before assigning them
        if len(self.available_cpus[section_local_numa]) < num_jobs:
            if self.allow_cpu_sharing:
                self.log.info("Regenerating available CPU list %s" % section_local_numa)
                # Remove still available CPUs from the regenerated list. We don't want to
                # regenerate it with duplicates.
                cpus_regen = set(self.get_numa_cpu_map()[section_local_numa]) - set(self.available_cpus[section_local_numa])
                self.available_cpus[section_local_numa].extend(cpus_regen)
                self.log.info(self.available_cpus[section_local_numa])
            else:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise IndexError

        for _ in range(num_jobs):
            try:
                allowed_cpus.append(str(self.available_cpus[section_local_numa].pop(0)))
            except IndexError:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise

        return "\n".join(["cpus_allowed=%s" % ",".join(allowed_cpus),
                          "numa_mem_policy=prefer:%s" % section_local_numa])

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no,
                       num_jobs=None, ramp_time=0, run_time=10, rate_iops=0,
                       offset=False, offset_inc=0):
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""

        if self.cpus_allowed is not None:
            self.log.info("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    # Core ranges are inclusive, e.g. "0-3" means 4 cores
                    cpus_num += len(range(a, b + 1))
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            self.num_cores = len(self.subsystem_info_list)
            threads = range(0, len(self.subsystem_info_list))

        filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs,
                                                      offset, offset_inc)

        fio_config = fio_conf_template.format(ioengine=self.ioengine, spdk_conf=self.spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, init_connect() function
        if "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log.info("Created FIO Config:")
        self.log.info(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit(1)
        else:
            self.log.warning("WARNING: you have disabled intel_pstate and are using the default CPU governor.")

    def run_fio(self, fio_config_file, run_num=1):
        job_name, _ = os.path.splitext(fio_config_file)
        self.log.info("Starting FIO run for job: %s" % job_name)
        self.log.info("Using FIO: %s" % self.fio_bin)

        output_filename = job_name + "_run_" + str(run_num) + "_" + self.name + ".json"
        try:
            output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename, "--eta=never"], True)
            self.log.info(output)
            self.log.info("FIO run finished. Results in: %s" % output_filename)
        except subprocess.CalledProcessError as e:
            self.log.error("ERROR: Fio process failed!")
            self.log.error(e.stdout)

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(self.exec_cmd(["uname", "-r"]))
        self.log.info("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log.info("====sysctl conf:====")
        sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"])
        self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))


class KernelTarget(Target):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = target_config.get('nvmet_bin', 'nvmetcli')

    def load_drivers(self):
        self.log.info("Loading drivers")
        super().load_drivers()
        if self.null_block:
            self.exec_cmd(["sudo", "modprobe", "null_blk", "nr_devices=%s" % self.null_block])

    def configure_adq(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_configure_tc(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_set_busy_read(self, busy_read_val):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
        return {"net.core.busy_read": 0}

    def stop(self):
        self.nvmet_command(self.nvmet_bin, "clear")
        self.restore_settings()

    def get_nvme_device_bdf(self, nvme_dev_path):
        nvme_name = os.path.basename(nvme_dev_path)
        return self.exec_cmd(["cat", "/sys/block/%s/device/address" % nvme_name]).strip()

    def get_nvme_devices(self):
        dev_list = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]).split("\n")
        nvme_list = []
        for dev in dev_list:
            if "nvme" not in dev:
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                nvme_list.append(dev)
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_allowlist:
                nvme_list.append(dev)
        return nvme_list

    def nvmet_command(self, nvmet_bin, command):
        return self.exec_cmd([nvmet_bin, *(command.split(" "))])

    def kernel_tgt_gen_subsystem_conf(self, nvme_list):
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

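        # One subsystem and one port per bdev: port 4420 + n, NQN
        # nqn.2018-09.io.spdk:cnode<n>, serial SPDK00<n>, with a single
        # namespace pointing at the backing device.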
        for ip, bdev_num in self.spread_bdevs(len(nvme_list)):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = nvme_list[bdev_num]

            nvmet_cfg["subsystems"].append({
                "allowed_hosts": [],
                "attr": {
                    "allow_any_host": "1",
                    "serial": serial,
                    "version": "1.3"
                },
                "namespaces": [
                    {
                        "device": {
                            "path": bdev_name,
                            "uuid": "%s" % uuid.uuid4()
                        },
                        "enable": 1,
                        "nsid": port
                    }
                ],
                "nqn": nqn
            })

            nvmet_cfg["ports"].append({
                "addr": {
                    "adrfam": "ipv4",
                    "traddr": ip,
                    "trsvcid": port,
                    "trtype": self.transport
                },
                "portid": bdev_num,
                "referrals": [],
                "subsystems": [nqn]
            })

            self.subsystem_info_list.append((port, nqn, ip))
        self.subsys_no = len(self.subsystem_info_list)

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        self.log.info("Configuring kernel NVMeOF Target")

        if self.null_block:
            self.log.info("Configuring with null block device.")
            nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
        else:
            self.log.info("Configuring with NVMe drives.")
            nvme_list = self.get_nvme_devices()

        self.kernel_tgt_gen_subsystem_conf(nvme_list)
        self.subsys_no = len(nvme_list)

        self.nvmet_command(self.nvmet_bin, "clear")
        self.nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log.info("Done configuring kernel NVMeOF Target")


class SPDKTarget(Target):
    def __init__(self, name, general_config, target_config):
        # IRQ affinity on the SPDK Target side takes the Target's core mask into consideration.
        # The method setting IRQ affinity is run as part of the parent classes' init,
        # so we need to have self.core_mask set before changing IRQ affinity.
        self.core_mask = target_config["core_mask"]
        self.num_cores = len(self.get_core_list_from_mask(self.core_mask))

        super().__init__(name, general_config, target_config)

        # Optional config fields and their default values
        config_fields = [
            ConfigField(name='dif_insert_strip', default=False),
            ConfigField(name='null_block_dif_type', default=0),
            ConfigField(name='num_shared_buffers', default=4096),
            ConfigField(name='max_queue_depth', default=128),
            ConfigField(name='bpf_scripts', default=[]),
            ConfigField(name='scheduler_core_limit', default=None),
            ConfigField(name='dsa_settings', default=False),
            ConfigField(name='iobuf_small_pool_count', default=32767),
            ConfigField(name='iobuf_large_pool_count', default=16383),
            ConfigField(name='num_cqe', default=4096),
            ConfigField(name='sock_impl', default='posix')
        ]

        self.read_config(config_fields, target_config)

        self.bpf_proc = None
        self.enable_dsa = False

        self.log.info("====DSA settings:====")
        self.log.info("DSA enabled: %s" % (self.enable_dsa))

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
        if mode not in ["default", "bynode", "cpulist",
                        "shared", "split", "split-bynode"]:
            raise ValueError("%s irq affinity setting not supported" % mode)

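        # Mode mapping onto the base Server implementation:
        # - "shared":       NIC IRQs pinned to the same cores SPDK runs on
        # - "split":        NIC IRQs pinned to all cores except SPDK's
        # - "split-bynode": NUMA-local cores, again excluding SPDK's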
        # Create core list from SPDK's mask and change it to string.
        # This is the type configure_irq_affinity expects for cpulist parameter.
        spdk_tgt_core_list = self.get_core_list_from_mask(self.core_mask)
        spdk_tgt_core_list = ",".join(map(str, spdk_tgt_core_list))
        spdk_tgt_core_list = "[" + spdk_tgt_core_list + "]"

        if mode == "shared":
            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list)
        elif mode == "split":
            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
        elif mode == "split-bynode":
            super().configure_irq_affinity(mode="bynode", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
        else:
            super().configure_irq_affinity(mode=mode, cpulist=cpulist, exclude_cpulist=exclude_cpulist)

    def adq_set_busy_read(self, busy_read_val):
        return {"net.core.busy_read": busy_read_val}

    def get_nvme_devices_count(self):
        return len(self.get_nvme_devices())

    def get_nvme_devices(self):
        bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")]))
        bdev_bdfs = []
        for bdev in bdev_subsys_json_obj["config"]:
            bdev_traddr = bdev["params"]["traddr"]
            if bdev_traddr in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                bdev_bdfs.append(bdev_traddr)
            if bdev_traddr in self.nvme_allowlist:
                bdev_bdfs.append(bdev_traddr)
        return bdev_bdfs

    def spdk_tgt_configure(self):
        self.log.info("Configuring SPDK NVMeOF target via RPC")

        # Create transport layer
        nvmf_transport_params = {
            "client": self.client,
            "trtype": self.transport,
            "num_shared_buffers": self.num_shared_buffers,
            "max_queue_depth": self.max_queue_depth,
            "dif_insert_or_strip": self.dif_insert_strip,
            "sock_priority": self.adq_priority,
            "num_cqe": self.num_cqe
        }

        if self.enable_adq:
            nvmf_transport_params["acceptor_poll_rate"] = 10000

        rpc.nvmf.nvmf_create_transport(**nvmf_transport_params)
        self.log.info("SPDK NVMeOF transport layer:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

        if self.enable_adq:
            self.adq_configure_tc()

        self.log.info("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self, null_block_count):
        md_size = 0
        block_size = 4096
        if self.null_block_dif_type != 0:
            md_size = 128

        self.log.info("Adding null block bdevices to config via RPC")
        for i in range(null_block_count):
1237            self.log.info("Setting bdev protection to :%s" % self.null_block_dif_type)
1238            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
1239                                      dif_type=self.null_block_dif_type, md_size=md_size)
1240        self.log.info("SPDK Bdevs configuration:")
1241        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1242
1243    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
1244        self.log.info("Adding NVMe bdevs to config via RPC")
1245
1246        bdfs = self.get_nvme_devices()
1247        bdfs = [b.replace(":", ".") for b in bdfs]
1248
1249        if req_num_disks:
1250            if req_num_disks > len(bdfs):
1251                self.log.error("ERROR: Requested number of disks is more than available %s" % len(bdfs))
1252                sys.exit(1)
1253            else:
1254                bdfs = bdfs[0:req_num_disks]
1255
1256        for i, bdf in enumerate(bdfs):
1257            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)
1258
1259        self.log.info("SPDK Bdevs configuration:")
1260        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1261
1262    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
1263        self.log.info("Adding subsystems to config")
1264        if not req_num_disks:
1265            req_num_disks = self.get_nvme_devices_count()
1266
1267        for ip, bdev_num in self.spread_bdevs(req_num_disks):
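            # One subsystem per bdev, each with its own listener port, e.g.
            # bdev_num=0 -> port 4420, nqn.2018-09.io.spdk:cnode0,
            # serial SPDK000, namespace bdev Nvme0n1.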
1268            port = str(4420 + bdev_num)
1269            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
1270            serial = "SPDK00%s" % bdev_num
1271            bdev_name = "Nvme%sn1" % bdev_num
1272
1273            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
1274                                           allow_any_host=True, max_namespaces=8)
1275            rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
1276            for nqn_name in [nqn, "discovery"]:
1277                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
1278                                                     nqn=nqn_name,
1279                                                     trtype=self.transport,
1280                                                     traddr=ip,
1281                                                     trsvcid=port,
1282                                                     adrfam="ipv4")
1283            self.subsystem_info_list.append((port, nqn, ip))
1284        self.subsys_no = len(self.subsystem_info_list)
1285
1286        self.log.info("SPDK NVMeOF subsystem configuration:")
1287        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))
1288
1289    def bpf_start(self):
1290        self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts)
1291        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
1292        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
1293        results_path = os.path.join(self.results_dir, "bpf_traces.txt")
1294
        with open(self.pid, "r") as fh:
            nvmf_pid = fh.readline().strip()
1297
1298        cmd = [bpf_script, nvmf_pid, *bpf_traces]
1299        self.log.info(cmd)
1300        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})
1301
1302    def tgt_start(self):
1303        if self.null_block:
1304            self.subsys_no = 1
1305        else:
1306            self.subsys_no = self.get_nvme_devices_count()
1307        self.log.info("Starting SPDK NVMeOF Target process")
1308        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
1309        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
1310        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")
1311
1312        with open(self.pid, "w") as fh:
1313            fh.write(str(proc.pid))
1314        self.nvmf_proc = proc
1315        self.log.info("SPDK NVMeOF Target PID=%s" % self.pid)
1316        self.log.info("Waiting for spdk to initialize...")
1317        while True:
1318            if os.path.exists("/var/tmp/spdk.sock"):
1319                break
1320            time.sleep(1)
1321        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")
1322
1323        rpc.sock.sock_set_default_impl(self.client, impl_name=self.sock_impl)
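        # iobuf pool counts come from the config file; passing bufsize=None
        # keeps SPDK's built-in buffer size defaults.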
1324        rpc.iobuf.iobuf_set_options(self.client,
1325                                    small_pool_count=self.iobuf_small_pool_count,
1326                                    large_pool_count=self.iobuf_large_pool_count,
1327                                    small_bufsize=None,
1328                                    large_bufsize=None)
1329
1330        if self.enable_zcopy:
1331            rpc.sock.sock_impl_set_options(self.client, impl_name=self.sock_impl,
1332                                           enable_zerocopy_send_server=True)
1333            self.log.info("Target socket options:")
1334            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name=self.sock_impl))
1335
1336        if self.enable_adq:
1337            rpc.sock.sock_impl_set_options(self.client, impl_name=self.sock_impl, enable_placement_id=1)
1338            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
1339                                           nvme_adminq_poll_period_us=100000, retry_count=4)
1340
1341        if self.enable_dsa:
1342            rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None)
1343            self.log.info("Target DSA accel module enabled")
1344
1345        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit)
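        # The target was started with --wait-for-rpc, so subsystem initialization
        # continues only after framework_start_init is called.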
1346        rpc.framework_start_init(self.client)
1347
1348        if self.bpf_scripts:
1349            self.bpf_start()
1350
1351        self.spdk_tgt_configure()
1352
1353    def stop(self):
1354        if self.bpf_proc:
1355            self.log.info("Stopping BPF Trace script")
1356            self.bpf_proc.terminate()
1357            self.bpf_proc.wait()
1358
1359        if hasattr(self, "nvmf_proc"):
1360            try:
1361                self.nvmf_proc.terminate()
1362                self.nvmf_proc.wait(timeout=30)
1363            except Exception as e:
1364                self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.")
1365                self.log.info(e)
1366                self.nvmf_proc.kill()
1367                self.nvmf_proc.communicate()
1368                # Try to clean up RPC socket files if they were not removed
1369                # because of using 'kill'
1370                try:
1371                    os.remove("/var/tmp/spdk.sock")
1372                    os.remove("/var/tmp/spdk.sock.lock")
1373                except FileNotFoundError:
1374                    pass
1375        self.restore_settings()
1376
1377
1378class KernelInitiator(Initiator):
1379    def __init__(self, name, general_config, initiator_config):
1380        super().__init__(name, general_config, initiator_config)
1381
1382        # Defaults
1383        self.extra_params = initiator_config.get('extra_params', '')
1384
1385        self.ioengine = "libaio"
1386        self.spdk_conf = ""
1387
1388        if "num_cores" in initiator_config:
1389            self.num_cores = initiator_config["num_cores"]
1390
1391        if "kernel_engine" in initiator_config:
1392            self.ioengine = initiator_config["kernel_engine"]
1393            if "io_uring" in self.ioengine:
1394                self.extra_params += ' --nr-poll-queues=8'
1395
1396    def configure_adq(self):
1397        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
1398
1399    def adq_configure_tc(self):
1400        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
1401
1402    def adq_set_busy_read(self, busy_read_val):
1403        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
1404        return {"net.core.busy_read": 0}
1405
1406    def get_connected_nvme_list(self):
1407        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
1408        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
1409                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
1410        return nvme_list
1411
1412    def init_connect(self):
1413        self.log.info("Below connection attempts may result in error messages, this is expected!")
1414        for subsystem in self.subsystem_info_list:
1415            self.log.info("Trying to connect %s %s %s" % subsystem)
1416            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
1417                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
1418            time.sleep(2)
1419
1420        if "io_uring" in self.ioengine:
1421            self.log.info("Setting block layer settings for io_uring.")
1422
1423            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
1424            #       apparently it's not possible for connected subsystems.
1425            #       Results in "error: Invalid argument"
1426            block_sysfs_settings = {
1427                "iostats": "0",
1428                "rq_affinity": "0",
1429                "nomerges": "2"
1430            }
1431
1432            for disk in self.get_connected_nvme_list():
1433                sysfs = os.path.join("/sys/block", disk, "queue")
1434                for k, v in block_sysfs_settings.items():
1435                    sysfs_opt_path = os.path.join(sysfs, k)
1436                    try:
1437                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
1438                    except CalledProcessError as e:
1439                        self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
1440                    finally:
                        current_val = self.exec_cmd(["sudo", "cat", sysfs_opt_path])
                        self.log.info("%s=%s" % (sysfs_opt_path, current_val))
1443
1444    def init_disconnect(self):
1445        for subsystem in self.subsystem_info_list:
1446            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
1447            time.sleep(1)
1448
1449    def get_nvme_subsystem_numa(self, dev_name):
        # Strip the trailing namespace suffix ("n1") from the device name to
        # get the controller name (e.g. "nvme0n1" -> "nvme0").
1451        nvme_ctrl = os.path.basename(dev_name)[:-2]
1452        remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
1453                                   self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl]))
1454        return self.get_route_nic_numa(remote_nvme_ip.group(0))
1455
1456    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
1457        self.available_cpus = self.get_numa_cpu_map()
1458        if len(threads) >= len(subsystems):
1459            threads = range(0, len(subsystems))
1460
        # Generate the names of the connected nvme devices and sort them by the
        # NUMA node of the NIC they use, to allow better grouping when splitting
        # into fio sections.
1463        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
1464        nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list]
1465        nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))]
1466
1467        filename_section = ""
1468        nvme_per_split = int(len(nvme_list) / len(threads))
1469        remainder = len(nvme_list) % len(threads)
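        # Distribute devices across sections as evenly as possible, e.g. 5
        # devices over 2 sections -> nvme_per_split=2, remainder=1, producing
        # sections of 3 and 2 devices.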
1470        iterator = iter(nvme_list)
1471        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
1479        for i, r in enumerate(result):
1480            header = "[filename%s]" % i
1481            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = max(1, round((io_depth * len(r)) / num_jobs))
            iodepth = "iodepth=%s" % job_section_qd
1486
1487            offset_section = ""
1488            if offset:
1489                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)
1490
1491            numa_opts = self.gen_fio_numa_section(r, num_jobs)
1492
1493            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])
1494
1495        return filename_section
1496
1497
1498class SPDKInitiator(Initiator):
1499    def __init__(self, name, general_config, initiator_config):
1500        super().__init__(name, general_config, initiator_config)
1501
        if not self.skip_spdk_install:
1503            self.install_spdk()
1504
1505        # Optional fields
1506        self.enable_data_digest = initiator_config.get('enable_data_digest', False)
1507        self.small_pool_count = initiator_config.get('small_pool_count', 32768)
1508        self.large_pool_count = initiator_config.get('large_pool_count', 16384)
1509        self.sock_impl = initiator_config.get('sock_impl', 'posix')
1510
1511        if "num_cores" in initiator_config:
1512            self.num_cores = initiator_config["num_cores"]
1513
1514        self.ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
1515        self.spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
1516
1517    def adq_set_busy_read(self, busy_read_val):
1518        return {"net.core.busy_read": busy_read_val}
1519
1520    def install_spdk(self):
1521        self.log.info("Using fio binary %s" % self.fio_bin)
1522        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
1523        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
1524        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma",
1525                       "--with-fio=%s" % os.path.dirname(self.fio_bin),
1526                       "--enable-lto", "--disable-unit-tests"])
1527        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
1528        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])
1529
1530        self.log.info("SPDK built")
1531        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])
1532
1533    def init_connect(self):
1534        # Not a real "connect" like when doing "nvme connect" because SPDK's fio
1535        # bdev plugin initiates connection just before starting IO traffic.
1536        # This is just to have a "init_connect" equivalent of the same function
1537        # from KernelInitiator class.
1538        # Just prepare bdev.conf JSON file for later use and consider it
1539        # "making a connection".
1540        bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
1541        self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
1542
1543    def init_disconnect(self):
        # The SPDK initiator does not need to explicitly disconnect, as this
        # is done after the fio bdev plugin finishes IO.
1546        return
1547
1548    def gen_spdk_bdev_conf(self, remote_subsystem_list):
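        # Build the JSON config consumed later by fio's spdk_bdev plugin (see
        # self.spdk_conf): controllers to attach plus iobuf and sock options.
        # For an illustrative subsystem tuple ("4420", "nqn.2018-09.io.spdk:cnode0",
        # "10.0.0.1") and a TCP transport, the bdev section would contain:
        #   {"method": "bdev_nvme_attach_controller",
        #    "params": {"name": "Nvme0", "trtype": "tcp", "traddr": "10.0.0.1",
        #               "trsvcid": "4420", "subnqn": "nqn.2018-09.io.spdk:cnode0",
        #               "adrfam": "IPv4"}}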
1549        spdk_cfg_section = {
1550            "subsystems": [
1551                {
1552                    "subsystem": "bdev",
1553                    "config": []
1554                },
1555                {
1556                    "subsystem": "iobuf",
1557                    "config": [
1558                        {
1559                            "method": "iobuf_set_options",
1560                            "params": {
1561                                "small_pool_count": self.small_pool_count,
1562                                "large_pool_count": self.large_pool_count
1563                            }
1564                        }
1565                    ]
1566                },
1567                {
1568                    "subsystem": "sock",
1569                    "config": [
1570                        {
1571                            "method": "sock_set_default_impl",
1572                            "params": {
1573                                "impl_name": self.sock_impl
1574                            }
1575                        }
1576                    ]
1577                }
1578            ]
1579        }
1580
1581        for i, subsys in enumerate(remote_subsystem_list):
            sub_port, sub_nqn, sub_addr = map(str, subsys)
1583            nvme_ctrl = {
1584                "method": "bdev_nvme_attach_controller",
1585                "params": {
1586                    "name": "Nvme{}".format(i),
1587                    "trtype": self.transport,
1588                    "traddr": sub_addr,
1589                    "trsvcid": sub_port,
1590                    "subnqn": sub_nqn,
1591                    "adrfam": "IPv4"
1592                }
1593            }
1594
1595            if self.enable_adq:
1596                nvme_ctrl["params"].update({"priority": "1"})
1597
1598            if self.enable_data_digest:
1599                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})
1600
1601            spdk_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)
1602
1603        return json.dumps(spdk_cfg_section, indent=2)
1604
1605    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
1606        self.available_cpus = self.get_numa_cpu_map()
1607        filename_section = ""
1608        if len(threads) >= len(subsystems):
1609            threads = range(0, len(subsystems))
1610
        # Generate the expected NVMe bdev names and sort them by the NUMA node
        # of the NIC they use, to allow better grouping when splitting into
        # fio sections.
1613        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
1614        filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames]
1615        filenames = [x for _, x in sorted(zip(filename_numas, filenames))]
1616
1617        nvme_per_split = int(len(subsystems) / len(threads))
1618        remainder = len(subsystems) % len(threads)
1619        iterator = iter(filenames)
1620        result = []
1621        for i in range(len(threads)):
1622            result.append([])
1623            for _ in range(nvme_per_split):
1624                result[i].append(next(iterator))
1625            if remainder:
1626                result[i].append(next(iterator))
1627                remainder -= 1
1628        for i, r in enumerate(result):
1629            header = "[filename%s]" % i
1630            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = max(1, round((io_depth * len(r)) / num_jobs))
            iodepth = "iodepth=%s" % job_section_qd
1635
1636            offset_section = ""
1637            if offset:
1638                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)
1639
1640            numa_opts = self.gen_fio_numa_section(r, num_jobs)
1641
1642            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])
1643
1644        return filename_section
1645
1646    def get_nvme_subsystem_numa(self, bdev_name):
1647        bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir]))
1648        bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"]
1649
        # Strip the trailing namespace suffix ("n1") from the bdev name to get
        # the controller name (e.g. "Nvme0n1" -> "Nvme0").
1651        nvme_ctrl = bdev_name[:-2]
1652        for bdev in bdev_conf_json_obj:
1653            if bdev["method"] == "bdev_nvme_attach_controller" and bdev["params"]["name"] == nvme_ctrl:
1654                return self.get_route_nic_numa(bdev["params"]["traddr"])
1655        return None
1656
1657
1658def initiators_match_subsystems(initiators, target_obj):
1659    for i in initiators:
1660        i.match_subsystems(target_obj.subsystem_info_list)
1661        if i.enable_adq:
1662            i.adq_configure_tc()
1663
1664
1665def run_single_fio_test(args,
1666                        initiators,
1667                        configs,
1668                        target_obj,
1669                        block_size,
1670                        io_depth,
1671                        rw,
1672                        fio_rw_mix_read,
1673                        fio_run_num,
1674                        fio_ramp_time,
1675                        fio_run_time):
1676
1677    for run_no in range(1, fio_run_num+1):
1678        threads = []
1679        power_daemon = None
1680        measurements_prefix = "%s_%s_%s_m_%s_run_%s" % (block_size, io_depth, rw, fio_rw_mix_read, run_no)
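        # e.g. "4k_128_randrw_m_70_run_1" for bs=4k, qd=128, randrw with a
        # 70% read mix, first run.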
1681
1682        for i, cfg in zip(initiators, configs):
1683            t = threading.Thread(target=i.run_fio, args=(cfg, run_no))
1684            threads.append(t)
1685        if target_obj.enable_sar:
1686            sar_file_prefix = measurements_prefix + "_sar"
1687            t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix, fio_ramp_time, fio_run_time))
1688            threads.append(t)
1689
1690        if target_obj.enable_pcm:
            pcm_fname = "%s_pcm_cpu.csv" % measurements_prefix
            pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm,
                                         args=(args.results, pcm_fname, fio_ramp_time, fio_run_time))
            threads.append(pcm_cpu_t)
1695
1696        if target_obj.enable_bw:
1697            bandwidth_file_name = measurements_prefix + "_bandwidth.csv"
1698            t = threading.Thread(target=target_obj.measure_network_bandwidth,
1699                                 args=(args.results, bandwidth_file_name, fio_ramp_time, fio_run_time))
1700            threads.append(t)
1701
1702        if target_obj.enable_dpdk_memory:
1703            dpdk_mem_file_name = measurements_prefix + "_dpdk_mem.txt"
1704            t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results, dpdk_mem_file_name, fio_ramp_time))
1705            threads.append(t)
1706
1707        if target_obj.enable_pm:
1708            power_daemon = threading.Thread(target=target_obj.measure_power,
1709                                            args=(args.results, measurements_prefix, script_full_dir,
1710                                                  fio_ramp_time, fio_run_time))
1711            threads.append(power_daemon)
1712
1713        for t in threads:
1714            t.start()
1715        for t in threads:
1716            t.join()
1717
1718
1719def run_fio_tests(args, initiators, target_obj,
1720                  fio_workloads, fio_rw_mix_read,
1721                  fio_run_num, fio_ramp_time, fio_run_time,
1722                  fio_rate_iops, fio_offset, fio_offset_inc):
1723
1724    for block_size, io_depth, rw in fio_workloads:
1725        configs = []
1726        for i in initiators:
1727            i.init_connect()
1728            cfg = i.gen_fio_config(rw,
1729                                   fio_rw_mix_read,
1730                                   block_size,
1731                                   io_depth,
1732                                   target_obj.subsys_no,
1733                                   fio_num_jobs,
1734                                   fio_ramp_time,
1735                                   fio_run_time,
1736                                   fio_rate_iops,
1737                                   fio_offset,
1738                                   fio_offset_inc)
1739            configs.append(cfg)
1740
1741        run_single_fio_test(args,
1742                            initiators,
1743                            configs,
1744                            target_obj,
1745                            block_size,
1746                            io_depth,
1747                            rw,
1748                            fio_rw_mix_read,
1749                            fio_run_num,
1750                            fio_ramp_time,
1751                            fio_run_time)
1752
1753        for i in initiators:
1754            i.init_disconnect()
1755            i.copy_result_files(args.results)
1756
1757    try:
1758        parse_results(args.results, args.csv_filename)
1759    except Exception as err:
1760        logging.error("There was an error with parsing the results")
1761        raise err
1762
1763
1764if __name__ == "__main__":
1765    exit_code = 0
1766
1767    script_full_dir = os.path.dirname(os.path.realpath(__file__))
1768    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))
1769
1770    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
1771    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
1772                        help='Configuration file.')
1773    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
1774                        help='Results directory.')
1775    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
1776                        help='CSV results filename.')
1777    parser.add_argument('-f', '--force', default=False, action='store_true',
1778                        dest='force', help="""Force script to continue and try to use all
1779                        available NVMe devices during test.
1780                        WARNING: Might result in data loss on used NVMe drives""")
1781
1782    args = parser.parse_args()
1783
1784    logging.basicConfig(level=logging.INFO,
1785                        format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s')
1786
1787    logging.info("Using config file: %s" % args.config)
1788    with open(args.config, "r") as config:
1789        data = json.load(config)
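    # The config JSON is expected to contain "general", "target", one or more
    # "initiator*" sections and a "fio" section.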
1790
1791    initiators = []
1792    general_config = data["general"]
1793    target_config = data["target"]
1794    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]
1795
1796    if "null_block_devices" not in data["target"] and \
1797        (args.force is False and
1798            "allowlist" not in data["target"] and
1799            "blocklist" not in data["target"]):
1800        # TODO: Also check if allowlist or blocklist are not empty.
1801        logging.warning("""WARNING: This script requires allowlist and blocklist to be defined.
1802        You can choose to use all available NVMe drives on your system, which may potentially
1803        lead to data loss. If you wish to proceed with all attached NVMes, use "-f" option.""")
1804        exit(1)
1805
1806    for k, v in data.items():
1807        if "target" in k:
1808            v.update({"results_dir": args.results})
1809            if data[k]["mode"] == "spdk":
1810                target_obj = SPDKTarget(k, data["general"], v)
1811            elif data[k]["mode"] == "kernel":
1812                target_obj = KernelTarget(k, data["general"], v)
1813        elif "initiator" in k:
1814            if data[k]["mode"] == "spdk":
1815                init_obj = SPDKInitiator(k, data["general"], v)
1816            elif data[k]["mode"] == "kernel":
1817                init_obj = KernelInitiator(k, data["general"], v)
1818            initiators.append(init_obj)
1819        elif "fio" in k:
1820            fio_workloads = itertools.product(data[k]["bs"],
1821                                              data[k]["qd"],
1822                                              data[k]["rw"])
1823
1824            fio_run_time = data[k]["run_time"]
1825            fio_ramp_time = data[k]["ramp_time"]
1826            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k].get("run_num")
            fio_num_jobs = data[k].get("num_jobs")
1829
            fio_rate_iops = data[k].get("rate_iops", 0)
            fio_offset = data[k].get("offset", False)
            fio_offset_inc = data[k].get("offset_inc", 0)
1840        else:
1841            continue
1842
1843    try:
1844        os.mkdir(args.results)
1845    except FileExistsError:
1846        pass
1847
1848    for i in initiators:
1849        target_obj.initiator_info.append(
1850            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
1851        )
1852
1853    try:
1854        target_obj.tgt_start()
1855        initiators_match_subsystems(initiators, target_obj)
1856
        # Run FIO tests ("poor man's threading": each initiator and measurement
        # runs in its own thread)
1859        run_fio_tests(args, initiators, target_obj, fio_workloads, fio_rw_mix_read,
1860                      fio_run_num, fio_ramp_time, fio_run_time, fio_rate_iops,
1861                      fio_offset, fio_offset_inc)
1862
1863    except Exception as e:
1864        logging.error("Exception occurred while running FIO tests")
1865        logging.error(e)
1866        exit_code = 1
1867
1868    finally:
1869        for i in initiators:
1870            try:
1871                i.stop()
            except Exception as err:
                logging.warning("Failed to stop initiator %s: %s" % (i.name, err))
1874        target_obj.stop()
1875        sys.exit(exit_code)
1876