#!/usr/bin/env python3
#  SPDX-License-Identifier: BSD-3-Clause
#  Copyright (C) 2018 Intel Corporation
#  All rights reserved.
#

# Note: we use setattr
# pylint: disable=no-member

import os
import re
import sys
import argparse
import json
import logging
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid

import paramiko
import pandas as pd
from common import *
from subprocess import CalledProcessError
from abc import abstractmethod, ABC
from dataclasses import dataclass
from typing import Any

sys.path.append(os.path.dirname(__file__) + '/../../../python')

import spdk.rpc as rpc  # noqa
import spdk.rpc.client as rpc_client  # noqa


@dataclass
class ConfigField:
    name: str = None
    default: Any = None
    required: bool = False


class Server(ABC):
    def __init__(self, name, general_config, server_config):
        self.name = name

        self.log = logging.getLogger(self.name)

        config_fields = [
            ConfigField(name='username', required=True),
            ConfigField(name='password', required=True),
            ConfigField(name='transport', required=True),
            ConfigField(name='skip_spdk_install', default=False),
            ConfigField(name='irdma_roce_enable', default=False)
        ]
        self.read_config(config_fields, general_config)
        self.transport = self.transport.lower()

        config_fields = [
            ConfigField(name='nic_ips', required=True),
            ConfigField(name='mode', required=True),
            ConfigField(name='irq_scripts_dir', default='/usr/src/local/mlnx-tools/ofed_scripts'),
            ConfigField(name='enable_arfs', default=False),
            ConfigField(name='tuned_profile', default='')
        ]
        self.read_config(config_fields, server_config)

        self.local_nic_info = []
        self._nics_json_obj = {}
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""

        self.enable_adq = server_config.get('adq_enable', False)
        self.adq_priority = 1 if self.enable_adq else None
        self.irq_settings = {'mode': 'default', **server_config.get('irq_settings', {})}

        if not re.match(r'^[A-Za-z0-9\-]+$', name):
            self.log.error("Please use a name which contains only letters, numbers or dashes")
            sys.exit(1)

    def read_config(self, config_fields, config):
        for config_field in config_fields:
            value = config.get(config_field.name, config_field.default)
            if config_field.required is True and value is None:
                raise ValueError('%s config key is required' % config_field.name)

            setattr(self, config_field.name, value)
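
    # Illustrative example (assumption, not from the original config flow): given
    # config = {"nic_ips": ["192.0.2.1"]}, calling
    #   read_config([ConfigField(name='nic_ips', required=True),
    #                ConfigField(name='cpus_allowed', default=None)], config)
    # sets self.nic_ips = ["192.0.2.1"] and self.cpus_allowed = None; a field
    # marked required=True with no value raises ValueError instead.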

    @staticmethod
    def get_uncommented_lines(lines):
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                # Compare for equality; a substring check would also match longer
                # addresses sharing the same prefix (e.g. 10.0.0.1 vs 10.0.0.10).
                if addr["local"] == ip:
                    return nic["ifname"]

    @abstractmethod
    def set_local_nic_info_helper(self):
        pass

    def set_local_nic_info(self, pci_info):
        def extract_network_elements(json_obj):
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def get_nic_numa_node(self, nic_name):
        return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name]))

    def get_numa_cpu_map(self):
        numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"]))
        numa_cpu_json_map = {}

        for cpu in numa_cpu_json_obj["cpus"]:
            cpu_num = int(cpu["cpu"])
            numa_node = int(cpu["node"])
            numa_cpu_json_map.setdefault(numa_node, [])
            numa_cpu_json_map[numa_node].append(cpu_num)

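        # Example of the returned shape (illustrative; assumes a hypothetical
        # 2-node, 8-CPU machine; actual contents come from `lscpu`):
        #   {0: [0, 1, 2, 3], 1: [4, 5, 6, 7]}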
        return numa_cpu_json_map

    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        return ""

    def configure_system(self):
        self.load_drivers()
        self.configure_services()
        self.configure_sysctl()
        self.configure_arfs()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity(**self.irq_settings)

    RDMA_PROTOCOL_IWARP = 0
    RDMA_PROTOCOL_ROCE = 1
    RDMA_PROTOCOL_UNKNOWN = -1

    def check_rdma_protocol(self):
        try:
            roce_ena = self.exec_cmd(["cat", "/sys/module/irdma/parameters/roce_ena"])
            roce_ena = roce_ena.strip()
            if roce_ena == "0":
                return self.RDMA_PROTOCOL_IWARP
            else:
                return self.RDMA_PROTOCOL_ROCE
        except CalledProcessError as e:
            self.log.error("ERROR: failed to check RDMA protocol!")
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
            return self.RDMA_PROTOCOL_UNKNOWN

    def load_drivers(self):
        self.log.info("Loading drivers")
        self.exec_cmd(["sudo", "modprobe", "-a",
                       "nvme-%s" % self.transport,
                       "nvmet-%s" % self.transport])
        current_mode = self.check_rdma_protocol()
        if current_mode == self.RDMA_PROTOCOL_UNKNOWN:
            self.log.error("ERROR: failed to check RDMA protocol mode")
            return
        if self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_IWARP:
            self.reload_driver("irdma", "roce_ena=1")
            self.log.info("Loaded irdma driver with RoCE enabled")
        elif self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_ROCE:
            self.log.info("Leaving irdma driver with RoCE enabled")
        else:
            self.reload_driver("irdma", "roce_ena=0")
            self.log.info("Loaded irdma driver with iWARP enabled")

    def configure_arfs(self):
        rps_flow_cnt = 512
        if not self.enable_arfs:
            rps_flow_cnt = 0

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            self.exec_cmd(["sudo", "ethtool", "-K", nic_name, "ntuple", "on"])
            self.log.info(f"Setting rps_flow_cnt for {nic_name}")
            queue_files = self.exec_cmd(["ls", f"/sys/class/net/{nic_name}/queues/"]).strip().split("\n")
            queue_files = filter(lambda x: x.startswith("rx-"), queue_files)

            for qf in queue_files:
                self.exec_cmd(["sudo", "bash", "-c", f"echo {rps_flow_cnt} > /sys/class/net/{nic_name}/queues/{qf}/rps_flow_cnt"])

    def configure_adq(self):
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        self.log.info("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log.info("%s loaded!" % module)
            except CalledProcessError as e:
                self.log.error("ERROR: failed to load module %s" % module)
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        self.log.info("Configuring ADQ Traffic classes and filters...")

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            nic_ports = [x[0] for x in self.subsystem_info_list if nic_ip in x[2]]

            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
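            # With num_cores == 8 and the default num_queues_tc0 == 2, the
            # assembled command is equivalent to (NIC name is illustrative):
            #   sudo tc qdisc add dev ens801f0 root mqprio num_tc 2 map 0 1 \
            #        queues 2@0 8@2 hw 1 mode channel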
            self.log.info(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log.info(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            nic_bdf = os.path.basename(self.exec_cmd(["readlink", "-f", "/sys/class/net/%s/device" % nic_name]).strip())
            self.exec_cmd(["sudo", "devlink", "dev", "param", "set", "pci/%s" % nic_bdf,
                           "name", "tc1_inline_fd", "value", "true", "cmode", "runtime"])

            for port in nic_ports:
                tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                                 "protocol", "ip", "ingress", "prio", "1", "flower",
                                 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                                 "skip_sw", "hw_tc", "1"]
                self.log.info(" ".join(tc_filter_cmd))
                self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log.info("Show tc configuration for %s NIC..." % nic_name)
            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log.info("%s" % tc_disk_out)
            self.log.info("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log.info(xps_cmd)
            self.exec_cmd(xps_cmd)

    def reload_driver(self, driver, *modprobe_args):
        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver, *modprobe_args])
        except CalledProcessError as e:
            self.log.error("ERROR: failed to reload %s module!" % driver)
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_nic(self):
        self.log.info("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                nic_bdf = self.exec_cmd(["bash", "-c", "source /sys/class/net/%s/device/uevent; echo $PCI_SLOT_NAME" % nic])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-inspect-optimize", "off"])
                # Following are suggested in ADQ Configuration Guide to be enabled for large block sizes,
                # but can be used with 4k block sizes as well
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop", "on"])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop-cfg", "on"])
            except subprocess.CalledProcessError as e:
                self.log.error("ERROR: failed to configure NIC port using ethtool!")
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
                self.log.info("Please update your NIC driver and firmware versions and try again.")
            self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        self.log.info("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log.info("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log.info("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        self.log.info("Tuning sysctl settings...")

        busy_read = 0
        busy_poll = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1
            busy_poll = 1

        sysctl_opts = {
            "net.core.busy_poll": busy_poll,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
            "net.core.rps_sock_flow_entries": 0
        }

        if self.enable_adq:
            sysctl_opts.update(self.adq_set_busy_read(1))

        if self.enable_arfs:
            sysctl_opts.update({"net.core.rps_sock_flow_entries": 32768})

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        if not self.tuned_profile:
            self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip())

    def configure_cpu_governor(self):
        self.log.info("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def get_core_list_from_mask(self, core_mask):
        # Generate list of individual cores from hex core mask
        # (e.g. '0xffff') or list containing:
        # - individual cores (e.g. '1, 2, 3')
        # - core ranges (e.g. '0-3')
        # - mix of both (e.g. '0, 1-4, 9, 11-13')
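        #
        # Worked examples (illustrative):
        #   '0xff'         -> [0, 1, 2, 3, 4, 5, 6, 7]
        #   '[0, 1-4, 9]'  -> [0, 1, 2, 3, 4, 9]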

        core_list = []
        if "0x" in core_mask:
            core_mask_int = int(core_mask, 16)
            for i in range(core_mask_int.bit_length()):
                if (1 << i) & core_mask_int:
                    core_list.append(i)
            return core_list
        else:
            # Core list can be provided in .json config with square brackets;
            # remove them first
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")

            for i in core_mask.split(","):
                if "-" in i:
                    start, end = i.split("-")
                    core_range = range(int(start), int(end) + 1)
                    core_list.extend(core_range)
                else:
                    core_list.append(int(i))
            return core_list

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
        self.log.info("Setting IRQ affinity for NICs. Using %s mode" % mode)

        if mode not in ["default", "bynode", "cpulist"]:
            raise ValueError("%s irq affinity setting not supported" % mode)

        if mode == "cpulist" and not cpulist:
            raise ValueError("%s irq affinity setting set, but no cpulist provided" % mode)

        affinity_script = "set_irq_affinity.sh"
        if mode != "default":
            affinity_script = "set_irq_affinity_cpulist.sh"
            system_cpu_map = self.get_numa_cpu_map()
        irq_script_path = os.path.join(self.irq_scripts_dir, affinity_script)

        def cpu_list_to_string(cpulist):
            return ",".join(map(str, cpulist))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            irq_cmd = ["sudo", irq_script_path]

            # Use only CPU cores matching NIC NUMA node.
            # Remove any CPU cores if they're on the exclusion list.
            if mode == "bynode":
                irq_cpus = system_cpu_map[self.get_nic_numa_node(nic_name)]
                if cpulist and exclude_cpulist:
                    disallowed_cpus = self.get_core_list_from_mask(cpulist)
                    irq_cpus = list(set(irq_cpus) - set(disallowed_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            if mode == "cpulist":
                irq_cpus = self.get_core_list_from_mask(cpulist)
                if exclude_cpulist:
                    # Flatten system CPU list; we don't need NUMA awareness here
                    system_cpu_list = sorted({x for v in system_cpu_map.values() for x in v})
                    irq_cpus = list(set(system_cpu_list) - set(irq_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            irq_cmd.append(nic_name)
            self.log.info(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        self.log.info("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        self.log.info("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        self.log.info("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log.info("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        self.log.info("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log.info("Reverted CPU governor to %s." % self.governor_restore)

    def restore_settings(self):
        self.restore_governor()
        self.restore_tuned()
        self.restore_services()
        self.restore_sysctl()
        if self.enable_adq:
            self.reload_driver("ice")


518
519class Target(Server):
520    def __init__(self, name, general_config, target_config):
521        super().__init__(name, general_config, target_config)
522
523        # Defaults
524        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
525        self.subsystem_info_list = []
526        self.initiator_info = []
527
528        config_fields = [
529            ConfigField(name='mode', required=True),
530            ConfigField(name='results_dir', required=True),
531            ConfigField(name='enable_pm', default=True),
532            ConfigField(name='enable_sar', default=True),
533            ConfigField(name='enable_pcm', default=True),
534            ConfigField(name='enable_dpdk_memory', default=True)
535        ]
536        self.read_config(config_fields, target_config)
537
538        self.null_block = target_config.get('null_block_devices', 0)
539        self.scheduler_name = target_config.get('scheduler_settings', 'static')
540        self.enable_zcopy = target_config.get('zcopy_settings', False)
541        self.enable_bw = target_config.get('enable_bandwidth', True)
542        self.nvme_blocklist = target_config.get('blocklist', [])
543        self.nvme_allowlist = target_config.get('allowlist', [])
544
545        # Blocklist takes precedence, remove common elements from allowlist
546        self.nvme_allowlist = list(set(self.nvme_allowlist) - set(self.nvme_blocklist))
547
548        self.log.info("Items now on allowlist: %s" % self.nvme_allowlist)
549        self.log.info("Items now on blocklist: %s" % self.nvme_blocklist)
550
551        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
552        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
553        self.set_local_nic_info(self.set_local_nic_info_helper())
554
555        if self.skip_spdk_install is False:
556            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")
557
558        self.configure_system()
559        if self.enable_adq:
560            self.configure_adq()
561            self.configure_irq_affinity()
562        self.sys_config()
563
564    def set_local_nic_info_helper(self):
565        return json.loads(self.exec_cmd(["lshw", "-json"]))
566
567    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
568        stderr_opt = None
569        if stderr_redirect:
570            stderr_opt = subprocess.STDOUT
571        if change_dir:
572            old_cwd = os.getcwd()
573            os.chdir(change_dir)
574            self.log.info("Changing directory to %s" % change_dir)
575
576        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")
577
578        if change_dir:
579            os.chdir(old_cwd)
580            self.log.info("Changing directory to %s" % old_cwd)
581        return out
582
583    def zip_spdk_sources(self, spdk_dir, dest_file):
584        self.log.info("Zipping SPDK source directory")
585        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
586        for root, _directories, files in os.walk(spdk_dir, followlinks=True):
587            for file in files:
588                fh.write(os.path.relpath(os.path.join(root, file)))
589        fh.close()
590        self.log.info("Done zipping")
591
592    @staticmethod
593    def _chunks(input_list, chunks_no):
594        div, rem = divmod(len(input_list), chunks_no)
595        for i in range(chunks_no):
596            si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem)
597            yield input_list[si:si + (div + 1 if i < rem else div)]
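
    # Illustrative behavior (assumed input): _chunks(list(range(10)), 3) yields
    # [0, 1, 2, 3], [4, 5, 6], [7, 8, 9]; chunk sizes differ by at most one.
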
    def spread_bdevs(self, req_disks):
        # Spread available block device indexes:
        # - evenly across available initiator systems
        # - evenly across available NIC interfaces for
        #   each initiator
        # Not NUMA aware.
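        #
        # Illustrative example (assumed setup): req_disks=4 and two initiators,
        # each with one target NIC IP, produces
        #   [("10.0.0.1", 0), ("10.0.0.1", 1), ("10.0.0.2", 2), ("10.0.0.2", 3)]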
        ip_bdev_map = []
        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))

        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
            self.initiator_info[i]["bdev_range"] = init_chunk
            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
                for c in nic_chunk:
                    ip_bdev_map.append((ip, c))
        return ip_bdev_map

    def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time):
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))

        self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Starting SAR measurements")

        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % 1, "%s" % run_time])
        # sar_output_file already contains results_dir; do not join it again.
        with open(sar_output_file, "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log.info("Summary CPU utilization from SAR:")
                        self.log.info(line)
                    elif "all" in line:
                        self.log.info(line)
                    else:
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum
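        # Example of the math above (illustrative): on a 16-CPU system with a
        # summed %idle of 1500.0 across per-CPU averages, usage is
        # 16 * 100 - 1500.0 = 100.0, i.e. roughly one fully-busy core.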

        with open(sar_cpu_util_file, "w") as f:
            f.write("%0.2f" % sar_cpu_usage)

    def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time):
        time.sleep(ramp_time)
        self.log.info("Starting power measurements")
        self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir,
                       "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix,
                       "-x", "-c", "%s" % run_time, "-t", "%s" % 1, "-r"])

    def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        cmd = ["pcm", "1", "-i=%s" % run_time,
               "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name, ramp_time, run_time):
        self.log.info("Waiting %d seconds for ramp-up to finish before measuring bandwidth stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("INFO: starting network bandwidth measurements")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", "%s" % run_time])
        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
        #       Improve this so that additional file containing measurements average is generated too.
        # TODO: Monitor only those interfaces which are currently used to run the workload.

    def measure_dpdk_memory(self, results_dir, dump_file_name, ramp_time):
        self.log.info("INFO: waiting to generate DPDK memory usage")
        time.sleep(ramp_time)
        self.log.info("INFO: generating DPDK memory usage")
        tmp_dump_file = rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)["filename"]
        os.rename(tmp_dump_file, "%s/%s" % (results_dir, dump_file_name))

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(os.uname().release)
        self.log.info("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log.info("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log.info("====zcopy settings:====")
        self.log.info("zcopy enabled: %s" % (self.enable_zcopy))
        self.log.info("====Scheduler settings:====")
        self.log.info("SPDK scheduler: %s" % (self.scheduler_name))


class Initiator(Server):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # required fields and defaults
        config_fields = [
            ConfigField(name='mode', required=True),
            ConfigField(name='ip', required=True),
            ConfigField(name='target_nic_ips', required=True),
            ConfigField(name='cpus_allowed', default=None),
            ConfigField(name='cpus_allowed_policy', default='shared'),
            ConfigField(name='spdk_dir', default='/tmp/spdk'),
            ConfigField(name='fio_bin', default='/usr/src/fio/fio'),
            ConfigField(name='nvmecli_bin', default='nvme'),
            ConfigField(name='cpu_frequency', default=None),
            ConfigField(name='allow_cpu_sharing', default=True)
        ]

        self.read_config(config_fields, initiator_config)

        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.subsystem_info_list = []

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if self.skip_spdk_install is False:
            self.copy_spdk("/tmp/spdk.zip")

        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        self.restore_settings()
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it again to prevent
        # expansion when sending to the remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)
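        # Example of the quoting above (illustrative): ["sudo", "sysctl", "-w",
        # "net.ipv4.tcp_rmem=8192 1048576 33554432"] is sent to the remote
        # shell as: sudo sysctl -w "net.ipv4.tcp_rmem=8192 1048576 33554432"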

        # Redirect stderr to stdout by using the get_pty option, if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        self.log.info("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log.info("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log.info("Sources unpacked")

    def copy_result_files(self, dest_dir):
        self.log.info("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log.info("Done copying results")

    def match_subsystems(self, target_subsystems):
        subsystems = [subsystem for subsystem in target_subsystems if subsystem[2] in self.target_nic_ips]
        subsystems.sort(key=lambda x: x[1])
        self.log.info("Found matching subsystems on target side:")
        for s in subsystems:
            self.log.info(s)
        self.subsystem_info_list = subsystems

    @abstractmethod
    def init_connect(self):
        pass

    @abstractmethod
    def init_disconnect(self):
        pass

    @abstractmethod
    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def get_route_nic_numa(self, remote_nvme_ip):
        local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip]))
        local_nvme_nic = local_nvme_nic[0]["dev"]
        return self.get_nic_numa_node(local_nvme_nic)

    @staticmethod
    def gen_fio_offset_section(offset_inc, num_jobs):
        offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc
        return "\n".join(["size=%s%%" % offset_inc,
                          "offset=0%",
                          "offset_increment=%s%%" % offset_inc])
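
    # Illustrative gen_fio_offset_section output for offset_inc=0, num_jobs=4
    # (each job covers its own 25% slice of the device):
    #   size=25%
    #   offset=0%
    #   offset_increment=25%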

    def gen_fio_numa_section(self, fio_filenames_list, num_jobs):
        numa_stats = {}
        allowed_cpus = []
        for nvme in fio_filenames_list:
            nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme))
            numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1

        # Use the most common NUMA node for this chunk to allocate memory and CPUs
        section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0]

        # Check if we have enough free CPUs to pop from the list before assigning them
        if len(self.available_cpus[section_local_numa]) < num_jobs:
            if self.allow_cpu_sharing:
                self.log.info("Regenerating available CPU list %s" % section_local_numa)
                # Remove still available CPUs from the regenerated list. We don't want to
                # regenerate it with duplicates.
                cpus_regen = set(self.get_numa_cpu_map()[section_local_numa]) - set(self.available_cpus[section_local_numa])
                self.available_cpus[section_local_numa].extend(cpus_regen)
                self.log.info(self.available_cpus[section_local_numa])
            else:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise IndexError

        for _ in range(num_jobs):
            try:
                allowed_cpus.append(str(self.available_cpus[section_local_numa].pop(0)))
            except IndexError:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise

        return "\n".join(["cpus_allowed=%s" % ",".join(allowed_cpus),
                          "numa_mem_policy=prefer:%s" % section_local_numa])

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no,
                       num_jobs=None, ramp_time=0, run_time=10, rate_iops=0,
                       offset=False, offset_inc=0):
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""

        if self.cpus_allowed is not None:
            self.log.info("Limiting FIO workload execution to specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    # Core ranges are inclusive, so count both endpoints.
                    cpus_num += len(range(a, b + 1))
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            self.num_cores = len(self.subsystem_info_list)
            threads = range(0, len(self.subsystem_info_list))

        filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs,
                                                      offset, offset_inc)

        fio_config = fio_conf_template.format(ioengine=self.ioengine, spdk_conf=self.spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, init_connect() function
        if "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log.info("Created FIO Config:")
        self.log.info(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit(1)
        else:
            self.log.warning("WARNING: you have disabled intel_pstate and are using the default cpu governor.")

    def run_fio(self, fio_config_file, run_num=1):
        job_name, _ = os.path.splitext(fio_config_file)
        self.log.info("Starting FIO run for job: %s" % job_name)
        self.log.info("Using FIO: %s" % self.fio_bin)

        output_filename = job_name + "_run_" + str(run_num) + "_" + self.name + ".json"
        try:
            output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename, "--eta=never"], True)
            self.log.info(output)
            self.log.info("FIO run finished. Results in: %s" % output_filename)
        except subprocess.CalledProcessError as e:
            self.log.error("ERROR: Fio process failed!")
            self.log.error(e.stdout)

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(self.exec_cmd(["uname", "-r"]))
        self.log.info("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log.info("====sysctl conf:====")
        sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"])
        self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))


class KernelTarget(Target):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = target_config.get('nvmet_bin', 'nvmetcli')

    def load_drivers(self):
        self.log.info("Loading drivers")
        super().load_drivers()
        if self.null_block:
            self.exec_cmd(["sudo", "modprobe", "null_blk", "nr_devices=%s" % self.null_block])

    def configure_adq(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_configure_tc(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_set_busy_read(self, busy_read_val):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
        return {"net.core.busy_read": 0}

    def stop(self):
        self.nvmet_command(self.nvmet_bin, "clear")
        self.restore_settings()

    def get_nvme_device_bdf(self, nvme_dev_path):
        nvme_name = os.path.basename(nvme_dev_path)
        return self.exec_cmd(["cat", "/sys/block/%s/device/address" % nvme_name]).strip()

    def get_nvme_devices(self):
        dev_list = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]).split("\n")
        nvme_list = []
        for dev in dev_list:
            if "nvme" not in dev:
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                nvme_list.append(dev)
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_allowlist:
                nvme_list.append(dev)
        return nvme_list

    def nvmet_command(self, nvmet_bin, command):
        return self.exec_cmd([nvmet_bin, *(command.split(" "))])

    def kernel_tgt_gen_subsystem_conf(self, nvme_list):
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        for ip, bdev_num in self.spread_bdevs(len(nvme_list)):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = nvme_list[bdev_num]

            nvmet_cfg["subsystems"].append({
                "allowed_hosts": [],
                "attr": {
                    "allow_any_host": "1",
                    "serial": serial,
                    "version": "1.3"
                },
                "namespaces": [
                    {
                        "device": {
                            "path": bdev_name,
                            "uuid": "%s" % uuid.uuid4()
                        },
                        "enable": 1,
                        "nsid": port
                    }
                ],
                "nqn": nqn
            })

            nvmet_cfg["ports"].append({
                "addr": {
                    "adrfam": "ipv4",
                    "traddr": ip,
                    "trsvcid": port,
                    "trtype": self.transport
                },
                "portid": bdev_num,
                "referrals": [],
                "subsystems": [nqn]
            })

            self.subsystem_info_list.append((port, nqn, ip))
        self.subsys_no = len(self.subsystem_info_list)

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        self.log.info("Configuring kernel NVMeOF Target")

        if self.null_block:
            self.log.info("Configuring with null block device.")
            nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
        else:
            self.log.info("Configuring with NVMe drives.")
            nvme_list = self.get_nvme_devices()

        self.kernel_tgt_gen_subsystem_conf(nvme_list)
        self.subsys_no = len(nvme_list)

        self.nvmet_command(self.nvmet_bin, "clear")
        self.nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log.info("Done configuring kernel NVMeOF Target")


class SPDKTarget(Target):
    def __init__(self, name, general_config, target_config):
        # IRQ affinity on the SPDK Target side takes the Target's core mask into consideration.
        # The method setting IRQ affinity runs as part of the parent class's __init__,
        # so self.core_mask must be set before changing IRQ affinity.
        self.core_mask = target_config["core_mask"]
        self.num_cores = len(self.get_core_list_from_mask(self.core_mask))

        super().__init__(name, general_config, target_config)

        # Format: property, default value
        config_fields = [
            ConfigField(name='dif_insert_strip', default=False),
            ConfigField(name='null_block_dif_type', default=0),
            ConfigField(name='num_shared_buffers', default=4096),
            ConfigField(name='max_queue_depth', default=128),
            ConfigField(name='bpf_scripts', default=[]),
            ConfigField(name='scheduler_core_limit', default=None),
            ConfigField(name='dsa_settings', default=False),
            ConfigField(name='iobuf_small_pool_count', default=32767),
            ConfigField(name='iobuf_large_pool_count', default=16383),
            ConfigField(name='num_cqe', default=4096),
            ConfigField(name='sock_impl', default='posix')
        ]

        self.read_config(config_fields, target_config)

        self.bpf_proc = None
        self.enable_dsa = False

        self.log.info("====DSA settings:====")
        self.log.info("DSA enabled: %s" % (self.enable_dsa))

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
        if mode not in ["default", "bynode", "cpulist",
                        "shared", "split", "split-bynode"]:
            self.log.error("%s irq affinity setting not supported" % mode)
            raise Exception

        # Create core list from SPDK's mask and change it to string.
        # This is the type configure_irq_affinity expects for cpulist parameter.
        spdk_tgt_core_list = self.get_core_list_from_mask(self.core_mask)
        spdk_tgt_core_list = ",".join(map(str, spdk_tgt_core_list))
        spdk_tgt_core_list = "[" + spdk_tgt_core_list + "]"
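        # e.g. core_mask "0xF" becomes the string "[0,1,2,3]" here
        # (an illustrative value, matching the parent method's expected format).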

        if mode == "shared":
            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list)
        elif mode == "split":
            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
        elif mode == "split-bynode":
            super().configure_irq_affinity(mode="bynode", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
        else:
            super().configure_irq_affinity(mode=mode, cpulist=cpulist, exclude_cpulist=exclude_cpulist)

    def adq_set_busy_read(self, busy_read_val):
        return {"net.core.busy_read": busy_read_val}

    def get_nvme_devices_count(self):
        return len(self.get_nvme_devices())

    def get_nvme_devices(self):
        bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")]))
        bdev_bdfs = []
        for bdev in bdev_subsys_json_obj["config"]:
            bdev_traddr = bdev["params"]["traddr"]
            if bdev_traddr in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                bdev_bdfs.append(bdev_traddr)
                continue
            if bdev_traddr in self.nvme_allowlist:
                bdev_bdfs.append(bdev_traddr)
        return bdev_bdfs

    def spdk_tgt_configure(self):
        self.log.info("Configuring SPDK NVMeOF target via RPC")

        # Create transport layer
        nvmf_transport_params = {
            "client": self.client,
            "trtype": self.transport,
            "num_shared_buffers": self.num_shared_buffers,
            "max_queue_depth": self.max_queue_depth,
            "dif_insert_or_strip": self.dif_insert_strip,
            "sock_priority": self.adq_priority,
            "num_cqe": self.num_cqe
        }

        if self.enable_adq:
            nvmf_transport_params["acceptor_poll_rate"] = 10000

        rpc.nvmf.nvmf_create_transport(**nvmf_transport_params)
        self.log.info("SPDK NVMeOF transport layer:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

        if self.enable_adq:
            self.adq_configure_tc()

        self.log.info("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self, null_block_count):
        md_size = 0
        block_size = 4096
        if self.null_block_dif_type != 0:
            md_size = 128

        self.log.info("Adding null block bdevices to config via RPC")
        for i in range(null_block_count):
            self.log.info("Setting bdev protection to: %s" % self.null_block_dif_type)
            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
                                      dif_type=self.null_block_dif_type, md_size=md_size)
        self.log.info("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        self.log.info("Adding NVMe bdevs to config via RPC")

        bdfs = self.get_nvme_devices()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log.error("ERROR: Requested number of disks is more than the %s available" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log.info("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1244
1245    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
1246        self.log.info("Adding subsystems to config")
1247        if not req_num_disks:
1248            req_num_disks = self.get_nvme_devices_count()
1249
1250        for ip, bdev_num in self.spread_bdevs(req_num_disks):
1251            port = str(4420 + bdev_num)
1252            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
1253            serial = "SPDK00%s" % bdev_num
1254            bdev_name = "Nvme%sn1" % bdev_num
1255
1256            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
1257                                           allow_any_host=True, max_namespaces=8)
1258            rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
1259            for nqn_name in [nqn, "discovery"]:
1260                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
1261                                                     nqn=nqn_name,
1262                                                     trtype=self.transport,
1263                                                     traddr=ip,
1264                                                     trsvcid=port,
1265                                                     adrfam="ipv4")
1266            self.subsystem_info_list.append((port, nqn, ip))
1267        self.subsys_no = len(self.subsystem_info_list)
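        # With two bdevs spread over a single IP, subsystem_info_list would hold
        # entries like ("4420", "nqn.2018-09.io.spdk:cnode0", "192.168.100.1") and
        # ("4421", "nqn.2018-09.io.spdk:cnode1", "192.168.100.1") (illustrative IP).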
1268
1269        self.log.info("SPDK NVMeOF subsystem configuration:")
1270        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))
1271
1272    def bpf_start(self):
1273        self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts)
1274        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
1275        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
1276        results_path = os.path.join(self.results_dir, "bpf_traces.txt")
1277
1278        with open(self.pid, "r") as fh:
1279            nvmf_pid = fh.readline().strip()
1280
1281        cmd = [bpf_script, nvmf_pid, *bpf_traces]
1282        self.log.info(cmd)
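        # The assembled command has the shape (hypothetical PID and trace name):
        #   [<spdk_dir>/scripts/bpftrace.sh, <nvmf_pid>, <spdk_dir>/scripts/bpf/<trace>]
        # BPF_OUTFILE redirects the script's output into bpf_traces.txt in results_dir.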
1283        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})
1284
1285    def tgt_start(self):
1286        if self.null_block:
1287            self.subsys_no = 1
1288        else:
1289            self.subsys_no = self.get_nvme_devices_count()
1290        self.log.info("Starting SPDK NVMeOF Target process")
1291        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
1292        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
1293        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")
1294
1295        with open(self.pid, "w") as fh:
1296            fh.write(str(proc.pid))
1297        self.nvmf_proc = proc
1298        self.log.info("SPDK NVMeOF Target PID=%s" % self.pid)
1299        self.log.info("Waiting for spdk to initialize...")
1300        while not os.path.exists("/var/tmp/spdk.sock"):
1301            time.sleep(1)
1304        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")
1305
1306        rpc.sock.sock_set_default_impl(self.client, impl_name=self.sock_impl)
1307        rpc.iobuf.iobuf_set_options(self.client,
1308                                    small_pool_count=self.iobuf_small_pool_count,
1309                                    large_pool_count=self.iobuf_large_pool_count,
1310                                    small_bufsize=None,
1311                                    large_bufsize=None)
1312
1313        if self.enable_zcopy:
1314            rpc.sock.sock_impl_set_options(self.client, impl_name=self.sock_impl,
1315                                           enable_zerocopy_send_server=True)
1316            self.log.info("Target socket options:")
1317            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name=self.sock_impl))
1318
1319        if self.enable_adq:
1320            rpc.sock.sock_impl_set_options(self.client, impl_name=self.sock_impl, enable_placement_id=1)
1321            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
1322                                           nvme_adminq_poll_period_us=100000, retry_count=4)
1323
1324        if self.enable_dsa:
1325            rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None)
1326            self.log.info("Target DSA accel module enabled")
1327
1328        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit)
1329        rpc.framework_start_init(self.client)
1330
1331        if self.bpf_scripts:
1332            self.bpf_start()
1333
1334        self.spdk_tgt_configure()
1335
1336    def stop(self):
1337        if self.bpf_proc:
1338            self.log.info("Stopping BPF Trace script")
1339            self.bpf_proc.terminate()
1340            self.bpf_proc.wait()
1341
1342        if hasattr(self, "nvmf_proc"):
1343            try:
1344                self.nvmf_proc.terminate()
1345                self.nvmf_proc.wait(timeout=30)
1346            except Exception as e:
1347                self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.")
1348                self.log.info(e)
1349                self.nvmf_proc.kill()
1350                self.nvmf_proc.communicate()
1351                # Try to clean up RPC socket files if they were not removed
1352                # because of using 'kill'
1353                try:
1354                    os.remove("/var/tmp/spdk.sock")
1355                    os.remove("/var/tmp/spdk.sock.lock")
1356                except FileNotFoundError:
1357                    pass
1358        self.restore_settings()
1359
1360
1361class KernelInitiator(Initiator):
1362    def __init__(self, name, general_config, initiator_config):
1363        super().__init__(name, general_config, initiator_config)
1364
1365        # Defaults
1366        self.extra_params = initiator_config.get('extra_params', '')
1367
1368        self.ioengine = "libaio"
1369        self.spdk_conf = ""
1370
1371        if "num_cores" in initiator_config:
1372            self.num_cores = initiator_config["num_cores"]
1373
1374        if "kernel_engine" in initiator_config:
1375            self.ioengine = initiator_config["kernel_engine"]
1376            if "io_uring" in self.ioengine:
1377                self.extra_params += ' --nr-poll-queues=8'
1378
1379    def configure_adq(self):
1380        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
1381
1382    def adq_configure_tc(self):
1383        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
1384
1385    def adq_set_busy_read(self, busy_read_val):
1386        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
1387        return {"net.core.busy_read": 0}
1388
1389    def get_connected_nvme_list(self):
1390        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
1391        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
1392                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
1393        return nvme_list
1394
1395    def init_connect(self):
1396        self.log.info("Below connection attempts may result in error messages, this is expected!")
1397        for subsystem in self.subsystem_info_list:
1398            self.log.info("Trying to connect %s %s %s" % subsystem)
1399            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
1400                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
1401            time.sleep(2)
1402
1403        if "io_uring" in self.ioengine:
1404            self.log.info("Setting block layer settings for io_uring.")
1405
1406            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
1407            #       apparently it's not possible for connected subsystems.
1408            #       Results in "error: Invalid argument"
1409            block_sysfs_settings = {
1410                "iostats": "0",
1411                "rq_affinity": "0",
1412                "nomerges": "2"
1413            }
1414
1415            for disk in self.get_connected_nvme_list():
1416                sysfs = os.path.join("/sys/block", disk, "queue")
1417                for k, v in block_sysfs_settings.items():
1418                    sysfs_opt_path = os.path.join(sysfs, k)
1419                    try:
1420                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
1421                    except CalledProcessError as e:
1422                        self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
1423                    finally:
1424                        current_value = self.exec_cmd(["sudo", "cat", sysfs_opt_path])
1425                        self.log.info("%s=%s" % (sysfs_opt_path, current_value))
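            # Each setting above expands to a command of the form
            # (illustrative device name):
            #   sudo bash -c "echo 2 > /sys/block/nvme0n1/queue/nomerges"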
1426
1427    def init_disconnect(self):
1428        for subsystem in self.subsystem_info_list:
1429            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
1430            time.sleep(1)
1431
1432    def get_nvme_subsystem_numa(self, dev_name):
1433        # Strip the last two characters (e.g. "n1") to get the controller name instead of the namespace device name
1434        nvme_ctrl = os.path.basename(dev_name)[:-2]
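        # For NVMe-oF controllers, /sys/class/nvme/<ctrl>/address typically reads
        # like "traddr=192.168.100.1,trsvcid=4420" (illustrative values); the
        # regex below extracts the IPv4 traddr.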
1435        remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
1436                                   self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl]))
1437        return self.get_route_nic_numa(remote_nvme_ip.group(0))
1438
1439    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
1440        self.available_cpus = self.get_numa_cpu_map()
1441        if len(threads) >= len(subsystems):
1442            threads = range(0, len(subsystems))
1443
1444        # Collect connected NVMe device names and sort them by the NUMA node of the NIC
1445        # used to reach them, to allow better grouping when splitting into fio sections.
1446        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
1447        nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list]
1448        nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))]
1449
1450        filename_section = ""
1451        nvme_per_split = int(len(nvme_list) / len(threads))
1452        remainder = len(nvme_list) % len(threads)
1453        iterator = iter(nvme_list)
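        # Distribute devices round-robin: each section gets nvme_per_split devices
        # and the first `remainder` sections receive one extra, e.g. 5 devices
        # across 2 sections yields splits of 3 and 2 (illustrative).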
1454        result = []
1455        for i in range(len(threads)):
1456            result.append([])
1457            for _ in range(nvme_per_split):
1458                result[i].append(next(iterator))
1459            if remainder:
1460                result[i].append(next(iterator))
1461                remainder -= 1
1462        for i, r in enumerate(result):
1463            header = "[filename%s]" % i
1464            disks = "\n".join(["filename=%s" % x for x in r])
1465            job_section_qd = round((io_depth * len(r)) / num_jobs)
1466            if job_section_qd == 0:
1467                job_section_qd = 1
1468            iodepth = "iodepth=%s" % job_section_qd
1469
1470            offset_section = ""
1471            if offset:
1472                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)
1473
1474            numa_opts = self.gen_fio_numa_section(r, num_jobs)
1475
1476            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])
1477
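        # A finished job section looks roughly like this (illustrative values;
        # NUMA and offset lines omitted):
        #   [filename0]
        #   filename=/dev/nvme0n1
        #   filename=/dev/nvme1n1
        #   iodepth=256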
1478        return filename_section
1479
1480
1481class SPDKInitiator(Initiator):
1482    def __init__(self, name, general_config, initiator_config):
1483        super().__init__(name, general_config, initiator_config)
1484
1485        if self.skip_spdk_install is False:
1486            self.install_spdk()
1487
1488        # Optional fields
1489        self.enable_data_digest = initiator_config.get('enable_data_digest', False)
1490        self.small_pool_count = initiator_config.get('small_pool_count', 32768)
1491        self.large_pool_count = initiator_config.get('large_pool_count', 16384)
1492        self.sock_impl = initiator_config.get('sock_impl', 'posix')
1493
1494        if "num_cores" in initiator_config:
1495            self.num_cores = initiator_config["num_cores"]
1496
1497        self.ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
1498        self.spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
1499
1500    def adq_set_busy_read(self, busy_read_val):
1501        return {"net.core.busy_read": busy_read_val}
1502
1503    def install_spdk(self):
1504        self.log.info("Using fio binary %s" % self.fio_bin)
1505        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
1506        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
1507        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma",
1508                       "--with-fio=%s" % os.path.dirname(self.fio_bin),
1509                       "--enable-lto", "--disable-unit-tests"])
1510        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
1511        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])
1512
1513        self.log.info("SPDK built")
1514        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])
1515
1516    def init_connect(self):
1517        # Not a real "connect" like when doing "nvme connect" because SPDK's fio
1518        # bdev plugin initiates connection just before starting IO traffic.
1519        # This is just to have a "init_connect" equivalent of the same function
1520        # from KernelInitiator class.
1521        # Just prepare bdev.conf JSON file for later use and consider it
1522        # "making a connection".
1523        bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
1524        self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
1525
1526    def init_disconnect(self):
1527        # The SPDK initiator does not need to explicitly disconnect, as this is
1528        # done automatically after the fio bdev plugin finishes IO.
1529        return
1530
1531    def gen_spdk_bdev_conf(self, remote_subsystem_list):
1532        spdk_cfg_section = {
1533            "subsystems": [
1534                {
1535                    "subsystem": "bdev",
1536                    "config": []
1537                },
1538                {
1539                    "subsystem": "iobuf",
1540                    "config": [
1541                        {
1542                            "method": "iobuf_set_options",
1543                            "params": {
1544                                "small_pool_count": self.small_pool_count,
1545                                "large_pool_count": self.large_pool_count
1546                            }
1547                        }
1548                    ]
1549                },
1550                {
1551                    "subsystem": "sock",
1552                    "config": [
1553                        {
1554                            "method": "sock_set_default_impl",
1555                            "params": {
1556                                "impl_name": self.sock_impl
1557                            }
1558                        }
1559                    ]
1560                }
1561            ]
1562        }
1563
1564        for i, subsys in enumerate(remote_subsystem_list):
1565            sub_port, sub_nqn, sub_addr = map(str, subsys)
1566            nvme_ctrl = {
1567                "method": "bdev_nvme_attach_controller",
1568                "params": {
1569                    "name": "Nvme{}".format(i),
1570                    "trtype": self.transport,
1571                    "traddr": sub_addr,
1572                    "trsvcid": sub_port,
1573                    "subnqn": sub_nqn,
1574                    "adrfam": "IPv4"
1575                }
1576            }
1577
1578            if self.enable_adq:
1579                nvme_ctrl["params"].update({"priority": "1"})
1580
1581            if self.enable_data_digest:
1582                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})
1583
1584            spdk_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)
1585
1586        return json.dumps(spdk_cfg_section, indent=2)
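        # For a single remote subsystem, the bdev section of the generated file
        # ends up with one entry like (illustrative values):
        #   {"method": "bdev_nvme_attach_controller",
        #    "params": {"name": "Nvme0", "trtype": "tcp", "traddr": "192.168.100.1",
        #               "trsvcid": "4420", "subnqn": "nqn.2018-09.io.spdk:cnode0",
        #               "adrfam": "IPv4"}}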
1587
1588    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
1589        self.available_cpus = self.get_numa_cpu_map()
1590        filename_section = ""
1591        if len(threads) >= len(subsystems):
1592            threads = range(0, len(subsystems))
1593
1594        # Generate the expected NVMe bdev names and sort them by the NUMA node of the
1595        # NIC used to reach them, to allow better grouping when splitting into fio sections.
1596        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
1597        filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames]
1598        filenames = [x for _, x in sorted(zip(filename_numas, filenames))]
1599
1600        nvme_per_split = int(len(subsystems) / len(threads))
1601        remainder = len(subsystems) % len(threads)
1602        iterator = iter(filenames)
1603        result = []
1604        for i in range(len(threads)):
1605            result.append([])
1606            for _ in range(nvme_per_split):
1607                result[i].append(next(iterator))
1608            if remainder:
1609                result[i].append(next(iterator))
1610                remainder -= 1
1611        for i, r in enumerate(result):
1612            header = "[filename%s]" % i
1613            disks = "\n".join(["filename=%s" % x for x in r])
1614            job_section_qd = round((io_depth * len(r)) / num_jobs)
1615            if job_section_qd == 0:
1616                job_section_qd = 1
1617            iodepth = "iodepth=%s" % job_section_qd
1618
1619            offset_section = ""
1620            if offset:
1621                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)
1622
1623            numa_opts = self.gen_fio_numa_section(r, num_jobs)
1624
1625            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])
1626
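        # Sections have the same shape as in the kernel initiator, except that
        # filename= entries reference SPDK bdev names (e.g. Nvme0n1) rather than
        # /dev block device paths.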
1627        return filename_section
1628
1629    def get_nvme_subsystem_numa(self, bdev_name):
1630        bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir]))
1631        bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"]
1632
1633        # Strip the last two characters to get the controller name instead of the namespace name
1634        nvme_ctrl = bdev_name[:-2]
1635        for bdev in bdev_conf_json_obj:
1636            if bdev["method"] == "bdev_nvme_attach_controller" and bdev["params"]["name"] == nvme_ctrl:
1637                return self.get_route_nic_numa(bdev["params"]["traddr"])
1638        return None
1639
1640
1641if __name__ == "__main__":
1642    exit_code = 0
1643
1644    script_full_dir = os.path.dirname(os.path.realpath(__file__))
1645    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))
1646
1647    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
1648    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
1649                        help='Configuration file.')
1650    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
1651                        help='Results directory.')
1652    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
1653                        help='CSV results filename.')
1654    parser.add_argument('-f', '--force', default=False, action='store_true',
1655                        dest='force', help="""Force script to continue and try to use all
1656                        available NVMe devices during test.
1657                        WARNING: Might result in data loss on used NVMe drives""")
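    # Example invocation (illustrative paths):
    #   ./run_nvmf.py -c ./configs/tcp.json -r /tmp/results -s tcp_results.csv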
1658
1659    args = parser.parse_args()
1660
1661    logging.basicConfig(level=logging.INFO,
1662                        format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s')
1663
1664    logging.info("Using config file: %s" % args.config)
1665    with open(args.config, "r") as config:
1666        data = json.load(config)
1667
1668    initiators = []
1669    fio_cases = []
1670
1671    general_config = data["general"]
1672    target_config = data["target"]
1673    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]
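    # The config file is expected to provide a "general" section, a "target"
    # section, one or more "initiator" sections and a "fio" section; a minimal
    # illustrative skeleton:
    #   {"general": {...}, "target": {"mode": "spdk", ...},
    #    "initiator1": {"mode": "spdk", ...},
    #    "fio": {"bs": ["4k"], "qd": [128], "rw": ["randread"],
    #            "run_time": 30, "ramp_time": 5, "rwmixread": 100, "run_num": 3}}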
1674
1675    if "null_block_devices" not in data["target"] and \
1676        (args.force is False and
1677            "allowlist" not in data["target"] and
1678            "blocklist" not in data["target"]):
1679        # TODO: Also check if allowlist or blocklist are not empty.
1680        logging.warning("""WARNING: This script requires allowlist and blocklist to be defined.
1681        You can choose to use all available NVMe drives on your system, which may potentially
1682        lead to data loss. If you wish to proceed with all attached NVMes, use "-f" option.""")
1683        exit(1)
1684
1685    for k, v in data.items():
1686        if "target" in k:
1687            v.update({"results_dir": args.results})
1688            if data[k]["mode"] == "spdk":
1689                target_obj = SPDKTarget(k, data["general"], v)
1690            elif data[k]["mode"] == "kernel":
1691                target_obj = KernelTarget(k, data["general"], v)
1692        elif "initiator" in k:
1693            if data[k]["mode"] == "spdk":
1694                init_obj = SPDKInitiator(k, data["general"], v)
1695            elif data[k]["mode"] == "kernel":
1696                init_obj = KernelInitiator(k, data["general"], v)
1697            initiators.append(init_obj)
1698        elif "fio" in k:
1699            fio_workloads = itertools.product(data[k]["bs"],
1700                                              data[k]["qd"],
1701                                              data[k]["rw"])
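            # itertools.product yields every (bs, qd, rw) combination, e.g.
            # bs=["4k"], qd=[32, 128], rw=["randread"] expands to
            # ("4k", 32, "randread") and ("4k", 128, "randread").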
1702
1703            fio_run_time = data[k]["run_time"]
1704            fio_ramp_time = data[k]["ramp_time"]
1705            fio_rw_mix_read = data[k]["rwmixread"]
1706            fio_run_num = data[k].get("run_num", 1)
1707            fio_num_jobs = data[k].get("num_jobs")
1708
1709            fio_rate_iops = data[k].get("rate_iops", 0)
1710            fio_offset = data[k].get("offset", False)
1711            fio_offset_inc = data[k].get("offset_inc", 0)
1719        else:
1720            continue
1721
1722    os.makedirs(args.results, exist_ok=True)
1726
1727    for i in initiators:
1728        target_obj.initiator_info.append(
1729            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
1730        )
1731
1732    # TODO: This try block is definitely too large. Need to break it up into
1733    # separate logical blocks to reduce its size.
1734    try:
1735        target_obj.tgt_start()
1736
1737        for i in initiators:
1738            i.match_subsystems(target_obj.subsystem_info_list)
1739            if i.enable_adq:
1740                i.adq_configure_tc()
1741
1742        # Poor man's threading
1743        # Run FIO tests
1744        for block_size, io_depth, rw in fio_workloads:
1745            configs = []
1746            for i in initiators:
1747                i.init_connect()
1748                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
1749                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops,
1750                                       fio_offset, fio_offset_inc)
1751                configs.append(cfg)
1752
1753            for run_no in range(1, fio_run_num+1):
1754                threads = []
1755                power_daemon = None
1756                measurements_prefix = "%s_%s_%s_m_%s_run_%s" % (block_size, io_depth, rw, fio_rw_mix_read, run_no)
1757
1758                for i, cfg in zip(initiators, configs):
1759                    t = threading.Thread(target=i.run_fio, args=(cfg, run_no))
1760                    threads.append(t)
1761                if target_obj.enable_sar:
1762                    sar_file_prefix = measurements_prefix + "_sar"
1763                    t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix, fio_ramp_time, fio_run_time))
1764                    threads.append(t)
1765
1766                if target_obj.enable_pcm:
1767                    pcm_fname = "%s_pcm_cpu.csv" % measurements_prefix
1768                    pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm,
1769                                                 args=(args.results, pcm_fname, fio_ramp_time, fio_run_time))
1770                    threads.append(pcm_cpu_t)
1771
1772                if target_obj.enable_bw:
1773                    bandwidth_file_name = measurements_prefix + "_bandwidth.csv"
1774                    t = threading.Thread(target=target_obj.measure_network_bandwidth,
1775                                         args=(args.results, bandwidth_file_name, fio_ramp_time, fio_run_time))
1776                    threads.append(t)
1777
1778                if target_obj.enable_dpdk_memory:
1779                    dpdk_mem_file_name = measurements_prefix + "_dpdk_mem.txt"
1780                    t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results, dpdk_mem_file_name, fio_ramp_time))
1781                    threads.append(t)
1782
1783                if target_obj.enable_pm:
1784                    power_daemon = threading.Thread(target=target_obj.measure_power,
1785                                                    args=(args.results, measurements_prefix, script_full_dir,
1786                                                          fio_ramp_time, fio_run_time))
1787                    threads.append(power_daemon)
1788
1789                for t in threads:
1790                    t.start()
1791                for t in threads:
1792                    t.join()
1793
1794            for i in initiators:
1795                i.init_disconnect()
1796                i.copy_result_files(args.results)
1797        try:
1798            parse_results(args.results, args.csv_filename)
1799        except Exception as err:
1800            exit_code = 1
1801            logging.error("There was an error with parsing the results")
1802            logging.error(err)
1803    finally:
1804        for i in initiators:
1805            try:
1806                i.stop()
1807            except Exception as err:
1808                logging.warning("Failed to stop initiator %s: %s" % (i.name, err))
1809        target_obj.stop()
1810        sys.exit(exit_code)
1811