#!/usr/bin/env python3
#  SPDX-License-Identifier: BSD-3-Clause
#  Copyright (C) 2018 Intel Corporation
#  All rights reserved.
#

# Note: we use setattr
# pylint: disable=no-member

import os
import re
import sys
import argparse
import json
import logging
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid

import paramiko
import pandas as pd
from common import *
from subprocess import check_output, CalledProcessError
from abc import abstractmethod, ABC
from dataclasses import dataclass
from typing import Any

sys.path.append(os.path.dirname(__file__) + '/../../../python')

import spdk.rpc as rpc  # noqa
import spdk.rpc.client as rpc_client  # noqa


@dataclass
class ConfigField:
    name: str = None
    default: Any = None
    required: bool = False


class Server(ABC):
    def __init__(self, name, general_config, server_config):
        self.name = name

        self.log = logging.getLogger(self.name)

        config_fields = [
            ConfigField(name='username', required=True),
            ConfigField(name='password', required=True),
            ConfigField(name='transport', required=True),
            ConfigField(name='skip_spdk_install', default=False),
            ConfigField(name='irdma_roce_enable', default=False),
            ConfigField(name='pause_frames', default=None)
        ]

        self.read_config(config_fields, general_config)
        self.transport = self.transport.lower()

        config_fields = [
            ConfigField(name='nic_ips', required=True),
            ConfigField(name='mode', required=True),
            ConfigField(name='irq_scripts_dir', default='/usr/src/local/mlnx-tools/ofed_scripts'),
            ConfigField(name='enable_arfs', default=False),
            ConfigField(name='tuned_profile', default='')
        ]
        self.read_config(config_fields, server_config)

        self.local_nic_info = []
        self._nics_json_obj = {}
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""

        self.enable_adq = server_config.get('adq_enable', False)
        self.adq_priority = 1 if self.enable_adq else None
        self.irq_settings = {'mode': 'default', **server_config.get('irq_settings', {})}

        if not re.match(r'^[A-Za-z0-9\-]+$', name):
            self.log.error("Please use a name which contains only letters, numbers or dashes")
            sys.exit(1)

    def read_config(self, config_fields, config):
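        # Illustrative example: read_config([ConfigField(name='mode', required=True)],
        # {'mode': 'spdk'}) sets self.mode = 'spdk'; a missing required key raises ValueError.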
        for config_field in config_fields:
            value = config.get(config_field.name, config_field.default)
            if config_field.required is True and value is None:
                raise ValueError('%s config key is required' % config_field.name)

            setattr(self, config_field.name, value)

    @staticmethod
    def get_uncommented_lines(lines):
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                if ip == addr["local"]:  # exact match; "in" would also match address prefixes
                    return nic["ifname"]

    @abstractmethod
    def set_local_nic_info_helper(self):
        pass

    def set_local_nic_info(self, pci_info):
        def extract_network_elements(json_obj):
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def get_nic_numa_node(self, nic_name):
        return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name]))

    def get_numa_cpu_map(self):
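        # Build a {numa_node: [cpu, ...]} map from lscpu JSON output.
        # Illustrative result for a hypothetical 2-node system: {0: [0, 1, 2, 3], 1: [4, 5, 6, 7]}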
        numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"]))
        numa_cpu_json_map = {}

        for cpu in numa_cpu_json_obj["cpus"]:
            cpu_num = int(cpu["cpu"])
            numa_node = int(cpu["node"])
            numa_cpu_json_map.setdefault(numa_node, [])
            numa_cpu_json_map[numa_node].append(cpu_num)

        return numa_cpu_json_map

    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        return ""

    def configure_system(self):
        self.load_drivers()
        self.configure_services()
        self.configure_sysctl()
        self.configure_arfs()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity(**self.irq_settings)
        self.configure_pause_frames()

    RDMA_PROTOCOL_IWARP = 0
    RDMA_PROTOCOL_ROCE = 1
    RDMA_PROTOCOL_UNKNOWN = -1

    def check_rdma_protocol(self):
        try:
            roce_ena = self.exec_cmd(["cat", "/sys/module/irdma/parameters/roce_ena"])
            roce_ena = roce_ena.strip()
            if roce_ena == "0":
                return self.RDMA_PROTOCOL_IWARP
            else:
                return self.RDMA_PROTOCOL_ROCE
        except CalledProcessError as e:
            self.log.error("ERROR: failed to check RDMA protocol!")
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
            return self.RDMA_PROTOCOL_UNKNOWN

    def load_drivers(self):
        self.log.info("Loading drivers")
        self.exec_cmd(["sudo", "modprobe", "-a",
                       "nvme-%s" % self.transport,
                       "nvmet-%s" % self.transport])
        current_mode = self.check_rdma_protocol()
        if current_mode == self.RDMA_PROTOCOL_UNKNOWN:
            self.log.error("ERROR: failed to check RDMA protocol mode")
            return
        if self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_IWARP:
            self.reload_driver("irdma", "roce_ena=1")
            self.log.info("Loaded irdma driver with RoCE enabled")
        elif self.irdma_roce_enable and current_mode == self.RDMA_PROTOCOL_ROCE:
            self.log.info("Leaving irdma driver with RoCE enabled")
        else:
            self.reload_driver("irdma", "roce_ena=0")
            self.log.info("Loaded irdma driver with iWARP enabled")

    def set_pause_frames(self, rx_state, tx_state):
        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            self.exec_cmd(["sudo", "ethtool", "-A", nic_name, "rx", rx_state, "tx", tx_state])

    def configure_pause_frames(self):
        if self.pause_frames is None:
            self.log.info("Keeping NIC's default pause frames setting")
            return
        elif not self.pause_frames:
            self.log.info("Turning off pause frames")
            self.set_pause_frames("off", "off")
        else:
            self.log.info("Turning on pause frames")
            self.set_pause_frames("on", "on")

    def configure_arfs(self):
        rps_flow_cnt = 512
        if not self.enable_arfs:
            rps_flow_cnt = 0

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            self.exec_cmd(["sudo", "ethtool", "-K", nic_name, "ntuple", "on"])
            self.log.info(f"Setting rps_flow_cnt={rps_flow_cnt} for {nic_name}")
            queue_files = self.exec_cmd(["ls", f"/sys/class/net/{nic_name}/queues/"]).strip().split("\n")
            queue_files = filter(lambda x: x.startswith("rx-"), queue_files)

            for qf in queue_files:
                self.exec_cmd(["sudo", "bash", "-c", f"echo {rps_flow_cnt} > /sys/class/net/{nic_name}/queues/{qf}/rps_flow_cnt"])
                set_value = self.exec_cmd(["cat", f"/sys/class/net/{nic_name}/queues/{qf}/rps_flow_cnt"]).strip()
                self.log.info(f"Confirmed rps_flow_cnt set to {set_value} in /sys/class/net/{nic_name}/queues/{qf}")

            self.log.info(f"Configuration of {nic_name} completed with confirmed rps_flow_cnt settings.")

        self.log.info("ARFS configuration completed.")

    def configure_adq(self):
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        self.log.info("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log.info("%s loaded!" % module)
            except CalledProcessError as e:
                self.log.error("ERROR: failed to load module %s" % module)
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        self.log.info("Configuring ADQ Traffic classes and filters...")

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            nic_ports = [x[0] for x in self.subsystem_info_list if nic_ip in x[2]]

            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log.info(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log.info(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            nic_bdf = os.path.basename(self.exec_cmd(["readlink", "-f", "/sys/class/net/%s/device" % nic_name]).strip())
            self.exec_cmd(["sudo", "devlink", "dev", "param", "set", "pci/%s" % nic_bdf,
                           "name", "tc1_inline_fd", "value", "true", "cmode", "runtime"])

            for port in nic_ports:
                tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                                 "protocol", "ip", "ingress", "prio", "1", "flower",
                                 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                                 "skip_sw", "hw_tc", "1"]
                self.log.info(" ".join(tc_filter_cmd))
                self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log.info("Show tc configuration for %s NIC..." % nic_name)
            tc_qdisc_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log.info(tc_qdisc_out)
            self.log.info(tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log.info(xps_cmd)
            self.exec_cmd(xps_cmd)

    def reload_driver(self, driver, *modprobe_args):

        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver, *modprobe_args])
        except CalledProcessError as e:
            self.log.error("ERROR: failed to reload %s module!" % driver)
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_nic(self):
        self.log.info("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                nic_bdf = self.exec_cmd(["bash", "-c", "source /sys/class/net/%s/device/uevent; echo $PCI_SLOT_NAME" % nic])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-inspect-optimize", "off"])
                # Following are suggested in ADQ Configuration Guide to be enabled for large block sizes,
                # but can be used with 4k block sizes as well
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop", "on"])
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "channel-pkt-clean-bp-stop-cfg", "on"])
            except subprocess.CalledProcessError as e:
                self.log.error("ERROR: failed to configure NIC port using ethtool!")
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
                self.log.info("Please update your NIC driver and firmware versions and try again.")
            self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        self.log.info("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log.info("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log.info("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        self.log.info("Tuning sysctl settings...")

        busy_read = 0
        busy_poll = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1
            busy_poll = 1

        sysctl_opts = {
            "net.core.busy_poll": busy_poll,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
            "net.core.rps_sock_flow_entries": 0
        }

        if self.enable_adq:
            sysctl_opts.update(self.adq_set_busy_read(1))

        if self.enable_arfs:
            sysctl_opts.update({"net.core.rps_sock_flow_entries": 32768})

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        if not self.tuned_profile:
            self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        self.log.info("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def get_core_list_from_mask(self, core_mask):
        # Generate list of individual cores from hex core mask
        # (e.g. '0xffff') or list containing:
        # - individual cores (e.g. '1, 2, 3')
        # - core ranges (e.g. '0-3')
        # - mix of both (e.g. '0, 1-4, 9, 11-13')
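        # Illustrative examples: '0x5' -> [0, 2]; '[0,2-4]' -> [0, 2, 3, 4]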

        core_list = []
        if "0x" in core_mask:
            core_mask_int = int(core_mask, 16)
            for i in range(core_mask_int.bit_length()):
                if (1 << i) & core_mask_int:
                    core_list.append(i)
            return core_list
        else:
            # Core list can be provided in .json config with square brackets
            # remove them first
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")

            for i in core_mask.split(","):
                if "-" in i:
                    start, end = i.split("-")
                    core_range = range(int(start), int(end) + 1)
                    core_list.extend(core_range)
                else:
                    core_list.append(int(i))
            return core_list

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
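        # Modes (illustrative summary): "default" runs the vendor set_irq_affinity.sh script
        # as-is; "bynode" pins NIC IRQs to CPUs from the NIC's NUMA node; "cpulist" pins them
        # to an explicit list, e.g. cpulist="[0-3]" (inverted when exclude_cpulist=True).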
        self.log.info("Setting NIC irq affinity for NICs. Using %s mode" % mode)

        if mode not in ["default", "bynode", "cpulist"]:
            raise ValueError("%s irq affinity setting not supported" % mode)

        if mode == "cpulist" and not cpulist:
            raise ValueError("%s irq affinity setting set, but no cpulist provided" % mode)

        affinity_script = "set_irq_affinity.sh"
        if "default" not in mode:
            affinity_script = "set_irq_affinity_cpulist.sh"
            system_cpu_map = self.get_numa_cpu_map()
        irq_script_path = os.path.join(self.irq_scripts_dir, affinity_script)

        def cpu_list_to_string(cpulist):
            return ",".join(map(str, cpulist))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic_name in nic_names:
            irq_cmd = ["sudo", irq_script_path]

            # Use only CPU cores matching NIC NUMA node.
            # Remove any CPU cores if they're on exclusion list.
            if mode == "bynode":
                irq_cpus = system_cpu_map[self.get_nic_numa_node(nic_name)]
                if cpulist and exclude_cpulist:
                    disallowed_cpus = self.get_core_list_from_mask(cpulist)
                    irq_cpus = list(set(irq_cpus) - set(disallowed_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            if mode == "cpulist":
                irq_cpus = self.get_core_list_from_mask(cpulist)
                if exclude_cpulist:
                    # Flatten system CPU list, we don't need NUMA awareness here
                    system_cpu_list = sorted({x for v in system_cpu_map.values() for x in v})
                    irq_cpus = list(set(system_cpu_list) - set(irq_cpus))
                    if not irq_cpus:
                        raise Exception("No CPUs left to process IRQs after excluding CPUs!")
                irq_cmd.append(cpu_list_to_string(irq_cpus))

            irq_cmd.append(nic_name)
            self.log.info(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        self.log.info("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        self.log.info("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        self.log.info("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log.info("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        self.log.info("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log.info("Reverted CPU governor to %s." % self.governor_restore)

    def restore_settings(self):
        self.restore_governor()
        self.restore_tuned()
        self.restore_services()
        self.restore_sysctl()
        if self.enable_adq:
            self.reload_driver("ice")

    def check_for_throttling(self):
        patterns = [
            r"CPU\d+: Core temperature is above threshold, cpu clock is throttled",
            r"mlx5_core \S+: poll_health:\d+:\(pid \d+\): device's health compromised",
            r"mlx5_core \S+: print_health_info:\d+:\(pid \d+\): Health issue observed, High temperature",
            r"mlx5_core \S+: temp_warn:\d+:\(pid \d+\): High temperature on sensors"
        ]

        issue_found = False
        try:
            output = self.exec_cmd(["dmesg"])

            for line in output.split("\n"):
                for pattern in patterns:
                    if re.search(pattern, line):
                        self.log.warning("Throttling or temperature issue found")
                        issue_found = True
            return 1 if issue_found else 0
        except Exception as e:
            self.log.error("ERROR: failed to execute dmesg command")
            self.log.error(f"Error {e}")
            return 1

    def stop(self):
        if self.check_for_throttling():
            err_message = (f"Throttling or temperature issue found on {self.name} system! "
                           "Check system logs!")
            raise CpuThrottlingError(err_message)


class Target(Server):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Defaults
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []
        self.initiator_info = []

        config_fields = [
            ConfigField(name='mode', required=True),
            ConfigField(name='results_dir', required=True),
            ConfigField(name='enable_pm', default=True),
            ConfigField(name='enable_sar', default=True),
            ConfigField(name='enable_pcm', default=True),
            ConfigField(name='enable_dpdk_memory', default=True)
        ]
        self.read_config(config_fields, target_config)

        self.null_block = target_config.get('null_block_devices', 0)
        self.scheduler_name = target_config.get('scheduler_settings', 'static')
        self.enable_zcopy = target_config.get('zcopy_settings', False)
        self.enable_bw = target_config.get('enable_bandwidth', True)
        self.nvme_blocklist = target_config.get('blocklist', [])
        self.nvme_allowlist = target_config.get('allowlist', [])

        # Blocklist takes precedence, remove common elements from allowlist
        self.nvme_allowlist = list(set(self.nvme_allowlist) - set(self.nvme_blocklist))

        self.log.info("Items now on allowlist: %s" % self.nvme_allowlist)
        self.log.info("Items now on blocklist: %s" % self.nvme_blocklist)

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        if self.skip_spdk_install is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
            self.configure_irq_affinity()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log.info("Changing directory to %s" % change_dir)

        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")

        if change_dir:
            os.chdir(old_cwd)
            self.log.info("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        self.log.info("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, _directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log.info("Done zipping")

    @staticmethod
    def _chunks(input_list, chunks_no):
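        # Split input_list into chunks_no contiguous slices whose lengths differ by at
        # most one; e.g. _chunks(list(range(7)), 3) yields [0, 1, 2], [3, 4], [5, 6].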
        div, rem = divmod(len(input_list), chunks_no)
        for i in range(chunks_no):
            si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem)
            yield input_list[si:si + (div + 1 if i < rem else div)]

    def spread_bdevs(self, req_disks):
        # Spread available block device indexes:
        # - evenly across available initiator systems
        # - evenly across available NIC interfaces for
        #   each initiator
        # Not NUMA aware.
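        # Illustrative example: with 2 initiators, 2 target NIC IPs each and req_disks=8,
        # each initiator gets indexes for 4 bdevs, 2 per NIC IP.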
        ip_bdev_map = []
        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))

        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
            self.initiator_info[i]["bdev_range"] = init_chunk
            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
                for c in nic_chunk:
                    ip_bdev_map.append((ip, c))
        return ip_bdev_map

    def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time):
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))

        self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Starting SAR measurements")

        out = self.exec_cmd(["sar", "-P", "ALL", "1", "%s" % run_time])
        with open(sar_output_file, "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log.info("Summary CPU utilization from SAR:")
                        self.log.info(line)
                    elif "all" in line:
                        self.log.info(line)
                    else:
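                        # Column 8 of sar's per-CPU "Average:" rows is %idle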
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum

        with open(sar_cpu_util_file, "w") as f:
            f.write("%0.2f" % sar_cpu_usage)

    def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time):
        time.sleep(ramp_time)
        self.log.info("Starting power measurements")
        self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir,
                      "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix,
                       "-x", "-c", "%s" % run_time, "-t", "1", "-r"])

    def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        cmd = ["pcm", "1", "-i=%s" % run_time,
               "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name, ramp_time, run_time):
        self.log.info("Waiting %d seconds for ramp-up to finish before measuring bandwidth stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("INFO: starting network bandwidth measurement")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", "%s" % run_time])
        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
        #       Improve this so that additional file containing measurements average is generated too.
        # TODO: Monitor only these interfaces which are currently used to run the workload.

    def measure_dpdk_memory(self, results_dir, dump_file_name, ramp_time):
        self.log.info("INFO: waiting to generate DPDK memory usage")
        time.sleep(ramp_time)
        self.log.info("INFO: generating DPDK memory usage")
        tmp_dump_file = rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)["filename"]
        os.rename(tmp_dump_file, "%s/%s" % (results_dir, dump_file_name))

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(os.uname().release)
        self.log.info("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log.info("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log.info("====zcopy settings:====")
        self.log.info("zcopy enabled: %s" % (self.enable_zcopy))
        self.log.info("====Scheduler settings:====")
        self.log.info("SPDK scheduler: %s" % (self.scheduler_name))


class Initiator(Server):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # required fields and defaults
        config_fields = [
            ConfigField(name='mode', required=True),
            ConfigField(name='ip', required=True),
            ConfigField(name='target_nic_ips', required=True),
            ConfigField(name='cpus_allowed', default=None),
            ConfigField(name='cpus_allowed_policy', default='shared'),
            ConfigField(name='spdk_dir', default='/tmp/spdk'),
            ConfigField(name='fio_bin', default='/usr/src/fio/fio'),
            ConfigField(name='nvmecli_bin', default='nvme'),
            ConfigField(name='cpu_frequency', default=None),
            ConfigField(name='allow_cpu_sharing', default=True)
        ]

        self.read_config(config_fields, initiator_config)

        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.subsystem_info_list = []

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if self.skip_spdk_install is False:
            self.copy_spdk("/tmp/spdk.zip")

        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        self.restore_settings()
        try:
            super().stop()
        finally:
            self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it again to prevent
        # expansion when sending to the remote system.
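        # Illustrative example: the element 'net.ipv4.tcp_mem=8192 1048576 33554432'
        # becomes '"net.ipv4.tcp_mem=8192 1048576 33554432"' on the remote command line.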
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout by using the get_pty option if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        self.log.info("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log.info("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log.info("Sources unpacked")

    def copy_result_files(self, dest_dir):
        self.log.info("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log.info("Done copying results")

    def match_subsystems(self, target_subsystems):
        subsystems = [subsystem for subsystem in target_subsystems if subsystem[2] in self.target_nic_ips]
        if not subsystems:
            raise Exception("No matching subsystems found on target side!")
        subsystems.sort(key=lambda x: x[1])
        self.log.info("Found matching subsystems on target side:")
        for s in subsystems:
            self.log.info(s)
        self.subsystem_info_list = subsystems

    @abstractmethod
    def init_connect(self):
        pass

    @abstractmethod
    def init_disconnect(self):
        pass

    @abstractmethod
    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def get_route_nic_numa(self, remote_nvme_ip):
        local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip]))
        local_nvme_nic = local_nvme_nic[0]["dev"]
        return self.get_nic_numa_node(local_nvme_nic)

    @staticmethod
    def gen_fio_offset_section(offset_inc, num_jobs):
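        # Illustrative example: offset_inc=0, num_jobs=4 -> size=25%, offset=0%,
        # offset_increment=25%, so the four jobs cover disjoint quarters of each device.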
        offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc
        return "\n".join(["size=%s%%" % offset_inc,
                          "offset=0%",
                          "offset_increment=%s%%" % offset_inc])

    def gen_fio_numa_section(self, fio_filenames_list, num_jobs):
        numa_stats = {}
        allowed_cpus = []
        for nvme in fio_filenames_list:
            nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme))
            numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1

        # Use the most common NUMA node for this chunk to allocate memory and CPUs
        section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0]

        # Check if we have enough free CPUs to pop from the list before assigning them
        if len(self.available_cpus[section_local_numa]) < num_jobs:
            if self.allow_cpu_sharing:
                self.log.info("Regenerating available CPU list %s" % section_local_numa)
                # Remove still available CPUs from the regenerated list. We don't want to
                # regenerate it with duplicates.
                cpus_regen = set(self.get_numa_cpu_map()[section_local_numa]) - set(self.available_cpus[section_local_numa])
                self.available_cpus[section_local_numa].extend(cpus_regen)
                self.log.info(self.available_cpus[section_local_numa])
            else:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise IndexError

        for _ in range(num_jobs):
            try:
                allowed_cpus.append(str(self.available_cpus[section_local_numa].pop(0)))
            except IndexError:
                self.log.error("No more free CPU cores to use from allowed_cpus list!")
                raise

        return "\n".join(["cpus_allowed=%s" % ",".join(allowed_cpus),
                          "numa_mem_policy=prefer:%s" % section_local_numa])

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no,
                       num_jobs=None, ramp_time=0, run_time=10, rate_iops=0,
                       offset=False, offset_inc=0, numa_align=True):
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""

        if self.cpus_allowed is not None:
            self.log.info("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    cpus_num += len(range(a, b + 1))  # core ranges are inclusive
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            self.num_cores = len(self.subsystem_info_list)
            threads = range(0, len(self.subsystem_info_list))

        filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs,
                                                      offset, offset_inc, numa_align)

        fio_config = fio_conf_template.format(ioengine=self.ioengine, spdk_conf=self.spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, init_connect() function
        if "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log.info("Created FIO Config:")
        self.log.info(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit(1)
        else:
            self.log.warning("WARNING: you have disabled intel_pstate and are using the default CPU governor.")

    def run_fio(self, fio_config_file, run_num=1):
        job_name, _ = os.path.splitext(fio_config_file)
        self.log.info("Starting FIO run for job: %s" % job_name)
        self.log.info("Using FIO: %s" % self.fio_bin)

        output_filename = job_name + "_run_" + str(run_num) + "_" + self.name + ".json"
        try:
            output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename, "--eta=never"], True)
            self.log.info(output)
            self.log.info("FIO run finished. Results in: %s" % output_filename)
        except subprocess.CalledProcessError as e:
            self.log.error("ERROR: Fio process failed!")
            self.log.error(e.stdout)

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(self.exec_cmd(["uname", "-r"]))
        self.log.info("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log.info("====sysctl conf:====")
        sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"])
        self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))


class KernelTarget(Target):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = target_config.get('nvmet_bin', 'nvmetcli')

    def load_drivers(self):
        self.log.info("Loading drivers")
        super().load_drivers()
        if self.null_block:
            self.exec_cmd(["sudo", "modprobe", "null_blk", "nr_devices=%s" % self.null_block])

    def configure_adq(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_configure_tc(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_set_busy_read(self, busy_read_val):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
        return {"net.core.busy_read": 0}

    def stop(self):
        self.nvmet_command(self.nvmet_bin, "clear")
        self.restore_settings()
        super().stop()

    def get_nvme_device_bdf(self, nvme_dev_path):
        nvme_name = os.path.basename(nvme_dev_path)
        return self.exec_cmd(["cat", "/sys/block/%s/device/address" % nvme_name]).strip()

    def get_nvme_devices(self):
        dev_list = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]).split("\n")
        nvme_list = []
        for dev in dev_list:
            if "nvme" not in dev:
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                nvme_list.append(dev)
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_allowlist:
                nvme_list.append(dev)
        return nvme_list

    def nvmet_command(self, nvmet_bin, command):
        return self.exec_cmd([nvmet_bin, *(command.split(" "))])

    def kernel_tgt_gen_subsystem_conf(self, nvme_list):

        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

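        # Each bdev index maps to one target NIC IP (see spread_bdevs); port, NQN and
        # serial are derived from the index, e.g. index 3 -> port 4423,
        # nqn.2018-09.io.spdk:cnode3, serial SPDK003.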
        for ip, bdev_num in self.spread_bdevs(len(nvme_list)):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = nvme_list[bdev_num]

            nvmet_cfg["subsystems"].append({
                "allowed_hosts": [],
                "attr": {
                    "allow_any_host": "1",
                    "serial": serial,
                    "version": "1.3"
                },
                "namespaces": [
                    {
                        "device": {
                            "path": bdev_name,
                            "uuid": "%s" % uuid.uuid4()
                        },
                        "enable": 1,
                        "nsid": port
                    }
                ],
                "nqn": nqn
            })

            nvmet_cfg["ports"].append({
                "addr": {
                    "adrfam": "ipv4",
                    "traddr": ip,
                    "trsvcid": port,
                    "trtype": self.transport
                },
                "portid": bdev_num,
                "referrals": [],
                "subsystems": [nqn]
            })

            self.subsystem_info_list.append((port, nqn, ip))
        self.subsys_no = len(self.subsystem_info_list)

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        self.log.info("Configuring kernel NVMeOF Target")

        if self.null_block:
            self.log.info("Configuring with null block device.")
            nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
        else:
            self.log.info("Configuring with NVMe drives.")
            nvme_list = self.get_nvme_devices()

        self.kernel_tgt_gen_subsystem_conf(nvme_list)
        self.subsys_no = len(nvme_list)

        self.nvmet_command(self.nvmet_bin, "clear")
        self.nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log.info("Done configuring kernel NVMeOF Target")


class SPDKTarget(Target):
    def __init__(self, name, general_config, target_config):
        # IRQ affinity on the SPDK Target side takes the Target's core mask into consideration.
        # The method setting IRQ affinity runs as part of the parent class's init,
        # so self.core_mask must be set before changing IRQ affinity.
        self.core_mask = target_config["core_mask"]
        self.num_cores = len(self.get_core_list_from_mask(self.core_mask))

        super().__init__(name, general_config, target_config)

        # Format: property, default value
        config_fields = [
            ConfigField(name='dif_insert_strip', default=False),
            ConfigField(name='null_block_dif_type', default=0),
            ConfigField(name='num_shared_buffers', default=4096),
            ConfigField(name='max_queue_depth', default=128),
            ConfigField(name='bpf_scripts', default=[]),
            ConfigField(name='scheduler_core_limit', default=None),
            ConfigField(name='dsa_settings', default=False),
            ConfigField(name='iobuf_small_pool_count', default=32767),
            ConfigField(name='iobuf_large_pool_count', default=16383),
            ConfigField(name='num_cqe', default=4096),
            ConfigField(name='sock_impl', default='posix')
        ]

        self.read_config(config_fields, target_config)

        self.bpf_proc = None
        self.enable_dsa = False

        self.log.info("====DSA settings:====")
        self.log.info("DSA enabled: %s" % (self.enable_dsa))

    def configure_irq_affinity(self, mode="default", cpulist=None, exclude_cpulist=False):
        if mode not in ["default", "bynode", "cpulist",
                        "shared", "split", "split-bynode"]:
            raise ValueError("%s irq affinity setting not supported" % mode)

        # Create core list from SPDK's mask and change it to string.
        # This is the type configure_irq_affinity expects for cpulist parameter.
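        # Illustrative example: core_mask "0xF" becomes the string "[0,1,2,3]".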
1209        spdk_tgt_core_list = self.get_core_list_from_mask(self.core_mask)
1210        spdk_tgt_core_list = ",".join(map(lambda x: str(x), spdk_tgt_core_list))
1211        spdk_tgt_core_list = "[" + spdk_tgt_core_list + "]"
1212
1213        if mode == "shared":
1214            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list)
1215        elif mode == "split":
1216            super().configure_irq_affinity(mode="cpulist", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
1217        elif mode == "split-bynode":
1218            super().configure_irq_affinity(mode="bynode", cpulist=spdk_tgt_core_list, exclude_cpulist=True)
1219        else:
1220            super().configure_irq_affinity(mode=mode, cpulist=cpulist, exclude_cpulist=exclude_cpulist)
1221
1222    def adq_set_busy_read(self, busy_read_val):
1223        return {"net.core.busy_read": busy_read_val}
1224
1225    def get_nvme_devices_count(self):
1226        return len(self.get_nvme_devices())
1227
1228    def get_nvme_devices(self):
1229        bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")]))
1230        bdev_bdfs = []
1231        for bdev in bdev_subsys_json_obj["config"]:
1232            bdev_traddr = bdev["params"]["traddr"]
1233            if bdev_traddr in self.nvme_blocklist:
1234                continue
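            # An empty allowlist means every non-blocklisted device is allowed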
1235            if len(self.nvme_allowlist) == 0:
1236                bdev_bdfs.append(bdev_traddr)
1237            elif bdev_traddr in self.nvme_allowlist:
1238                bdev_bdfs.append(bdev_traddr)
1239        return bdev_bdfs
1240
1241    def spdk_tgt_configure(self):
1242        self.log.info("Configuring SPDK NVMeOF target via RPC")
1243
1244        # Create transport layer
1245        nvmf_transport_params = {
1246            "client": self.client,
1247            "trtype": self.transport,
1248            "num_shared_buffers": self.num_shared_buffers,
1249            "max_queue_depth": self.max_queue_depth,
1250            "dif_insert_or_strip": self.dif_insert_strip,
1251            "sock_priority": self.adq_priority,
1252            "num_cqe": self.num_cqe
1253        }
1254
1255        if self.enable_adq:
1256            nvmf_transport_params["acceptor_poll_rate"] = 10000
1257
1258        rpc.nvmf.nvmf_create_transport(**nvmf_transport_params)
1259        self.log.info("SPDK NVMeOF transport layer:")
1260        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))
1261
1262        if self.null_block:
1263            self.spdk_tgt_add_nullblock(self.null_block)
1264            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
1265        else:
1266            self.spdk_tgt_add_nvme_conf()
1267            self.spdk_tgt_add_subsystem_conf(self.nic_ips)
1268
1269        if self.enable_adq:
1270            self.adq_configure_tc()
1271
1272        self.log.info("Done configuring SPDK NVMeOF Target")
1273
1274    def spdk_tgt_add_nullblock(self, null_block_count):
1275        md_size = 0
1276        block_size = 4096
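        # A non-zero DIF type stores protection information in per-block metadata,
        # so reserve 128 bytes of metadata per block.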
1277        if self.null_block_dif_type != 0:
1278            md_size = 128
1279
1280        self.log.info("Adding null block bdevices to config via RPC")
1281        for i in range(null_block_count):
1282            self.log.info("Setting bdev protection to: %s" % self.null_block_dif_type)
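            # 102400 blocks of 4096 bytes each gives a 400 MiB null bdev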
1283            rpc.bdev.bdev_null_create(self.client, 102400, block_size, "Nvme{}n1".format(i),
1284                                      dif_type=self.null_block_dif_type, md_size=md_size)
1285        self.log.info("SPDK Bdevs configuration:")
1286        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1287
1288    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
1289        self.log.info("Adding NVMe bdevs to config via RPC")
1290
1291        bdfs = self.get_nvme_devices()
1292        bdfs = [b.replace(":", ".") for b in bdfs]
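        # e.g. "0000:1a:00.0" becomes "0000.1a.00.0"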
1293
1294        if req_num_disks:
1295            if req_num_disks > len(bdfs):
1296                self.log.error("ERROR: Requested %s disks, but only %s are available" % (req_num_disks, len(bdfs)))
1297                sys.exit(1)
1298            else:
1299                bdfs = bdfs[0:req_num_disks]
1300
1301        for i, bdf in enumerate(bdfs):
1302            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)
1303
1304        self.log.info("SPDK Bdevs configuration:")
1305        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1306
1307    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
1308        self.log.info("Adding subsystems to config")
1309        if not req_num_disks:
1310            req_num_disks = self.get_nvme_devices_count()
1311
1312        for ip, bdev_num in self.spread_bdevs(req_num_disks):
1313            port = str(4420 + bdev_num)
1314            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
1315            serial = "SPDK00%s" % bdev_num
1316            bdev_name = "Nvme%sn1" % bdev_num
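            # e.g. bdev_num 0 gives port 4420, nqn.2018-09.io.spdk:cnode0, serial SPDK000, Nvme0n1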
1317
1318            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
1319                                           allow_any_host=True, max_namespaces=8)
1320            rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
1321            for nqn_name in [nqn, "discovery"]:
1322                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
1323                                                     nqn=nqn_name,
1324                                                     trtype=self.transport,
1325                                                     traddr=ip,
1326                                                     trsvcid=port,
1327                                                     adrfam="ipv4")
1328            self.subsystem_info_list.append((port, nqn, ip))
1329        self.subsys_no = len(self.subsystem_info_list)
1330
1331        self.log.info("SPDK NVMeOF subsystem configuration:")
1332        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))
1333
1334    def bpf_start(self):
1335        self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts)
1336        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
1337        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
1338        results_path = os.path.join(self.results_dir, "bpf_traces.txt")
1339
1340        with open(self.pid, "r") as fh:
1341            nvmf_pid = fh.readline().strip()
1342
1343        cmd = [bpf_script, nvmf_pid, *bpf_traces]
1344        self.log.info(cmd)
1345        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})
1346
1347    def tgt_start(self):
1348        if self.null_block:
1349            self.subsys_no = 1
1350        else:
1351            self.subsys_no = self.get_nvme_devices_count()
1352        self.log.info("Starting SPDK NVMeOF Target process")
1353        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
1354        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
1355        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")
1356
1357        with open(self.pid, "w") as fh:
1358            fh.write(str(proc.pid))
1359        self.nvmf_proc = proc
1360        self.log.info("SPDK NVMeOF Target PID=%s" % proc.pid)
1361        self.log.info("Waiting for spdk to initialize...")
1362        while not os.path.exists("/var/tmp/spdk.sock"):
1363            time.sleep(1)
1366        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")
1367
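        # The target was started with --wait-for-rpc, so startup-time options
        # (socket impl, iobuf pools, scheduler) can still be set over RPC before
        # rpc.framework_start_init() completes subsystem initialization.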
1368        rpc.sock.sock_set_default_impl(self.client, impl_name=self.sock_impl)
1369        rpc.iobuf.iobuf_set_options(self.client,
1370                                    small_pool_count=self.iobuf_small_pool_count,
1371                                    large_pool_count=self.iobuf_large_pool_count,
1372                                    small_bufsize=None,
1373                                    large_bufsize=None)
1374
1375        if self.enable_zcopy:
1376            rpc.sock.sock_impl_set_options(self.client, impl_name=self.sock_impl,
1377                                           enable_zerocopy_send_server=True)
1378            self.log.info("Target socket options:")
1379            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name=self.sock_impl))
1380
1381        if self.enable_adq:
1382            rpc.sock.sock_impl_set_options(self.client, impl_name=self.sock_impl, enable_placement_id=1)
1383            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
1384                                           nvme_adminq_poll_period_us=100000, retry_count=4)
1385
1386        if self.enable_dsa:
1387            rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None)
1388            self.log.info("Target DSA accel module enabled")
1389
1390        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit)
1391        rpc.framework_start_init(self.client)
1392
1393        if self.bpf_scripts:
1394            self.bpf_start()
1395
1396        self.spdk_tgt_configure()
1397
1398    def stop(self):
1399        if self.bpf_proc:
1400            self.log.info("Stopping BPF Trace script")
1401            self.bpf_proc.terminate()
1402            self.bpf_proc.wait()
1403
1404        if hasattr(self, "nvmf_proc"):
1405            try:
1406                self.nvmf_proc.terminate()
1407                self.nvmf_proc.wait(timeout=30)
1408            except Exception as e:
1409                self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.")
1410                self.log.info(e)
1411                self.nvmf_proc.kill()
1412                self.nvmf_proc.communicate()
1413                # Try to clean up RPC socket files if they were not removed
1414                # because of using 'kill'
1415                try:
1416                    os.remove("/var/tmp/spdk.sock")
1417                    os.remove("/var/tmp/spdk.sock.lock")
1418                except FileNotFoundError:
1419                    pass
1420        self.restore_settings()
1421        super().stop()
1422
1423
1424class KernelInitiator(Initiator):
1425    def __init__(self, name, general_config, initiator_config):
1426        super().__init__(name, general_config, initiator_config)
1427
1428        # Defaults
1429        self.extra_params = initiator_config.get('extra_params', '')
1430
1431        self.ioengine = "libaio"
1432        self.spdk_conf = ""
1433
1434        if "num_cores" in initiator_config:
1435            self.num_cores = initiator_config["num_cores"]
1436
1437        if "kernel_engine" in initiator_config:
1438            self.ioengine = initiator_config["kernel_engine"]
1439            if "io_uring" in self.ioengine:
1440                self.extra_params += ' --nr-poll-queues=8'
1441
1442    def configure_adq(self):
1443        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
1444
1445    def adq_configure_tc(self):
1446        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
1447
1448    def adq_set_busy_read(self, busy_read_val):
1449        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
1450        return {"net.core.busy_read": 0}
1451
1452    def get_connected_nvme_list(self):
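        # Filter by model number so that only remote (SPDK or Linux kernel
        # target) NVMe devices are returned, not local drives.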
1453        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
1454        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
1455                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
1456        return nvme_list
1457
1458    def init_connect(self):
1459        self.log.info("The connection attempts below may produce error messages; this is expected!")
1460        for subsystem in self.subsystem_info_list:
1461            self.log.info("Trying to connect %s %s %s" % subsystem)
1462            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
1463                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
1464            time.sleep(2)
1465
1466        if "io_uring" in self.ioengine:
1467            self.log.info("Setting block layer settings for io_uring.")
1468
1469            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
1470            #       apparently it's not possible for connected subsystems.
1471            #       Results in "error: Invalid argument"
1472            block_sysfs_settings = {
1473                "iostats": "0",
1474                "rq_affinity": "0",
1475                "nomerges": "2"
1476            }
1477
1478            for disk in self.get_connected_nvme_list():
1479                sysfs = os.path.join("/sys/block", disk, "queue")
1480                for k, v in block_sysfs_settings.items():
1481                    sysfs_opt_path = os.path.join(sysfs, k)
1482                    try:
1483                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
1484                    except CalledProcessError as e:
1485                        self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
1486                    finally:
1487                        curr_val = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)])
1488                        self.log.info("%s=%s" % (sysfs_opt_path, curr_val))
1489
1490    def init_disconnect(self):
1491        for subsystem in self.subsystem_info_list:
1492            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
1493            time.sleep(1)
1494
1495    def get_nvme_subsystem_numa(self, dev_name):
1496        # Strip the last two characters ("n1") to get the controller name (e.g. "nvme0n1" -> "nvme0")
1497        nvme_ctrl = os.path.basename(dev_name)[:-2]
1498        remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
1499                                   self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl]))
1500        return self.get_route_nic_numa(remote_nvme_ip.group(0))
1501
1502    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0, numa_align=True):
1503        self.available_cpus = self.get_numa_cpu_map()
1504        if len(threads) >= len(subsystems):
1505            threads = range(0, len(subsystems))
1506
1507        # Generate connected nvme devices names and sort them by used NIC numa node
1508        # to allow better grouping when splitting into fio sections.
1509        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
1510        nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list]
1511        nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))]
1512
1513        filename_section = ""
1514        nvme_per_split = int(len(nvme_list) / len(threads))
1515        remainder = len(nvme_list) % len(threads)
1516        iterator = iter(nvme_list)
1517        result = []
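        # Distribute devices evenly across sections; the first
        # len(nvme_list) % len(threads) sections each get one extra device.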
1518        for i in range(len(threads)):
1519            result.append([])
1520            for _ in range(nvme_per_split):
1521                result[i].append(next(iterator))
1522            if remainder:
1523                result[i].append(next(iterator))
1524                remainder -= 1
1525        for i, r in enumerate(result):
1526            header = "[filename%s]" % i
1527            disks = "\n".join(["filename=%s" % x for x in r])
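            # Scale iodepth so the section's jobs keep io_depth IOs in flight per device,
            # e.g. io_depth=128 with 4 devices and num_jobs=2 gives iodepth=256 per job.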
1528            job_section_qd = round((io_depth * len(r)) / num_jobs)
1529            if job_section_qd == 0:
1530                job_section_qd = 1
1531            iodepth = "iodepth=%s" % job_section_qd
1532
1533            offset_section = ""
1534            if offset:
1535                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)
1536
1537            numa_opts = ""
1538            if numa_align:
1539                numa_opts = self.gen_fio_numa_section(r, num_jobs)
1540
1541            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])
1542
1543        return filename_section
1544
1545
1546class SPDKInitiator(Initiator):
1547    def __init__(self, name, general_config, initiator_config):
1548        super().__init__(name, general_config, initiator_config)
1549
1550        if self.skip_spdk_install is False:
1551            self.install_spdk()
1552
1553        # Optional fields
1554        self.enable_data_digest = initiator_config.get('enable_data_digest', False)
1555        self.small_pool_count = initiator_config.get('small_pool_count', 32768)
1556        self.large_pool_count = initiator_config.get('large_pool_count', 16384)
1557        self.sock_impl = initiator_config.get('sock_impl', 'posix')
1558
1559        if "num_cores" in initiator_config:
1560            self.num_cores = initiator_config["num_cores"]
1561
1562        self.ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
1563        self.spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
1564
1565    def adq_set_busy_read(self, busy_read_val):
1566        return {"net.core.busy_read": busy_read_val}
1567
1568    def install_spdk(self):
1569        self.log.info("Using fio binary %s" % self.fio_bin)
1570        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
1571        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
1572        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma",
1573                       "--with-fio=%s" % os.path.dirname(self.fio_bin),
1574                       "--enable-lto", "--disable-unit-tests"])
1575        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
1576        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])
1577
1578        self.log.info("SPDK built")
1579        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])
1580
1581    def init_connect(self):
1582        # Not a real "connect" like "nvme connect", because SPDK's fio bdev
1583        # plugin initiates the connection just before starting IO traffic.
1584        # This exists only as an "init_connect" counterpart to the same method
1585        # in the KernelInitiator class.
1586        # Just prepare the bdev.conf JSON file for later use and consider that
1587        # "making a connection".
1588        bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
1589        self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
1590
1591    def init_disconnect(self):
1592        # SPDK Initiator does not need to explicitly disconnect, as this gets done
1593        # after the fio bdev plugin finishes IO.
1594        return
1595
1596    def gen_spdk_bdev_conf(self, remote_subsystem_list):
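        # Builds the bdev.conf JSON consumed by the fio bdev plugin. For a single
        # remote subsystem the result looks roughly like (example values):
        # {"subsystems": [
        #    {"subsystem": "bdev", "config": [
        #       {"method": "bdev_nvme_attach_controller",
        #        "params": {"name": "Nvme0", "trtype": "tcp", "traddr": "10.0.0.1",
        #                   "trsvcid": "4420", "subnqn": "nqn.2018-09.io.spdk:cnode0",
        #                   "adrfam": "IPv4"}}]},
        #    {"subsystem": "iobuf", "config": [...]},
        #    {"subsystem": "sock", "config": [...]}]}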
1597        spdk_cfg_section = {
1598            "subsystems": [
1599                {
1600                    "subsystem": "bdev",
1601                    "config": []
1602                },
1603                {
1604                    "subsystem": "iobuf",
1605                    "config": [
1606                        {
1607                            "method": "iobuf_set_options",
1608                            "params": {
1609                                "small_pool_count": self.small_pool_count,
1610                                "large_pool_count": self.large_pool_count
1611                            }
1612                        }
1613                    ]
1614                },
1615                {
1616                    "subsystem": "sock",
1617                    "config": [
1618                        {
1619                            "method": "sock_set_default_impl",
1620                            "params": {
1621                                "impl_name": self.sock_impl
1622                            }
1623                        }
1624                    ]
1625                }
1626            ]
1627        }
1628
1629        for i, subsys in enumerate(remote_subsystem_list):
1630            sub_port, sub_nqn, sub_addr = map(str, subsys)
1631            nvme_ctrl = {
1632                "method": "bdev_nvme_attach_controller",
1633                "params": {
1634                    "name": "Nvme{}".format(i),
1635                    "trtype": self.transport,
1636                    "traddr": sub_addr,
1637                    "trsvcid": sub_port,
1638                    "subnqn": sub_nqn,
1639                    "adrfam": "IPv4"
1640                }
1641            }
1642
1643            if self.enable_adq:
1644                nvme_ctrl["params"].update({"priority": "1"})
1645
1646            if self.enable_data_digest:
1647                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})
1648
1649            spdk_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)
1650
1651        return json.dumps(spdk_cfg_section, indent=2)
1652
1653    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0, numa_align=True):
1654        self.available_cpus = self.get_numa_cpu_map()
1655        filename_section = ""
1656        if len(threads) >= len(subsystems):
1657            threads = range(0, len(subsystems))
1658
1659        # Generate expected NVMe Bdev names and sort them by used NIC numa node
1660        # to allow better grouping when splitting into fio sections.
1661        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
1662        filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames]
1663        filenames = [x for _, x in sorted(zip(filename_numas, filenames))]
1664
1665        nvme_per_split = int(len(subsystems) / len(threads))
1666        remainder = len(subsystems) % len(threads)
1667        iterator = iter(filenames)
1668        result = []
1669        for i in range(len(threads)):
1670            result.append([])
1671            for _ in range(nvme_per_split):
1672                result[i].append(next(iterator))
1673            if remainder:
1674                result[i].append(next(iterator))
1675                remainder -= 1
1676        for i, r in enumerate(result):
1677            header = "[filename%s]" % i
1678            disks = "\n".join(["filename=%s" % x for x in r])
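            # Same iodepth scaling as in KernelInitiator.gen_fio_filename_conf above.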
1679            job_section_qd = round((io_depth * len(r)) / num_jobs)
1680            if job_section_qd == 0:
1681                job_section_qd = 1
1682            iodepth = "iodepth=%s" % job_section_qd
1683
1684            offset_section = ""
1685            if offset:
1686                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)
1687
1688            numa_opts = ""
1689            if numa_align:
1690                numa_opts = self.gen_fio_numa_section(r, num_jobs)
1691
1692            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])
1693
1694        return filename_section
1695
1696    def get_nvme_subsystem_numa(self, bdev_name):
1697        bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir]))
1698        bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"]
1699
1700        # Strip the last two characters ("n1") to get the controller name (e.g. "Nvme0n1" -> "Nvme0")
1701        nvme_ctrl = bdev_name[:-2]
1702        for bdev in bdev_conf_json_obj:
1703            if bdev["method"] == "bdev_nvme_attach_controller" and bdev["params"]["name"] == nvme_ctrl:
1704                return self.get_route_nic_numa(bdev["params"]["traddr"])
1705        return None
1706
1707
1708def initiators_match_subsystems(initiators, target_obj):
1709    for i in initiators:
1710        i.match_subsystems(target_obj.subsystem_info_list)
1711        if i.enable_adq:
1712            i.adq_configure_tc()
1713
1714
1715def run_single_fio_test(args,
1716                        initiators,
1717                        configs,
1718                        target_obj,
1719                        block_size,
1720                        io_depth,
1721                        rw,
1722                        fio_rw_mix_read,
1723                        fio_run_num,
1724                        fio_ramp_time,
1725                        fio_run_time):
1726
1727    for run_no in range(1, fio_run_num+1):
1728        threads = []
1729        power_daemon = None
1730        measurements_prefix = "%s_%s_%s_m_%s_run_%s" % (block_size, io_depth, rw, fio_rw_mix_read, run_no)
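        # e.g. "4k_128_randrw_m_70_run_1" for bs=4k, qd=128, rw=randrw, 70% reads, run 1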
1731
1732        for i, cfg in zip(initiators, configs):
1733            t = threading.Thread(target=i.run_fio, args=(cfg, run_no))
1734            threads.append(t)
1735        if target_obj.enable_sar:
1736            sar_file_prefix = measurements_prefix + "_sar"
1737            t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix, fio_ramp_time, fio_run_time))
1738            threads.append(t)
1739
1740        if target_obj.enable_pcm:
1741            pcm_cpu_fname = "%s_pcm_cpu.csv" % measurements_prefix
1742            pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm,
1743                                         args=(args.results, pcm_cpu_fname, fio_ramp_time, fio_run_time))
1744            threads.append(pcm_cpu_t)
1745
1746        if target_obj.enable_bw:
1747            bandwidth_file_name = measurements_prefix + "_bandwidth.csv"
1748            t = threading.Thread(target=target_obj.measure_network_bandwidth,
1749                                 args=(args.results, bandwidth_file_name, fio_ramp_time, fio_run_time))
1750            threads.append(t)
1751
1752        if target_obj.enable_dpdk_memory:
1753            dpdk_mem_file_name = measurements_prefix + "_dpdk_mem.txt"
1754            t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results, dpdk_mem_file_name, fio_ramp_time))
1755            threads.append(t)
1756
1757        if target_obj.enable_pm:
1758            power_daemon = threading.Thread(target=target_obj.measure_power,
1759                                            args=(args.results, measurements_prefix, script_full_dir,
1760                                                  fio_ramp_time, fio_run_time))
1761            threads.append(power_daemon)
1762
1763        for t in threads:
1764            t.start()
1765        for t in threads:
1766            t.join()
1767
1768
1769def run_fio_tests(args, initiators, target_obj,
1770                  fio_workloads, fio_rw_mix_read,
1771                  fio_run_num, fio_ramp_time, fio_run_time,
1772                  fio_rate_iops, fio_offset, fio_offset_inc,
1773                  fio_numa_align):
1774
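    # Note: fio_num_jobs here (and script_full_dir in run_single_fio_test) are
    # module-level globals set in __main__, not function parameters.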
1775    for block_size, io_depth, rw in fio_workloads:
1776        configs = []
1777        for i in initiators:
1778            i.init_connect()
1779            cfg = i.gen_fio_config(rw,
1780                                   fio_rw_mix_read,
1781                                   block_size,
1782                                   io_depth,
1783                                   target_obj.subsys_no,
1784                                   fio_num_jobs,
1785                                   fio_ramp_time,
1786                                   fio_run_time,
1787                                   fio_rate_iops,
1788                                   fio_offset,
1789                                   fio_offset_inc,
1790                                   fio_numa_align)
1791            configs.append(cfg)
1792
1793        run_single_fio_test(args,
1794                            initiators,
1795                            configs,
1796                            target_obj,
1797                            block_size,
1798                            io_depth,
1799                            rw,
1800                            fio_rw_mix_read,
1801                            fio_run_num,
1802                            fio_ramp_time,
1803                            fio_run_time)
1804
1805        for i in initiators:
1806            i.init_disconnect()
1807            i.copy_result_files(args.results)
1808
1809    try:
1810        parse_results(args.results, args.csv_filename)
1811    except Exception:
1812        logging.error("There was an error while parsing the results")
1813        raise
1814
1815
1816class CpuThrottlingError(Exception):
1817    def __init__(self, message):
1818        super().__init__(message)
1819
1820
1821if __name__ == "__main__":
1822    exit_code = 0
1823
1824    script_full_dir = os.path.dirname(os.path.realpath(__file__))
1825    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))
1826
1827    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
1828    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
1829                        help='Configuration file.')
1830    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
1831                        help='Results directory.')
1832    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
1833                        help='CSV results filename.')
1834    parser.add_argument('-f', '--force', default=False, action='store_true',
1835                        dest='force', help="""Force script to continue and try to use all
1836                        available NVMe devices during test.
1837                        WARNING: Might result in data loss on used NVMe drives""")
1838
1839    args = parser.parse_args()
1840
1841    logging.basicConfig(level=logging.INFO,
1842                        format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s')
1843
1844    logging.info("Using config file: %s" % args.config)
1845    with open(args.config, "r") as config:
1846        data = json.load(config)
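    # An illustrative (not exhaustive) shape for config.json, based on the keys
    # consumed below; all values are example values:
    # {
    #   "general": {"transport": "tcp", ...},
    #   "target": {"mode": "spdk", "nic_ips": ["10.0.0.1"], "core_mask": "0xF", ...},
    #   "initiator1": {"mode": "kernel", "nic_ips": ["10.0.0.2"], ...},
    #   "fio": {"bs": ["4k"], "qd": [128], "rw": ["randrw"], "rwmixread": 70,
    #           "run_time": 300, "ramp_time": 60, "run_num": 3, "num_jobs": 2}
    # }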
1847
1848    initiators = []
1849    general_config = data["general"]
1850    target_config = data["target"]
1851    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]
1852
1853    if "null_block_devices" not in data["target"] and \
1854        (args.force is False and
1855            "allowlist" not in data["target"] and
1856            "blocklist" not in data["target"]):
1857        # TODO: Also check if allowlist or blocklist are not empty.
1858        logging.warning("""WARNING: This script requires an allowlist or a blocklist to be defined.
1859        You can choose to use all available NVMe drives on your system instead, which may
1860        potentially lead to data loss. If you wish to proceed with all attached NVMes, use the "-f" option.""")
1861        sys.exit(1)
1862
1863    for k, v in data.items():
1864        if "target" in k:
1865            v.update({"results_dir": args.results})
1866            if data[k]["mode"] == "spdk":
1867                target_obj = SPDKTarget(k, data["general"], v)
1868            elif data[k]["mode"] == "kernel":
1869                target_obj = KernelTarget(k, data["general"], v)
1870        elif "initiator" in k:
1871            if data[k]["mode"] == "spdk":
1872                init_obj = SPDKInitiator(k, data["general"], v)
1873            elif data[k]["mode"] == "kernel":
1874                init_obj = KernelInitiator(k, data["general"], v)
1875            initiators.append(init_obj)
1876        elif "fio" in k:
1877            fio_workloads = itertools.product(data[k]["bs"],
1878                                              data[k]["qd"],
1879                                              data[k]["rw"])
1880
1881            fio_run_time = data[k]["run_time"]
1882            fio_ramp_time = data[k]["ramp_time"]
1883            fio_rw_mix_read = data[k]["rwmixread"]
1884            fio_run_num = data[k].get("run_num")
1885            fio_num_jobs = data[k].get("num_jobs")
1886
1887            fio_rate_iops = data[k].get("rate_iops", 0)
1888            fio_offset = data[k].get("offset", False)
1889            fio_offset_inc = data[k].get("offset_inc", 0)
1890            fio_numa_align = data[k].get("numa_align", True)
1900        else:
1901            continue
1902
1903    os.makedirs(args.results, exist_ok=True)
1907
1908    for i in initiators:
1909        target_obj.initiator_info.append(
1910            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
1911        )
1912
1913    try:
1914        target_obj.tgt_start()
1915        initiators_match_subsystems(initiators, target_obj)
1916
1917        # Poor man's threading
1918        # Run FIO tests
1919        run_fio_tests(args, initiators, target_obj, fio_workloads, fio_rw_mix_read,
1920                      fio_run_num, fio_ramp_time, fio_run_time, fio_rate_iops,
1921                      fio_offset, fio_offset_inc, fio_numa_align)
1922
1923    except Exception as e:
1924        logging.error("Exception occurred while running FIO tests")
1925        logging.error(e)
1926        exit_code = 1
1927
1928    finally:
1929        for i in [*initiators, target_obj]:
1930            try:
1931                i.stop()
1932            except CpuThrottlingError as err:
1933                logging.error(err)
1934                exit_code = 1
1935            except Exception:
1936                pass
1937        sys.exit(exit_code)
1938