#!/usr/bin/env python3
#  SPDX-License-Identifier: BSD-3-Clause
#  Copyright (C) 2018 Intel Corporation
#  All rights reserved.
#

# Note: config values are assigned to objects dynamically via setattr() in read_config()
# pylint: disable=no-member

import os
import re
import sys
import argparse
import json
import logging
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid

import paramiko
import pandas as pd
from common import *
from subprocess import check_output, CalledProcessError
from abc import abstractmethod, ABC
from dataclasses import dataclass
from typing import Any

sys.path.append(os.path.dirname(__file__) + '/../../../python')

import spdk.rpc as rpc  # noqa
import spdk.rpc.client as rpc_client  # noqa


@dataclass
class ConfigField:
    name: str = None
    default: Any = None
    required: bool = False

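# Illustrative note (not part of the config schema itself): a field declared as
#   ConfigField(name='transport', required=True)
# makes read_config() raise ValueError when "transport" is missing from the
# config dictionary, while
#   ConfigField(name='skip_spdk_install', default=False)
# silently falls back to False and ends up as self.skip_spdk_install.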

class Server(ABC):
    def __init__(self, name, general_config, server_config):
        self.name = name

        self.log = logging.getLogger(self.name)

        config_fields = [
            ConfigField(name='username', required=True),
            ConfigField(name='password', required=True),
            ConfigField(name='transport', required=True),
            ConfigField(name='skip_spdk_install', default=False),
            ConfigField(name='irdma_roce_enable', default=False)
        ]
        self.read_config(config_fields, general_config)
        self.transport = self.transport.lower()

        config_fields = [
            ConfigField(name='nic_ips', required=True),
            ConfigField(name='mode', required=True),
            ConfigField(name='irq_scripts_dir', default='/usr/src/local/mlnx-tools/ofed_scripts'),
            ConfigField(name='enable_arfs', default=False),
            ConfigField(name='tuned_profile', default='')
        ]
        self.read_config(config_fields, server_config)

        self.local_nic_info = []
        self._nics_json_obj = {}
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""

        self.enable_adq = server_config.get('adq_enable', False)
        self.adq_priority = 1 if self.enable_adq else None
        self.irq_settings = {'mode': 'default', **server_config.get('irq_settings', {})}

        if not re.match(r'^[A-Za-z0-9\-]+$', name):
            self.log.error("Please use a name that contains only letters, numbers or dashes")
            sys.exit(1)

    def read_config(self, config_fields, config):
        for config_field in config_fields:
            value = config.get(config_field.name, config_field.default)
            if config_field.required and value is None:
                raise ValueError('%s config key is required' % config_field.name)

            setattr(self, config_field.name, value)

    @staticmethod
    def get_uncommented_lines(lines):
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                if addr["local"] == ip:
                    return nic["ifname"]
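    # Example (illustrative): if "ip -j address show" reports interface
    # "ens801f0" holding address 192.168.100.1, get_nic_name_by_ip("192.168.100.1")
    # returns "ens801f0"; None is returned when no interface owns the address.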

    @abstractmethod
    def set_local_nic_info_helper(self):
        pass

    def set_local_nic_info(self, pci_info):
        def extract_network_elements(json_obj):
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def get_nic_numa_node(self, nic_name):
        return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name]))

    def get_numa_cpu_map(self):
        numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"]))
        numa_cpu_json_map = {}

        for cpu in numa_cpu_json_obj["cpus"]:
            cpu_num = int(cpu["cpu"])
            numa_node = int(cpu["node"])
            numa_cpu_json_map.setdefault(numa_node, [])
            numa_cpu_json_map[numa_node].append(cpu_num)

        return numa_cpu_json_map
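    # Example return value (illustrative, dual-socket host):
    #   {0: [0, 1, ..., 23], 1: [24, 25, ..., 47]}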

    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        return ""
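    # The base implementation is a no-op; subclasses override exec_cmd():
    # Target executes commands locally via subprocess, Initiator runs them
    # on the remote system over SSH.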

    def configure_system(self):
        self.load_drivers()
        self.configure_services()
        self.configure_sysctl()
        self.configure_arfs()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity(**self.irq_settings)

    RDMA_PROTOCOL_IWARP = 0
    RDMA_PROTOCOL_ROCE = 1
    RDMA_PROTOCOL_UNKNOWN = -1

    def check_rdma_protocol(self):
        try:
            roce_ena = self.exec_cmd(["cat", "/sys/module/irdma/parameters/roce_ena"])
            roce_ena = roce_ena.strip()
            if roce_ena == "0":
                return self.RDMA_PROTOCOL_IWARP
            else:
                return self.RDMA_PROTOCOL_ROCE
        except CalledProcessError as e:
            self.log.error("ERROR: failed to check RDMA protocol!")
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
            return self.RDMA_PROTOCOL_UNKNOWN

    def load_drivers(self):
        self.log.info("Loading drivers")
        self.exec_cmd(["sudo", "modprobe", "-a",
                       "nvme-%s" % self.transport,
                       "nvmet-%s" % self.transport])
        current_mode = self.check_rdma_protocol()
        if current_mode == self.RDMA_PROTOCOL_UNKNOWN:
            self.log.error("ERROR: failed to check RDMA protocol mode")
            return
        if self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_IWARP:
            self.reload_driver("irdma", "roce_ena=1")
            self.log.info("Loaded irdma driver with RoCE enabled")
        elif self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_ROCE:
            self.log.info("Leaving irdma driver with RoCE enabled")
        else:
            self.reload_driver("irdma", "roce_ena=0")
            self.log.info("Loaded irdma driver with iWARP enabled")

    def configure_arfs(self):
        rps_flow_cnt = 512
        if not self.enable_arfs:
            rps_flow_cnt = 0

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            self.exec_cmd(["sudo", "ethtool", "-K", nic_name, "ntuple", "on"])
            self.log.info(f"Setting rps_flow_cnt for {nic_name}")
            queue_files = self.exec_cmd(["ls", f"/sys/class/net/{nic_name}/queues/"]).strip().split("\n")
            queue_files = filter(lambda x: x.startswith("rx-"), queue_files)

            for qf in queue_files:
                self.exec_cmd(["sudo", "bash", "-c", f"echo {rps_flow_cnt} > /sys/class/net/{nic_name}/queues/{qf}/rps_flow_cnt"])
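    # Note: with enable_arfs=False the rps_flow_cnt value of 0 effectively
    # disables accelerated RFS on every rx queue, while ntuple filtering
    # itself stays enabled.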

    def configure_adq(self):
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        self.log.info("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log.info("%s loaded!" % module)
            except CalledProcessError as e:
                self.log.error("ERROR: failed to load module %s" % module)
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        self.log.info("Configuring ADQ Traffic classes and filters...")

        num_queues_tc0 = 2  # 2 is the minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            nic_ports = [x[0] for x in self.subsystem_info_list if x[2] == nic_ip]

            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log.info(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log.info(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            nic_bdf = os.path.basename(self.exec_cmd(["readlink", "-f", "/sys/class/net/%s/device" % nic_name]).strip())
            self.exec_cmd(["sudo", "devlink", "dev", "param", "set", "pci/%s" % nic_bdf,
                           "name", "tc1_inline_fd", "value", "true", "cmode", "runtime"])

            for port in nic_ports:
                tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                                 "protocol", "ip", "ingress", "prio", "1", "flower",
                                 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                                 "skip_sw", "hw_tc", "1"]
                self.log.info(" ".join(tc_filter_cmd))
                self.exec_cmd(tc_filter_cmd)

            # Show tc configuration
            self.log.info("Show tc configuration for %s NIC..." % nic_name)
            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log.info("%s" % tc_disk_out)
            self.log.info("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log.info(xps_cmd)
            self.exec_cmd(xps_cmd)

    def reload_driver(self, driver, *modprobe_args):
        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver, *modprobe_args])
        except CalledProcessError as e:
            self.log.error("ERROR: failed to reload %s module!" % driver)
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_nic(self):
        self.log.info("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first so that any previous settings are reset.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                nic_bdf = self.exec_cmd(["bash", "-c", "source /sys/class/net/%s/device/uevent; echo $PCI_SLOT_NAME" % nic])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-inspect-optimize", "off"])
                # The following flags are suggested in the ADQ Configuration Guide for large
                # block sizes, but they can be used with 4k block sizes as well
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop", "on"])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop-cfg", "on"])
            except subprocess.CalledProcessError as e:
                self.log.error("ERROR: failed to configure NIC port using ethtool!")
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
                self.log.info("Please update your NIC driver and firmware versions and try again.")
            self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        self.log.info("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # The list below is valid only for RHEL / Fedora systems and might not
        # contain valid service names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log.info("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log.info("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        self.log.info("Tuning sysctl settings...")

        busy_read = 0
        busy_poll = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1
            busy_poll = 1

        sysctl_opts = {
            "net.core.busy_poll": busy_poll,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
            "net.core.rps_sock_flow_entries": 0
        }

        if self.enable_adq:
            sysctl_opts.update(self.adq_set_busy_read(1))

        if self.enable_arfs:
            sysctl_opts.update({"net.core.rps_sock_flow_entries": 32768})

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        if not self.tuned_profile:
            self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        self.log.info("Setting CPU governor to performance...")

        # Assumes all CPUs use the same scaling governor
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def get_core_list_from_mask(self, core_mask):
        # Generate a list of individual cores from a hex core mask
        # (e.g. '0xffff') or a list containing:
        # - individual cores (e.g. '1, 2, 3')
        # - core ranges (e.g. '0-3')
        # - a mix of both (e.g. '0, 1-4, 9, 11-13')
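        # Examples (illustrative):
        #   '0x5'      -> [0, 2]
        #   '[0, 2-4]' -> [0, 2, 3, 4]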

        core_list = []
        if "0x" in core_mask:
            core_mask_int = int(core_mask, 16)
            for i in range(core_mask_int.bit_length()):
                if (1 << i) & core_mask_int:
                    core_list.append(i)
            return core_list
        else:
            # Core list can be provided in the .json config with square brackets;
            # remove them first
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")

            for i in core_mask.split(","):
                if "-" in i:
                    start, end = i.split("-")
                    core_range = range(int(start), int(end) + 1)
                    core_list.extend(core_range)
                else:
                    core_list.append(int(i))
            return core_list

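    # IRQ affinity modes handled by this base class implementation:
    #   default - spread IRQs across all CPUs (set_irq_affinity.sh)
    #   bynode  - restrict IRQs to CPUs of the NIC's NUMA node
    #   cpulist - restrict IRQs to an explicit CPU list, or to its complement
    #             when exclude_cpulist is True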
    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
        self.log.info("Setting IRQ affinity for NICs, using %s mode" % mode)

        if mode not in ["default", "bynode", "cpulist"]:
            raise ValueError("%s irq affinity setting not supported" % mode)

        if mode == "cpulist" and not cpulist:
            raise ValueError("%s irq affinity setting set, but no cpulist provided" % mode)

        affinity_script = "set_irq_affinity.sh"
        if mode != "default":
            affinity_script = "set_irq_affinity_cpulist.sh"
            system_cpu_map = self.get_numa_cpu_map()
        irq_script_path = os.path.join(self.irq_scripts_dir, affinity_script)

        def cpu_list_to_string(cpulist):
            return ",".join(map(str, cpulist))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            irq_cmd = ["sudo", irq_script_path]

            # Use only CPU cores matching the NIC's NUMA node.
            # Remove CPU cores that are on the exclusion list.
            if mode == "bynode":
                irq_cpus = system_cpu_map[self.get_nic_numa_node(nic_name)]
                if cpulist and exclude_cpulist:
                    disallowed_cpus = self.get_core_list_from_mask(cpulist)
                    irq_cpus = list(set(irq_cpus) - set(disallowed_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            if mode == "cpulist":
                irq_cpus = self.get_core_list_from_mask(cpulist)
                if exclude_cpulist:
                    # Flatten the system CPU list, we don't need NUMA awareness here
                    system_cpu_list = sorted({x for v in system_cpu_map.values() for x in v})
                    irq_cpus = list(set(system_cpu_list) - set(irq_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            irq_cmd.append(nic_name)
            self.log.info(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        self.log.info("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        self.log.info("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        self.log.info("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log.info("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        self.log.info("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log.info("Reverted CPU governor to %s." % self.governor_restore)

    def restore_settings(self):
        self.restore_governor()
        self.restore_tuned()
        self.restore_services()
        self.restore_sysctl()
        if self.enable_adq:
            self.reload_driver("ice")


class Target(Server):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Defaults
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []
        self.initiator_info = []
        self.nvme_allowlist = []
        self.nvme_blocklist = []

        # Target-side measurement options
        self.enable_pm = True
        self.enable_sar = True
        self.enable_pcm = True
        self.enable_bw = True
        self.enable_dpdk_memory = True

        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "scheduler_settings" in target_config:
            self.scheduler_name = target_config["scheduler_settings"]
        if "zcopy_settings" in target_config:
            self.enable_zcopy = target_config["zcopy_settings"]
        if "results_dir" in target_config:
            self.results_dir = target_config["results_dir"]
        if "blocklist" in target_config:
            self.nvme_blocklist = target_config["blocklist"]
        if "allowlist" in target_config:
            self.nvme_allowlist = target_config["allowlist"]
            # Blocklist takes precedence; remove common elements from the allowlist
            self.nvme_allowlist = list(set(self.nvme_allowlist) - set(self.nvme_blocklist))
        if "enable_pm" in target_config:
            self.enable_pm = target_config["enable_pm"]
        if "enable_sar" in target_config:
            self.enable_sar = target_config["enable_sar"]
        if "enable_pcm" in target_config:
            self.enable_pcm = target_config["enable_pcm"]
        if "enable_bandwidth" in target_config:
            self.enable_bw = target_config["enable_bandwidth"]
        if "enable_dpdk_memory" in target_config:
            self.enable_dpdk_memory = target_config["enable_dpdk_memory"]

        self.log.info("Items now on allowlist: %s" % self.nvme_allowlist)
        self.log.info("Items now on blocklist: %s" % self.nvme_blocklist)

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        if not self.skip_spdk_install:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
            self.configure_irq_affinity()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log.info("Changing directory to %s" % change_dir)

        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")

        if change_dir:
            os.chdir(old_cwd)
            self.log.info("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        self.log.info("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, _directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log.info("Done zipping")

    @staticmethod
    def _chunks(input_list, chunks_no):
        div, rem = divmod(len(input_list), chunks_no)
        for i in range(chunks_no):
            si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem)
            yield input_list[si:si + (div + 1 if i < rem else div)]
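    # Example (illustrative): _chunks(list(range(7)), 3) yields
    # [0, 1, 2], [3, 4], [5, 6]; earlier chunks absorb the remainder.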

    def spread_bdevs(self, req_disks):
        # Spread available block device indexes:
        # - evenly across available initiator systems
        # - evenly across available NIC interfaces for
        #   each initiator
        # Not NUMA aware.
        ip_bdev_map = []
        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))

        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
            self.initiator_info[i]["bdev_range"] = init_chunk
            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
                for c in nic_chunk:
                    ip_bdev_map.append((ip, c))
        return ip_bdev_map
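    # Example (illustrative): with 4 disks and two initiators, each exposing a
    # single target NIC IP, spread_bdevs(4) could return
    # [("192.0.2.1", 0), ("192.0.2.1", 1), ("192.0.2.2", 2), ("192.0.2.2", 3)].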

    def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time):
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))

        self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Starting SAR measurements")

        out = self.exec_cmd(["sar", "-P", "ALL", "1", "%s" % run_time])
        with open(sar_output_file, "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log.info("Summary CPU utilization from SAR:")
                        self.log.info(line)
                    elif "all" in line:
                        self.log.info(line)
                    else:
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum

        with open(sar_cpu_util_file, "w") as f:
            f.write("%0.2f" % sar_cpu_usage)

    def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time):
        time.sleep(ramp_time)
        self.log.info("Starting power measurements")
        self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir,
                       "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix,
                       "-x", "-c", "%s" % run_time, "-t", "1", "-r"])

    def measure_pcm_memory(self, results_dir, pcm_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        cmd = ["pcm-memory", "1", "-csv=%s/%s" % (results_dir, pcm_file_name)]
        pcm_memory = subprocess.Popen(cmd)
        time.sleep(run_time)
        pcm_memory.terminate()

    def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        cmd = ["pcm", "1", "-i=%s" % run_time,
               "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_pcm_power(self, results_dir, pcm_power_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        out = self.exec_cmd(["pcm-power", "1", "-i=%s" % run_time])
        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
            fh.write(out)
        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
        #       Improve this so that an additional file containing the measurements' average is generated too.

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name, ramp_time, run_time):
        self.log.info("Waiting %d seconds for ramp-up to finish before measuring bandwidth stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Starting network bandwidth measurements")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", "%s" % run_time])
        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
        #       Improve this so that an additional file containing the measurements' average is generated too.
        # TODO: Monitor only those interfaces which are currently used to run the workload.

    def measure_dpdk_memory(self, results_dir, dump_file_name, ramp_time):
        self.log.info("Waiting before generating DPDK memory usage dump")
        time.sleep(ramp_time)
        self.log.info("Generating DPDK memory usage dump")
        tmp_dump_file = rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)["filename"]
        os.rename(tmp_dump_file, "%s/%s" % (results_dir, dump_file_name))

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(os.uname().release)
        self.log.info("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log.info("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log.info("====zcopy settings:====")
        self.log.info("zcopy enabled: %s" % (self.enable_zcopy))
        self.log.info("====Scheduler settings:====")
        self.log.info("SPDK scheduler: %s" % (self.scheduler_name))


class Initiator(Server):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Required fields and defaults
        config_fields = [
            ConfigField(name='ip', required=True),
            ConfigField(name='target_nic_ips', required=True),
            ConfigField(name='cpus_allowed', default=None),
            ConfigField(name='cpus_allowed_policy', default='shared'),
            ConfigField(name='spdk_dir', default='/tmp/spdk'),
            ConfigField(name='fio_bin', default='/usr/src/fio/fio'),
            ConfigField(name='nvmecli_bin', default='nvme'),
            ConfigField(name='cpu_frequency', default=None),
            ConfigField(name='allow_cpu_sharing', default=True)
        ]

        self.read_config(config_fields, initiator_config)

        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.subsystem_info_list = []

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if not self.skip_spdk_install:
            self.copy_spdk("/tmp/spdk.zip")

        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        self.restore_settings()
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it again to prevent
        # expansion when sending it to the remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout by using the get_pty option if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out
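    # Example (illustrative): a command element such as
    # "net.ipv4.tcp_rmem=8192 1048576 33554432" contains spaces, so it is
    # double-quoted before being joined and sent over the SSH channel.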

    def put_file(self, local, remote_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        self.log.info("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log.info("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log.info("Sources unpacked")

    def copy_result_files(self, dest_dir):
        self.log.info("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get the list of result files from the initiator and copy them back to the target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log.info("Done copying results")

    def match_subsystems(self, target_subsystems):
        subsystems = [subsystem for subsystem in target_subsystems if subsystem[2] in self.target_nic_ips]
        subsystems.sort(key=lambda x: x[1])
        self.log.info("Found matching subsystems on target side:")
        for s in subsystems:
            self.log.info(s)
        self.subsystem_info_list = subsystems
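    # Each entry of subsystem_info_list is a (port, nqn, ip) tuple, e.g.
    # ("4420", "nqn.2018-09.io.spdk:cnode0", "192.0.2.1") - illustrative values.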

    @abstractmethod
    def init_connect(self):
        pass

    @abstractmethod
    def init_disconnect(self):
        pass

    @abstractmethod
    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def get_route_nic_numa(self, remote_nvme_ip):
        local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip]))
        local_nvme_nic = local_nvme_nic[0]["dev"]
        return self.get_nic_numa_node(local_nvme_nic)

    @staticmethod
    def gen_fio_offset_section(offset_inc, num_jobs):
        offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc
        return "\n".join(["size=%s%%" % offset_inc,
                          "offset=0%",
                          "offset_increment=%s%%" % offset_inc])
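    # Example (illustrative): gen_fio_offset_section(0, 4) produces:
    #   size=25%
    #   offset=0%
    #   offset_increment=25%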

    def gen_fio_numa_section(self, fio_filenames_list, num_jobs):
        numa_stats = {}
        allowed_cpus = []
        for nvme in fio_filenames_list:
            nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme))
            numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1

        # Use the most common NUMA node for this chunk to allocate memory and CPUs
        section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0]

        # Check if we have enough free CPUs to pop from the list before assigning them
        if len(self.available_cpus[section_local_numa]) < num_jobs:
            if self.allow_cpu_sharing:
                self.log.info("Regenerating available CPU list %s" % section_local_numa)
                # Remove still-available CPUs from the regenerated list. We don't want to
                # regenerate it with duplicates.
                cpus_regen = set(self.get_numa_cpu_map()[section_local_numa]) - set(self.available_cpus[section_local_numa])
                self.available_cpus[section_local_numa].extend(cpus_regen)
                self.log.info(self.available_cpus[section_local_numa])
            else:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise IndexError

        for _ in range(num_jobs):
            try:
                allowed_cpus.append(str(self.available_cpus[section_local_numa].pop(0)))
            except IndexError:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise

        return "\n".join(["cpus_allowed=%s" % ",".join(allowed_cpus),
                          "numa_mem_policy=prefer:%s" % section_local_numa])
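    # Example (illustrative): for a filename chunk whose NVMe devices mostly
    # reside on NUMA node 0 and num_jobs=2, this could return:
    #   cpus_allowed=0,1
    #   numa_mem_policy=prefer:0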

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no,
                       num_jobs=None, ramp_time=0, run_time=10, rate_iops=0,
                       offset=False, offset_inc=0):
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""

        if self.cpus_allowed is not None:
            self.log.info("Limiting FIO workload execution to specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    cpus_num += len(range(a, b + 1))  # "a-b" is an inclusive range
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            self.num_cores = len(self.subsystem_info_list)
            threads = range(0, len(self.subsystem_info_list))

        filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs,
                                                      offset, offset_inc)

        fio_config = fio_conf_template.format(ioengine=self.ioengine, spdk_conf=self.spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, init_connect() function
        if "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s\n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s\n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s\n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log.info("Created FIO Config:")
        self.log.info(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit(1)
        else:
            self.log.warning("WARNING: cpu_frequency not set; assuming intel_pstate is disabled "
                             "and the default CPU frequency governor is in use.")

    def run_fio(self, fio_config_file, run_num=1):
        job_name, _ = os.path.splitext(fio_config_file)
        self.log.info("Starting FIO run for job: %s" % job_name)
        self.log.info("Using FIO: %s" % self.fio_bin)

        output_filename = job_name + "_run_" + str(run_num) + "_" + self.name + ".json"
        try:
            output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename, "--eta=never"], True)
            self.log.info(output)
            self.log.info("FIO run finished. Results in: %s" % output_filename)
        except subprocess.CalledProcessError as e:
            self.log.error("ERROR: FIO process failed!")
            self.log.error(e.stdout)

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(self.exec_cmd(["uname", "-r"]))
        self.log.info("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log.info("====sysctl conf:====")
        sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"])
        self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))


class KernelTarget(Target):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = target_config.get('nvmet_bin', 'nvmetcli')

    def load_drivers(self):
        self.log.info("Loading drivers")
        super().load_drivers()
        if self.null_block:
            self.exec_cmd(["sudo", "modprobe", "null_blk", "nr_devices=%s" % self.null_block])

    def configure_adq(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_configure_tc(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_set_busy_read(self, busy_read_val):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
        return {"net.core.busy_read": 0}

    def stop(self):
        self.nvmet_command(self.nvmet_bin, "clear")
        self.restore_settings()

    def get_nvme_device_bdf(self, nvme_dev_path):
        nvme_name = os.path.basename(nvme_dev_path)
        return self.exec_cmd(["cat", "/sys/block/%s/device/address" % nvme_name]).strip()

    def get_nvme_devices(self):
        dev_list = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]).split("\n")
        nvme_list = []
        for dev in dev_list:
            if "nvme" not in dev:
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                nvme_list.append(dev)
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_allowlist:
                nvme_list.append(dev)
        return nvme_list

    def nvmet_command(self, nvmet_bin, command):
        return self.exec_cmd([nvmet_bin, *(command.split(" "))])

    def kernel_tgt_gen_subsystem_conf(self, nvme_list):
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        for ip, bdev_num in self.spread_bdevs(len(nvme_list)):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = nvme_list[bdev_num]

            nvmet_cfg["subsystems"].append({
                "allowed_hosts": [],
                "attr": {
                    "allow_any_host": "1",
                    "serial": serial,
                    "version": "1.3"
                },
                "namespaces": [
                    {
                        "device": {
                            "path": bdev_name,
                            "uuid": "%s" % uuid.uuid4()
                        },
                        "enable": 1,
                        "nsid": port
                    }
                ],
                "nqn": nqn
            })

            nvmet_cfg["ports"].append({
                "addr": {
                    "adrfam": "ipv4",
                    "traddr": ip,
                    "trsvcid": port,
                    "trtype": self.transport
                },
                "portid": bdev_num,
                "referrals": [],
                "subsystems": [nqn]
            })

            self.subsystem_info_list.append((port, nqn, ip))
        self.subsys_no = len(self.subsystem_info_list)

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        self.log.info("Configuring kernel NVMeOF Target")

        if self.null_block:
            self.log.info("Configuring with null block device.")
            nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
        else:
            self.log.info("Configuring with NVMe drives.")
            nvme_list = self.get_nvme_devices()

        self.kernel_tgt_gen_subsystem_conf(nvme_list)
        self.subsys_no = len(nvme_list)

        self.nvmet_command(self.nvmet_bin, "clear")
        self.nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log.info("Done configuring kernel NVMeOF Target")


class SPDKTarget(Target):
    def __init__(self, name, general_config, target_config):
        # IRQ affinity on the SPDK Target side takes the Target's core mask into consideration.
        # The method setting IRQ affinity runs as part of the parent class's __init__,
        # so self.core_mask must be set before IRQ affinity is changed.
        self.core_mask = target_config["core_mask"]
        self.num_cores = len(self.get_core_list_from_mask(self.core_mask))

        super().__init__(name, general_config, target_config)

        # Format: property name, default value
        config_fields = [
            ConfigField(name='dif_insert_strip', default=False),
            ConfigField(name='null_block_dif_type', default=0),
            ConfigField(name='num_shared_buffers', default=4096),
            ConfigField(name='max_queue_depth', default=128),
            ConfigField(name='bpf_scripts', default=[]),
            ConfigField(name='scheduler_core_limit', default=None),
            ConfigField(name='dsa_settings', default=False),
            ConfigField(name='iobuf_small_pool_count', default=32767),
            ConfigField(name='iobuf_large_pool_count', default=16383),
            ConfigField(name='num_cqe', default=4096)
        ]

        self.read_config(config_fields, target_config)

        self.bpf_proc = None
        self.enable_dsa = False

        self.log.info("====DSA settings:====")
        self.log.info("DSA enabled: %s" % (self.enable_dsa))

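    # Extended IRQ affinity modes (on top of the base class ones):
    #   shared       - pin IRQs to the same cores SPDK runs on
    #   split        - pin IRQs to all cores except SPDK's
    #   split-bynode - pin IRQs to the NIC's NUMA node, excluding SPDK's cores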
1169    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
1170        if mode not in ["default", "bynode", "cpulist",
1171                        "shared", "split", "split-bynode"]:
1172            self.log.error("%s irq affinity setting not supported" % mode)
1173            raise Exception
1174
1175        # Create core list from SPDK's mask and change it to string.
1176        # This is the type configure_irq_affinity expects for cpulist parameter.
1177        spdk_tgt_core_list = self.get_core_list_from_mask(self.core_mask)
1178        spdk_tgt_core_list = ",".join(map(lambda x: str(x), spdk_tgt_core_list))
1179        spdk_tgt_core_list = "[" + spdk_tgt_core_list + "]"
1180
1181        if mode == "shared":
1182            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list)
1183        elif mode == "split":
1184            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
1185        elif mode == "split-bynode":
1186            super().configure_irq_affinity(mode="bynode", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
1187        else:
1188            super().configure_irq_affinity(mode=mode, cpulist=cpulist, exclude_cpulist=exclude_cpulist)
1189
1190    def adq_set_busy_read(self, busy_read_val):
1191        return {"net.core.busy_read": busy_read_val}
1192
1193    def get_nvme_devices_count(self):
1194        return len(self.get_nvme_devices())
1195
1196    def get_nvme_devices(self):
        bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")]))
        bdev_bdfs = []
        for bdev in bdev_subsys_json_obj["config"]:
            bdev_traddr = bdev["params"]["traddr"]
            if bdev_traddr in self.nvme_blocklist:
                continue
            if not self.nvme_allowlist or bdev_traddr in self.nvme_allowlist:
                bdev_bdfs.append(bdev_traddr)
        return bdev_bdfs

    def spdk_tgt_configure(self):
        self.log.info("Configuring SPDK NVMeOF target via RPC")

        # Create transport layer
        nvmf_transport_params = {
            "client": self.client,
            "trtype": self.transport,
            "num_shared_buffers": self.num_shared_buffers,
            "max_queue_depth": self.max_queue_depth,
            "dif_insert_or_strip": self.dif_insert_strip,
            "sock_priority": self.adq_priority,
            "num_cqe": self.num_cqe
        }

        if self.enable_adq:
            nvmf_transport_params["acceptor_poll_rate"] = 10000

        rpc.nvmf.nvmf_create_transport(**nvmf_transport_params)
        self.log.info("SPDK NVMeOF transport layer:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

        if self.enable_adq:
            self.adq_configure_tc()

        self.log.info("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self, null_block_count):
        md_size = 0
        block_size = 4096
        if self.null_block_dif_type != 0:
            md_size = 128

        self.log.info("Adding null block bdevs to config via RPC")
        for i in range(null_block_count):
            self.log.info("Setting bdev protection to: %s" % self.null_block_dif_type)
            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
                                      dif_type=self.null_block_dif_type, md_size=md_size)
        self.log.info("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        self.log.info("Adding NVMe bdevs to config via RPC")

        bdfs = self.get_nvme_devices()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log.error("ERROR: Requested number of disks (%s) exceeds the number available (%s)" % (req_num_disks, len(bdfs)))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log.info("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        self.log.info("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = self.get_nvme_devices_count()

        for ip, bdev_num in self.spread_bdevs(req_num_disks):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = "Nvme%sn1" % bdev_num
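            # e.g. bdev_num=2 maps to port 4422, nqn.2018-09.io.spdk:cnode2,
            # serial SPDK002 and bdev Nvme2n1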

            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                           allow_any_host=True, max_namespaces=8)
            rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
            for nqn_name in [nqn, "discovery"]:
                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
                                                     nqn=nqn_name,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid=port,
                                                     adrfam="ipv4")
            self.subsystem_info_list.append((port, nqn, ip))
        self.subsys_no = len(self.subsystem_info_list)

        self.log.info("SPDK NVMeOF subsystem configuration:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def bpf_start(self):
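        """Attach the bpftrace helper script to the running nvmf_tgt process;
        trace output is collected into bpf_traces.txt in the results dir."""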
        self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts)
        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
        results_path = os.path.join(self.results_dir, "bpf_traces.txt")

        with open(self.pid, "r") as fh:
            nvmf_pid = str(fh.readline())

        cmd = [bpf_script, nvmf_pid, *bpf_traces]
        self.log.info(cmd)
        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})

    def tgt_start(self):
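        """Start the nvmf_tgt application and configure it over JSON-RPC.

        The target is launched with --wait-for-rpc, so options that must be
        set before subsystem initialization (sock implementation, iobuf pool
        sizes, accel modules, scheduler) are applied first, and
        framework_start_init is called afterwards to finish startup.
        """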
        if self.null_block:
            self.subsys_no = 1
        else:
            self.subsys_no = self.get_nvme_devices_count()
        self.log.info("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        self.log.info("SPDK NVMeOF Target PID=%s" % proc.pid)
        self.log.info("Waiting for spdk to initialize...")
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")

        rpc.sock.sock_set_default_impl(self.client, impl_name="posix")
        rpc.iobuf.iobuf_set_options(self.client,
                                    small_pool_count=self.iobuf_small_pool_count,
                                    large_pool_count=self.iobuf_large_pool_count,
                                    small_bufsize=None,
                                    large_bufsize=None)

        if self.enable_zcopy:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
                                           enable_zerocopy_send_server=True)
            self.log.info("Target socket options:")
            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))

        if self.enable_adq:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
                                           nvme_adminq_poll_period_us=100000, retry_count=4)

        if self.enable_dsa:
            rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None)
            self.log.info("Target DSA accel module enabled")

        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit)
        rpc.framework_start_init(self.client)

        if self.bpf_scripts:
            self.bpf_start()

        self.spdk_tgt_configure()

    def stop(self):
        if self.bpf_proc:
            self.log.info("Stopping BPF Trace script")
            self.bpf_proc.terminate()
            self.bpf_proc.wait()

        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait(timeout=30)
            except Exception as e:
                self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.")
                self.log.info(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
                # Try to clean up RPC socket files if they were not removed
                # by the killed process
                try:
                    os.remove("/var/tmp/spdk.sock")
                    os.remove("/var/tmp/spdk.sock.lock")
                except FileNotFoundError:
                    pass
        self.restore_settings()


class KernelInitiator(Initiator):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Defaults
        self.extra_params = initiator_config.get('extra_params', '')

        self.ioengine = "libaio"
        self.spdk_conf = ""

        if "num_cores" in initiator_config:
            self.num_cores = initiator_config["num_cores"]

        if "kernel_engine" in initiator_config:
            self.ioengine = initiator_config["kernel_engine"]
            if "io_uring" in self.ioengine:
                self.extra_params += ' --nr-poll-queues=8'
    def configure_adq(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_configure_tc(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_set_busy_read(self, busy_read_val):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
        return {"net.core.busy_read": 0}

    def get_connected_nvme_list(self):
        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
        return nvme_list

    def init_connect(self):
        self.log.info("The connection attempts below may print error messages; this is expected!")
        for subsystem in self.subsystem_info_list:
            self.log.info("Trying to connect %s %s %s" % subsystem)
            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
            time.sleep(2)

        if "io_uring" in self.ioengine:
            self.log.info("Setting block layer settings for io_uring.")

            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
            #       apparently it's not possible for connected subsystems.
            #       Results in "error: Invalid argument"
            block_sysfs_settings = {
                "iostats": "0",
                "rq_affinity": "0",
                "nomerges": "2"
            }

            for disk in self.get_connected_nvme_list():
                sysfs = os.path.join("/sys/block", disk, "queue")
                for k, v in block_sysfs_settings.items():
                    sysfs_opt_path = os.path.join(sysfs, k)
                    try:
                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
                    except CalledProcessError as e:
                        self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
                    finally:
                        cur_val = self.exec_cmd(["sudo", "cat", sysfs_opt_path])
                        self.log.info("%s=%s" % (sysfs_opt_path, cur_val))

    def init_disconnect(self):
        for subsystem in self.subsystem_info_list:
            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
            time.sleep(1)

    def get_nvme_subsystem_numa(self, dev_name):
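        """Map a connected NVMe device (e.g. /dev/nvme0n1) to the NUMA node
        of the local NIC that routes to its target IP address."""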
        # Strip the last two characters (the "n1" namespace suffix) to get
        # the controller name instead of the namespace name
        nvme_ctrl = os.path.basename(dev_name)[:-2]
        remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
                                   self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl]))
        return self.get_route_nic_numa(remote_nvme_ip.group(0))

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
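        """Generate fio "filename" job sections from connected NVMe devices.

        Devices are sorted by the NUMA node of their route NIC so each
        section groups NUMA-local devices, and each section's iodepth scales
        with the number of devices it holds. A sketch of the output for two
        devices in one section with io_depth=32 and num_jobs=1:

        [filename0]
        filename=/dev/nvme0n1
        filename=/dev/nvme1n1
        iodepth=64
        """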
        self.available_cpus = self.get_numa_cpu_map()
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        # Generate connected NVMe device names and sort them by used NIC numa node
        # to allow better grouping when splitting into fio sections.
        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
        nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list]
        nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))]

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            # Spread remainder devices one per section
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = self.gen_fio_numa_section(r, num_jobs)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section


class SPDKInitiator(Initiator):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        if not self.skip_spdk_install:
            self.install_spdk()

        # Optional fields
        self.enable_data_digest = initiator_config.get('enable_data_digest', False)
        self.small_pool_count = initiator_config.get('small_pool_count', 32768)
        self.large_pool_count = initiator_config.get('large_pool_count', 16384)

        if "num_cores" in initiator_config:
            self.num_cores = initiator_config["num_cores"]

        self.ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
        self.spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
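        # fio drives SPDK bdevs through the external spdk_bdev ioengine built
        # under build/fio, with the bdev layer configured from bdev.conf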

    def adq_set_busy_read(self, busy_read_val):
        return {"net.core.busy_read": busy_read_val}

    def install_spdk(self):
        self.log.info("Using fio binary %s" % self.fio_bin)
        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma",
                       "--with-fio=%s" % os.path.dirname(self.fio_bin),
                       "--enable-lto", "--disable-unit-tests"])
        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])

        self.log.info("SPDK built")
        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])

    def init_connect(self):
        # Not a real "connect" like "nvme connect", because SPDK's fio bdev
        # plugin initiates the connection itself just before starting IO
        # traffic. This is just an "init_connect" counterpart to the same
        # method in the KernelInitiator class: prepare the bdev.conf JSON
        # file for later use and consider that "making a connection".
        bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
        self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])

    def init_disconnect(self):
        # The SPDK initiator does not need to explicitly disconnect, as this
        # is done after the fio bdev plugin finishes IO.
        return

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        spdk_cfg_section = {
            "subsystems": [
                {
                    "subsystem": "bdev",
                    "config": []
                },
                {
                    "subsystem": "iobuf",
                    "config": [
                        {
                            "method": "iobuf_set_options",
                            "params": {
                                "small_pool_count": self.small_pool_count,
                                "large_pool_count": self.large_pool_count
                            }
                        }
                    ]
                }
            ]
        }

        for i, subsys in enumerate(remote_subsystem_list):
            sub_port, sub_nqn, sub_addr = map(str, subsys)
            nvme_ctrl = {
                "method": "bdev_nvme_attach_controller",
                "params": {
                    "name": "Nvme{}".format(i),
                    "trtype": self.transport,
                    "traddr": sub_addr,
                    "trsvcid": sub_port,
                    "subnqn": sub_nqn,
                    "adrfam": "IPv4"
                }
            }

            if self.enable_adq:
                nvme_ctrl["params"].update({"priority": "1"})

            if self.enable_data_digest:
                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})

            spdk_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)

        return json.dumps(spdk_cfg_section, indent=2)

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
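        """Split the expected NVMe bdev names (Nvme0n1, Nvme1n1, ...) into
        fio job sections; the SPDK bdev counterpart of
        KernelInitiator.gen_fio_filename_conf."""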
        self.available_cpus = self.get_numa_cpu_map()
        filename_section = ""
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        # Generate expected NVMe Bdev names and sort them by used NIC numa node
        # to allow better grouping when splitting into fio sections.
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames]
        filenames = [x for _, x in sorted(zip(filename_numas, filenames))]

        nvme_per_split = int(len(subsystems) / len(threads))
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = self.gen_fio_numa_section(r, num_jobs)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section

    def get_nvme_subsystem_numa(self, bdev_name):
        bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir]))
        bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"]

        # Strip the last two characters (the "n1" namespace suffix) to get
        # the controller name instead of the namespace name
        nvme_ctrl = bdev_name[:-2]
        for bdev in bdev_conf_json_obj:
            if bdev["method"] == "bdev_nvme_attach_controller" and bdev["params"]["name"] == nvme_ctrl:
                return self.get_route_nic_numa(bdev["params"]["traddr"])
        return None


if __name__ == "__main__":
    script_full_dir = os.path.dirname(os.path.realpath(__file__))
    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
                        help='Configuration file.')
    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
                        help='Results directory.')
    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
                        help='CSV results filename.')
    parser.add_argument('-f', '--force', default=False, action='store_true',
                        dest='force', help="""Force script to continue and try to use all
                        available NVMe devices during test.
                        WARNING: Might result in data loss on used NVMe drives""")

    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO,
                        format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s')

    logging.info("Using config file: %s" % args.config)
    with open(args.config, "r") as config:
        data = json.load(config)

    initiators = []
    fio_cases = []

    general_config = data["general"]
    target_config = data["target"]
    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]
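
    # A minimal config.json sketch (illustrative only; the section names and
    # fio keys below are the ones parsed in this block, the remaining fields
    # are defined by the Target/Initiator classes):
    # {
    #     "general": {...},
    #     "target": {"mode": "spdk", ...},
    #     "initiator1": {"mode": "kernel", ...},
    #     "fio": {"bs": ["4k"], "qd": [128], "rw": ["randrw"], "rwmixread": 70,
    #             "run_time": 30, "ramp_time": 10, "run_num": 3}
    # }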

    if "null_block_devices" not in data["target"] and \
        (args.force is False and
            "allowlist" not in data["target"] and
            "blocklist" not in data["target"]):
        # TODO: Also check if allowlist or blocklist are not empty.
        logging.warning("""WARNING: This script requires an allowlist or a blocklist to be defined in
        the target section. You can choose to use all available NVMe drives on your system,
        which may potentially lead to data loss. If you wish to proceed with all attached NVMes,
        use the "-f" option.""")
        exit(1)

    for k, v in data.items():
        if "target" in k:
            v.update({"results_dir": args.results})
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(k, data["general"], v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(k, data["general"], v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            # Default to a single run if "run_num" is not specified
            fio_run_num = data[k].get("run_num", 1)
            fio_num_jobs = data[k].get("num_jobs", None)

            fio_rate_iops = 0
            if "rate_iops" in data[k]:
                fio_rate_iops = data[k]["rate_iops"]

            fio_offset = False
            if "offset" in data[k]:
                fio_offset = data[k]["offset"]
            fio_offset_inc = 0
            if "offset_inc" in data[k]:
                fio_offset_inc = data[k]["offset_inc"]
        else:
            continue

    try:
        os.mkdir(args.results)
    except FileExistsError:
        pass

    for i in initiators:
        target_obj.initiator_info.append(
            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
        )

    # TODO: This try block is definitely too large. Needs to be broken up into
    # separate logical blocks to reduce size.
    try:
        target_obj.tgt_start()

        for i in initiators:
            i.match_subsystems(target_obj.subsystem_info_list)
            if i.enable_adq:
                i.adq_configure_tc()

        # Poor man's threading
        # Run FIO tests
        for block_size, io_depth, rw in fio_workloads:
            configs = []
            for i in initiators:
                i.init_connect()
                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops,
                                       fio_offset, fio_offset_inc)
                configs.append(cfg)

            for run_no in range(1, fio_run_num+1):
                threads = []
                power_daemon = None
                measurements_prefix = "%s_%s_%s_m_%s_run_%s" % (block_size, io_depth, rw, fio_rw_mix_read, run_no)
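                # e.g. "4k_128_randrw_m_70_run_1"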

                for i, cfg in zip(initiators, configs):
                    t = threading.Thread(target=i.run_fio, args=(cfg, run_no))
                    threads.append(t)
                if target_obj.enable_sar:
                    sar_file_prefix = measurements_prefix + "_sar"
                    t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix, fio_ramp_time, fio_run_time))
                    threads.append(t)

                if target_obj.enable_pcm:
                    pcm_fnames = ["%s_%s.csv" % (measurements_prefix, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]

                    pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm,
                                                 args=(args.results, pcm_fnames[0], fio_ramp_time, fio_run_time))
                    pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory,
                                                 args=(args.results, pcm_fnames[1], fio_ramp_time, fio_run_time))
                    pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power,
                                                 args=(args.results, pcm_fnames[2], fio_ramp_time, fio_run_time))

                    threads.append(pcm_cpu_t)
                    threads.append(pcm_mem_t)
                    threads.append(pcm_pow_t)

                if target_obj.enable_bw:
                    bandwidth_file_name = measurements_prefix + "_bandwidth.csv"
                    t = threading.Thread(target=target_obj.measure_network_bandwidth,
                                         args=(args.results, bandwidth_file_name, fio_ramp_time, fio_run_time))
                    threads.append(t)

                if target_obj.enable_dpdk_memory:
                    dpdk_mem_file_name = measurements_prefix + "_dpdk_mem.txt"
                    t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results, dpdk_mem_file_name, fio_ramp_time))
                    threads.append(t)

                if target_obj.enable_pm:
                    power_daemon = threading.Thread(target=target_obj.measure_power,
                                                    args=(args.results, measurements_prefix, script_full_dir,
                                                          fio_ramp_time, fio_run_time))
                    threads.append(power_daemon)

                for t in threads:
                    t.start()
                for t in threads:
                    t.join()

            for i in initiators:
                i.init_disconnect()
                i.copy_result_files(args.results)
        try:
            parse_results(args.results, args.csv_filename)
        except Exception as err:
            logging.error("There was an error while parsing the results")
            logging.error(err)
    finally:
        for i in initiators:
            try:
                i.stop()
            except Exception as err:
                logging.error("Failed to stop initiator %s: %s" % (i.name, err))
        target_obj.stop()
1833