#!/usr/bin/env python3
#  SPDX-License-Identifier: BSD-3-Clause
#  Copyright (C) 2018 Intel Corporation
#  All rights reserved.
#

import os
import re
import sys
import argparse
import json
import logging
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid
from subprocess import CalledProcessError, check_output

import paramiko
import pandas as pd
from common import *

sys.path.append(os.path.dirname(__file__) + '/../../../python')

import spdk.rpc as rpc  # noqa
import spdk.rpc.client as rpc_client  # noqa


class Server:
    def __init__(self, name, general_config, server_config):
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]

        self.log = logging.getLogger(self.name)

        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        self.local_nic_info = []
        self._nics_json_obj = {}
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        self.enable_adq = False
        self.adq_priority = None
        if "adq_enable" in server_config and server_config["adq_enable"]:
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1

        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]

        if not re.match("^[A-Za-z0-9]*$", name):
            self.log.error("Please use a name which contains only letters or numbers")
            sys.exit(1)

    @staticmethod
    def get_uncommented_lines(lines):
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                if ip == addr["local"]:
                    return nic["ifname"]

    def set_local_nic_info_helper(self):
        pass

    def set_local_nic_info(self, pci_info):
        def extract_network_elements(json_obj):
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def get_nic_numa_node(self, nic_name):
        return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name]))

    def get_numa_cpu_map(self):
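        # Illustrative example of the returned structure (machine dependent):
        # a 2-socket system may yield {0: [0, 1, ..., 23], 1: [24, ..., 47]},
        # i.e. a map of NUMA node -> list of CPU ids.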
        numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"]))
        numa_cpu_json_map = {}

        for cpu in numa_cpu_json_obj["cpus"]:
            cpu_num = int(cpu["cpu"])
            numa_node = int(cpu["node"])
            numa_cpu_json_map.setdefault(numa_node, [])
            numa_cpu_json_map[numa_node].append(cpu_num)

        return numa_cpu_json_map

    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        return ""

    def configure_system(self):
        self.load_drivers()
        self.configure_services()
        self.configure_sysctl()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity()

    def load_drivers(self):
        self.log.info("Loading drivers")
        self.exec_cmd(["sudo", "modprobe", "-a",
                       "nvme-%s" % self.transport,
                       "nvmet-%s" % self.transport])
        if self.mode == "kernel" and hasattr(self, "null_block") and self.null_block:
            self.exec_cmd(["sudo", "modprobe", "null_blk",
                           "nr_devices=%s" % self.null_block])

    def configure_adq(self):
        if self.mode == "kernel":
            self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        self.log.info("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log.info("%s loaded!" % module)
            except CalledProcessError as e:
                self.log.error("ERROR: failed to load module %s" % module)
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        self.log.info("Configuring ADQ Traffic classes and filters...")

        if self.mode == "kernel":
            self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        port = "4420"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log.info(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log.info(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                             "protocol", "ip", "ingress", "prio", "1", "flower",
                             "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                             "skip_sw", "hw_tc", "1"]
            self.log.info(" ".join(tc_filter_cmd))
            self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log.info("Show tc configuration for %s NIC..." % nic_name)
            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log.info("%s" % tc_disk_out)
            self.log.info("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log.info(xps_cmd)
            self.exec_cmd(xps_cmd)

    def reload_driver(self, driver):
        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver])
        except CalledProcessError as e:
            self.log.error("ERROR: failed to reload %s module!" % driver)
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_nic(self):
        self.log.info("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                # As a temporary workaround for ADQ, channel packet inspection optimization is turned on during connection establishment.
                # It is then turned off before fio ramp_up expires in ethtool_after_fio_ramp().
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-pkt-inspect-optimize", "on"])
            except CalledProcessError as e:
                self.log.error("ERROR: failed to configure NIC port using ethtool!")
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
                self.log.info("Please update your NIC driver and firmware versions and try again.")
            self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        self.log.info("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log.info("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log.info("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        self.log.info("Tuning sysctl settings...")

        busy_read = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1

        sysctl_opts = {
            "net.core.busy_poll": 0,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
        }

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        if not self.tuned_profile:
            self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        self.log.info("Setting CPU governor to performance...")

        # This assumes that the same CPU scaling governor is set on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def configure_irq_affinity(self):
        self.log.info("Setting NIC irq affinity for NICs...")

        irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh")
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            irq_cmd = ["sudo", irq_script_path, nic]
            self.log.info(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        self.log.info("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        self.log.info("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        self.log.info("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log.info("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        self.log.info("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log.info("Reverted CPU governor to %s." % self.governor_restore)


class Target(Server):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Defaults
        self.enable_sar = False
        self.sar_delay = 0
        self.sar_interval = 0
        self.sar_count = 0
        self.enable_pcm = False
        self.pcm_dir = ""
        self.pcm_delay = 0
        self.pcm_interval = 0
        self.pcm_count = 0
        self.enable_bandwidth = 0
        self.bandwidth_count = 0
        self.enable_dpdk_memory = False
        self.dpdk_wait_time = 0
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []
        self.initiator_info = []
        self.enable_pm = False
        self.pm_delay = 0
        self.pm_interval = 0
        self.pm_count = 1

        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "sar_settings" in target_config:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"]
        if "pcm_settings" in target_config:
            self.enable_pcm = True
            self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"]
        if "enable_bandwidth" in target_config:
            self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"]
        if "enable_dpdk_memory" in target_config:
            self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"]
        if "scheduler_settings" in target_config:
            self.scheduler_name = target_config["scheduler_settings"]
        if "zcopy_settings" in target_config:
            self.enable_zcopy = target_config["zcopy_settings"]
        if "results_dir" in target_config:
            self.results_dir = target_config["results_dir"]
413        if "pm_settings" in target_config:
414            self.enable_pm, self.pm_delay, self.pm_interval, self.pm_count = target_config["pm_settings"]
415            # Normalize pm_count - <= 0 means to loop indefinitely so let's avoid that to not block forever
416            self.pm_count = self.pm_count if self.pm_count > 0 else 1

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log.info("Changing directory to %s" % change_dir)

        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")

        if change_dir:
            os.chdir(old_cwd)
            self.log.info("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        self.log.info("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, _directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log.info("Done zipping")

    @staticmethod
    def _chunks(input_list, chunks_no):
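        # Illustrative example: list(Target._chunks(list(range(10)), 3)) yields
        # [0, 1, 2, 3], [4, 5, 6], [7, 8, 9] - the first `rem` chunks receive
        # one extra element.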
        div, rem = divmod(len(input_list), chunks_no)
        for i in range(chunks_no):
            si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem)
            yield input_list[si:si + (div + 1 if i < rem else div)]

    def spread_bdevs(self, req_disks):
        # Spread available block device indexes:
        # - evenly across available initiator systems
        # - evenly across available NIC interfaces for
        #   each initiator
        # Not NUMA aware.
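        # Illustrative example: with 2 initiators, 2 target NIC IPs each and
        # req_disks=8, initiator 0 is assigned bdevs 0-3 and initiator 1 bdevs
        # 4-7; within initiator 0, bdevs 0-1 map to its first target IP and
        # 2-3 to its second, yielding (ip, bdev_number) pairs.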
        ip_bdev_map = []
        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))

        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
            self.initiator_info[i]["bdev_range"] = init_chunk
            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
                for c in nic_chunk:
                    ip_bdev_map.append((ip, c))
        return ip_bdev_map

    def measure_sar(self, results_dir, sar_file_prefix):
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))

        self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % self.sar_delay)
        time.sleep(self.sar_delay)
        self.log.info("Starting SAR measurements")

        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count])
        with open(sar_output_file, "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log.info("Summary CPU utilization from SAR:")
                        self.log.info(line)
                    elif "all" in line:
                        self.log.info(line)
                    else:
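                        # The 8th column (%idle) of per-CPU "Average:" rows is
                        # summed here to derive total CPU utilization below.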
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum

        with open(sar_cpu_util_file, "w") as f:
            f.write("%0.2f" % sar_cpu_usage)

    def measure_power(self, results_dir, prefix, script_full_dir):
        time.sleep(self.pm_delay)
        self.log.info("Starting power measurements")
        self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir,
                       "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix,
                       "-x", "-c", "%s" % self.pm_count, "-t", "%s" % self.pm_interval, "-r"])

    def ethtool_after_fio_ramp(self, fio_ramp_time):
        time.sleep(fio_ramp_time // 2)
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                           "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        time.sleep(self.pcm_delay)
        cmd = ["%s/build/bin/pcm-memory" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)]
        pcm_memory = subprocess.Popen(cmd)
        time.sleep(self.pcm_count)
        pcm_memory.terminate()

    def measure_pcm(self, results_dir, pcm_file_name):
        time.sleep(self.pcm_delay)
        cmd = ["%s/build/bin/pcm" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count,
               "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_pcm_power(self, results_dir, pcm_power_file_name):
        time.sleep(self.pcm_delay)
        out = self.exec_cmd(["%s/build/bin/pcm-power" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count])
        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
            fh.write(out)
    def measure_network_bandwidth(self, results_dir, bandwidth_file_name):
        self.log.info("INFO: starting network bandwidth measurements")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)])

    def measure_dpdk_memory(self, results_dir):
        self.log.info("INFO: waiting to generate DPDK memory usage")
        time.sleep(self.dpdk_wait_time)
        self.log.info("INFO: generating DPDK memory usage")
        rpc.env.env_dpdk_get_mem_stats(self.client)
        os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir))

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(os.uname().release)
        self.log.info("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log.info("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log.info("====zcopy settings:====")
        self.log.info("zcopy enabled: %s" % (self.enable_zcopy))
        self.log.info("====Scheduler settings:====")
        self.log.info("SPDK scheduler: %s" % (self.scheduler_name))


class Initiator(Server):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Required fields
        self.ip = initiator_config["ip"]
        self.target_nic_ips = initiator_config["target_nic_ips"]

        # Defaults
        self.cpus_allowed = None
        self.cpus_allowed_policy = "shared"
        self.spdk_dir = "/tmp/spdk"
        self.fio_bin = "/usr/src/fio/fio"
        self.nvmecli_bin = "nvme"
        self.cpu_frequency = None
        self.subsystem_info_list = []

        if "spdk_dir" in initiator_config:
            self.spdk_dir = initiator_config["spdk_dir"]
        if "fio_bin" in initiator_config:
            self.fio_bin = initiator_config["fio_bin"]
        if "nvmecli_bin" in initiator_config:
            self.nvmecli_bin = initiator_config["nvmecli_bin"]
        if "cpus_allowed" in initiator_config:
            self.cpus_allowed = initiator_config["cpus_allowed"]
        if "cpus_allowed_policy" in initiator_config:
            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
        if "cpu_frequency" in initiator_config:
            self.cpu_frequency = initiator_config["cpu_frequency"]
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.copy_spdk("/tmp/spdk.zip")
        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it to prevent
        # expansion when sending it to the remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
643        cmd = " ".join(cmd)
644
645        # Redirect stderr to stdout thanks using get_pty option if needed
646        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
647        out = stdout.read().decode(encoding="utf-8")
648
649        # Check the return code
650        rc = stdout.channel.recv_exit_status()
651        if rc:
652            raise CalledProcessError(int(rc), cmd, out)
653
654        return out
655
656    def put_file(self, local, remote_dest):
657        ftp = self.ssh_connection.open_sftp()
658        ftp.put(local, remote_dest)
659        ftp.close()
660
661    def get_file(self, remote, local_dest):
662        ftp = self.ssh_connection.open_sftp()
663        ftp.get(remote, local_dest)
664        ftp.close()
665
666    def copy_spdk(self, local_spdk_zip):
667        self.log.info("Copying SPDK sources to initiator %s" % self.name)
668        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
669        self.log.info("Copied sources zip from target")
670        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
671        self.log.info("Sources unpacked")
672
673    def copy_result_files(self, dest_dir):
674        self.log.info("Copying results")
675
676        if not os.path.exists(dest_dir):
677            os.mkdir(dest_dir)
678
679        # Get list of result files from initiator and copy them back to target
680        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")
681
682        for file in file_list:
683            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
684                          os.path.join(dest_dir, file))
685        self.log.info("Done copying results")
686
687    def match_subsystems(self, target_subsytems):
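        # Each entry in target_subsystems is a (port, nqn, ip) tuple, as built
        # by the target classes; keep only those reachable via this
        # initiator's configured target NIC IPs.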
        subsystems = [subsystem for subsystem in target_subsystems if subsystem[2] in self.target_nic_ips]
        subsystems.sort(key=lambda x: x[1])
        self.log.info("Found matching subsystems on target side:")
        for s in subsystems:
            self.log.info(s)
        self.subsystem_info_list = subsystems

    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def get_route_nic_numa(self, remote_nvme_ip):
        local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip]))
        local_nvme_nic = local_nvme_nic[0]["dev"]
        return self.get_nic_numa_node(local_nvme_nic)

    @staticmethod
    def gen_fio_offset_section(offset_inc, num_jobs):
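        # Example: offset_inc=0 with num_jobs=4 produces:
        #   size=25%
        #   offset=0%
        #   offset_increment=25%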
        offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc
        return "\n".join(["size=%s%%" % offset_inc,
                          "offset=0%",
                          "offset_increment=%s%%" % offset_inc])

    def gen_fio_numa_section(self, fio_filenames_list):
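        # Example: if this chunk's devices resolve to NUMA nodes {0: 3, 1: 1},
        # node 0 wins and the section gets "numa_cpu_nodes=0" and
        # "numa_mem_policy=prefer:0".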
        numa_stats = {}
        for nvme in fio_filenames_list:
            nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme))
            numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1

        # Use the most common NUMA node for this chunk to allocate memory and CPUs
        section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0]

        return "\n".join(["numa_cpu_nodes=%s" % section_local_numa,
                          "numa_mem_policy=prefer:%s" % section_local_numa])

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no,
                       num_jobs=None, ramp_time=0, run_time=10, rate_iops=0,
                       offset=False, offset_inc=0):
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""
744        if "spdk" in self.mode:
745            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
746            spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
747        else:
748            ioengine = self.ioengine
749            spdk_conf = ""
750            out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'",
751                                 "|", "awk", "'{print $1}'"])
752            subsystems = [x for x in out.split("\n") if "nvme" in x]
753
754        if self.cpus_allowed is not None:
755            self.log.info("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
756            cpus_num = 0
757            cpus = self.cpus_allowed.split(",")
758            for cpu in cpus:
759                if "-" in cpu:
760                    a, b = cpu.split("-")
761                    a = int(a)
762                    b = int(b)
763                    cpus_num += len(range(a, b))
764                else:
765                    cpus_num += 1
766            self.num_cores = cpus_num
767            threads = range(0, self.num_cores)
768        elif hasattr(self, 'num_cores'):
769            self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores)
770            threads = range(0, int(self.num_cores))
771        else:
772            self.num_cores = len(subsystems)
773            threads = range(0, len(subsystems))

        if "spdk" in self.mode:
            filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs,
                                                          offset, offset_inc)
        else:
            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs,
                                                          offset, offset_inc)

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, init_connect() function
        if hasattr(self, "ioengine") and "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log.info("Created FIO Config:")
        self.log.info(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit()
        else:
            self.log.warning("WARNING: intel_pstate is disabled and the default CPU governor is in use.")

    def run_fio(self, fio_config_file, run_num=None):
        job_name, _ = os.path.splitext(fio_config_file)
        self.log.info("Starting FIO run for job: %s" % job_name)
        self.log.info("Using FIO: %s" % self.fio_bin)

        if run_num:
            for i in range(1, run_num + 1):
                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
                try:
                    output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
                                            "--output=%s" % output_filename, "--eta=never"], True)
                    self.log.info(output)
                except subprocess.CalledProcessError as e:
                    self.log.error("ERROR: Fio process failed!")
                    self.log.info(e.stdout)
        else:
            output_filename = job_name + "_" + self.name + ".json"
            output = self.exec_cmd(["sudo", self.fio_bin,
                                    fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename], True)
            self.log.info(output)
        self.log.info("FIO run finished. Results in: %s" % output_filename)

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(self.exec_cmd(["uname", "-r"]))
        self.log.info("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log.info("====sysctl conf:====")
        sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"])
        self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))


class KernelTarget(Target):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = "nvmetcli"

        if "nvmet_bin" in target_config:
            self.nvmet_bin = target_config["nvmet_bin"]

    def stop(self):
        self.nvmet_command(self.nvmet_bin, "clear")

    def get_nvme_devices(self):
        output = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"])
        output = [x for x in output.split("\n") if "nvme" in x]
        return output

    def nvmet_command(self, nvmet_bin, command):
        return self.exec_cmd([nvmet_bin, *(command.split(" "))])

    def kernel_tgt_gen_subsystem_conf(self, nvme_list):
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        for ip, bdev_num in self.spread_bdevs(len(nvme_list)):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = nvme_list[bdev_num]

            nvmet_cfg["subsystems"].append({
                "allowed_hosts": [],
                "attr": {
                    "allow_any_host": "1",
                    "serial": serial,
                    "version": "1.3"
                },
                "namespaces": [
                    {
                        "device": {
                            "path": bdev_name,
                            "uuid": "%s" % uuid.uuid4()
                        },
                        "enable": 1,
                        "nsid": port
                    }
                ],
                "nqn": nqn
            })

            nvmet_cfg["ports"].append({
                "addr": {
                    "adrfam": "ipv4",
                    "traddr": ip,
                    "trsvcid": port,
                    "trtype": self.transport
                },
                "portid": bdev_num,
                "referrals": [],
                "subsystems": [nqn]
            })

            self.subsystem_info_list.append((port, nqn, ip))
        self.subsys_no = len(self.subsystem_info_list)

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        self.log.info("Configuring kernel NVMeOF Target")

        if self.null_block:
            self.log.info("Configuring with null block device.")
            nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
        else:
            self.log.info("Configuring with NVMe drives.")
            nvme_list = self.get_nvme_devices()

        self.kernel_tgt_gen_subsystem_conf(nvme_list)
        self.subsys_no = len(nvme_list)

        self.nvmet_command(self.nvmet_bin, "clear")
        self.nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log.info("Done configuring kernel NVMeOF Target")


class SPDKTarget(Target):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Required fields
        self.core_mask = target_config["core_mask"]
        self.num_cores = self.get_num_cores(self.core_mask)

        # Defaults
        self.dif_insert_strip = False
        self.null_block_dif_type = 0
        self.num_shared_buffers = 4096
        self.max_queue_depth = 128
        self.bpf_proc = None
        self.bpf_scripts = []
        self.enable_dsa = False
        self.scheduler_core_limit = None

        if "num_shared_buffers" in target_config:
            self.num_shared_buffers = target_config["num_shared_buffers"]
        if "max_queue_depth" in target_config:
            self.max_queue_depth = target_config["max_queue_depth"]
        if "null_block_dif_type" in target_config:
            self.null_block_dif_type = target_config["null_block_dif_type"]
        if "dif_insert_strip" in target_config:
            self.dif_insert_strip = target_config["dif_insert_strip"]
        if "bpf_scripts" in target_config:
            self.bpf_scripts = target_config["bpf_scripts"]
        if "dsa_settings" in target_config:
            self.enable_dsa = target_config["dsa_settings"]
        if "scheduler_core_limit" in target_config:
            self.scheduler_core_limit = target_config["scheduler_core_limit"]

        self.log.info("====DSA settings:====")
        self.log.info("DSA enabled: %s" % (self.enable_dsa))

    def get_nvme_devices_count(self):
        return len(self.get_nvme_devices())

    def get_nvme_devices(self):
        bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")]))
        bdev_bdfs = [bdev["params"]["traddr"] for bdev in bdev_subsys_json_obj["config"]]
        return bdev_bdfs

    @staticmethod
    def get_num_cores(core_mask):
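        # Examples: "0xF" -> 4 cores; "[0-3,8]" -> 5 cores (list-notation
        # ranges are inclusive).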
1002        if "0x" in core_mask:
1003            return bin(int(core_mask, 16)).count("1")
1004        else:
1005            num_cores = 0
1006            core_mask = core_mask.replace("[", "")
1007            core_mask = core_mask.replace("]", "")
1008            for i in core_mask.split(","):
1009                if "-" in i:
1010                    x, y = i.split("-")
1011                    num_cores += len(range(int(x), int(y))) + 1
1012                else:
1013                    num_cores += 1
1014            return num_cores

    def spdk_tgt_configure(self):
        self.log.info("Configuring SPDK NVMeOF target via RPC")

        if self.enable_adq:
            self.adq_configure_tc()

        # Create transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
                                       num_shared_buffers=self.num_shared_buffers,
                                       max_queue_depth=self.max_queue_depth,
                                       dif_insert_or_strip=self.dif_insert_strip,
                                       sock_priority=self.adq_priority)
        self.log.info("SPDK NVMeOF transport layer:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

        self.log.info("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self, null_block_count):
        md_size = 0
        block_size = 4096
        if self.null_block_dif_type != 0:
            md_size = 128

        self.log.info("Adding null block bdevices to config via RPC")
        for i in range(null_block_count):
            self.log.info("Setting bdev protection to: %s" % self.null_block_dif_type)
            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
                                      dif_type=self.null_block_dif_type, md_size=md_size)
        self.log.info("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        self.log.info("Adding NVMe bdevs to config via RPC")

        bdfs = self.get_nvme_devices()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log.error("ERROR: Requested number of disks is larger than the %s available" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log.info("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        self.log.info("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = self.get_nvme_devices_count()

        for ip, bdev_num in self.spread_bdevs(req_num_disks):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = "Nvme%sn1" % bdev_num

            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                           allow_any_host=True, max_namespaces=8)
            rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
            for nqn_name in [nqn, "discovery"]:
                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
                                                     nqn=nqn_name,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid=port,
                                                     adrfam="ipv4")
            self.subsystem_info_list.append((port, nqn, ip))
        self.subsys_no = len(self.subsystem_info_list)

        self.log.info("SPDK NVMeOF subsystem configuration:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def bpf_start(self):
        self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts)
        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
        results_path = os.path.join(self.results_dir, "bpf_traces.txt")

        with open(self.pid, "r") as fh:
            nvmf_pid = str(fh.readline())

        cmd = [bpf_script, nvmf_pid, *bpf_traces]
        self.log.info(cmd)
        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})

    def tgt_start(self):
        if self.null_block:
            self.subsys_no = 1
        else:
            self.subsys_no = self.get_nvme_devices_count()
        self.log.info("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        self.log.info("SPDK NVMeOF Target PID=%s" % proc.pid)
        self.log.info("Waiting for spdk to initialize...")
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")

        rpc.sock.sock_set_default_impl(self.client, impl_name="posix")

        if self.enable_zcopy:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
                                           enable_zerocopy_send_server=True)
            self.log.info("Target socket options:")
            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))

        if self.enable_adq:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
                                           nvme_adminq_poll_period_us=100000, retry_count=4)

        if self.enable_dsa:
            rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None)
            self.log.info("Target DSA accel module enabled")

        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit)
        rpc.framework_start_init(self.client)

        if self.bpf_scripts:
            self.bpf_start()

        self.spdk_tgt_configure()

    def stop(self):
        if self.bpf_proc:
            self.log.info("Stopping BPF Trace script")
            self.bpf_proc.terminate()
            self.bpf_proc.wait()

        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait(timeout=30)
            except Exception as e:
                self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.")
                self.log.info(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
                # Try to clean up RPC socket files if they were not removed
                # because of using 'kill'
                try:
                    os.remove("/var/tmp/spdk.sock")
                    os.remove("/var/tmp/spdk.sock.lock")
                except FileNotFoundError:
                    pass


class KernelInitiator(Initiator):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Defaults
        self.extra_params = ""
        self.ioengine = "libaio"

        if "extra_params" in initiator_config:
            self.extra_params = initiator_config["extra_params"]

        if "kernel_engine" in initiator_config:
            self.ioengine = initiator_config["kernel_engine"]
            if "io_uring" in self.ioengine:
                self.extra_params = "--nr-poll-queues=8"
1198
1199    def get_connected_nvme_list(self):
1200        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
1201        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
1202                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
1203        return nvme_list
1204
1205    def init_connect(self):
1206        self.log.info("Below connection attempts may result in error messages, this is expected!")
1207        for subsystem in self.subsystem_info_list:
1208            self.log.info("Trying to connect %s %s %s" % subsystem)
1209            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
1210                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
1211            time.sleep(2)
1212
1213        if "io_uring" in self.ioengine:
1214            self.log.info("Setting block layer settings for io_uring.")
1215
1216            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
1217            #       apparently it's not possible for connected subsystems.
1218            #       Results in "error: Invalid argument"
1219            block_sysfs_settings = {
1220                "iostats": "0",
1221                "rq_affinity": "0",
1222                "nomerges": "2"
1223            }
1224
1225            for disk in self.get_connected_nvme_list():
1226                sysfs = os.path.join("/sys/block", disk, "queue")
1227                for k, v in block_sysfs_settings.items():
1228                    sysfs_opt_path = os.path.join(sysfs, k)
1229                    try:
1230                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
1231                    except subprocess.CalledProcessError as e:
1232                        self.log.warning("Command %s failed with error %s. %s was not set to %s!" % (e.cmd, e.output, k, v))
1233                    finally:
1234                        current_value = self.exec_cmd(["sudo", "cat", sysfs_opt_path])
1235                        self.log.info("%s=%s" % (sysfs_opt_path, current_value))
1236
1237    def init_disconnect(self):
1238        for subsystem in self.subsystem_info_list:
1239            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
1240            time.sleep(1)
1241
1242    def get_nvme_subsystem_numa(self, dev_name):
1243        # Strip the trailing namespace suffix ("nvme0n1" -> "nvme0") to get the
            # controller name; note this assumes a single-digit namespace index.
1244        nvme_ctrl = os.path.basename(dev_name)[:-2]
1245        remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
1246                                   self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl]))
1247        return self.get_route_nic_numa(remote_nvme_ip.group(0))
1248
1249    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
1250        # Generate connected nvme devices names and sort them by used NIC numa node
1251        # to allow better grouping when splitting into fio sections.
1252        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
1253        nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list]
1254        nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))]
1255
1256        filename_section = ""
1257        nvme_per_split = len(nvme_list) // len(threads)
1258        remainder = len(nvme_list) % len(threads)
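            # Spread devices across fio sections: every section gets nvme_per_split
            # devices and the first `remainder` sections get one extra,
            # e.g. 5 devices over 2 sections -> 3 + 2.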
1259        iterator = iter(nvme_list)
1260        result = []
1261        for i in range(len(threads)):
1262            result.append([])
1263            for _ in range(nvme_per_split):
1264                result[i].append(next(iterator))
1265            if remainder:
1266                result[i].append(next(iterator))
1267                remainder -= 1
1268        for i, r in enumerate(result):
1269            header = "[filename%s]" % i
1270            disks = "\n".join(["filename=%s" % x for x in r])
1271            job_section_qd = max(1, round((io_depth * len(r)) / num_jobs))
1274            iodepth = "iodepth=%s" % job_section_qd
1275
1276            offset_section = ""
1277            if offset:
1278                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)
1279
1280            numa_opts = self.gen_fio_numa_section(r)
1281
1282            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])
1283
1284        return filename_section
1285
1286
1287class SPDKInitiator(Initiator):
1288    def __init__(self, name, general_config, initiator_config):
1289        super().__init__(name, general_config, initiator_config)
1290
1291        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
1292            self.install_spdk()
1293
1294        # Required fields
1295        self.num_cores = initiator_config["num_cores"]
1296
1297        # Optional fields
1298        self.enable_data_digest = False
1299        if "enable_data_digest" in initiator_config:
1300            self.enable_data_digest = initiator_config["enable_data_digest"]
1301
1302    def install_spdk(self):
1303        self.log.info("Using fio binary %s" % self.fio_bin)
1304        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
1305        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
1306        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)])
1307        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
1308        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])
1309
1310        self.log.info("SPDK built")
1311        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])
1312
1313    def init_connect(self):
1314        # Not a real "connect" like when doing "nvme connect" because SPDK's fio
1315        # bdev plugin initiates connection just before starting IO traffic.
1316        # This is just to have a "init_connect" equivalent of the same function
1317        # from KernelInitiator class.
1318        # Just prepare bdev.conf JSON file for later use and consider it
1319        # "making a connection".
1320        bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
1321        self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
1322
1323    def init_disconnect(self):
1324        # The SPDK initiator does not need to explicitly disconnect, as this
1325        # happens automatically after the fio bdev plugin finishes IO.
1326        pass
1327
1328    def gen_spdk_bdev_conf(self, remote_subsystem_list):
1329        bdev_cfg_section = {
1330            "subsystems": [
1331                {
1332                    "subsystem": "bdev",
1333                    "config": []
1334                }
1335            ]
1336        }
1337
1338        for i, subsys in enumerate(remote_subsystem_list):
1339            sub_port, sub_nqn, sub_addr = map(str, subsys)
1340            nvme_ctrl = {
1341                "method": "bdev_nvme_attach_controller",
1342                "params": {
1343                    "name": "Nvme{}".format(i),
1344                    "trtype": self.transport,
1345                    "traddr": sub_addr,
1346                    "trsvcid": sub_port,
1347                    "subnqn": sub_nqn,
1348                    "adrfam": "IPv4"
1349                }
1350            }
1351
1352            if self.enable_adq:
1353                nvme_ctrl["params"].update({"priority": "1"})
1354
1355            if self.enable_data_digest:
1356                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})
1357
1358            bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)
1359
1360        return json.dumps(bdev_cfg_section, indent=2)
1361
1362    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
1363        filename_section = ""
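            # Cap the number of fio sections at the number of subsystems, so
            # that no generated section is left without a filename entry.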
1364        if len(threads) >= len(subsystems):
1365            threads = range(0, len(subsystems))
1366
1367        # Generate expected NVMe Bdev names and sort them by used NIC numa node
1368        # to allow better grouping when splitting into fio sections.
1369        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
1370        filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames]
1371        filenames = [x for _, x in sorted(zip(filename_numas, filenames))]
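            # e.g. 3 subsystems -> ["Nvme0n1", "Nvme1n1", "Nvme2n1"], reordered
            # by the NUMA node of the NIC used to reach each remote address.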
1372
1373        nvme_per_split = len(subsystems) // len(threads)
1374        remainder = len(subsystems) % len(threads)
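            # Same split strategy as in KernelInitiator.gen_fio_filename_conf():
            # the first `remainder` sections receive one extra bdev each.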
1375        iterator = iter(filenames)
1376        result = []
1377        for i in range(len(threads)):
1378            result.append([])
1379            for _ in range(nvme_per_split):
1380                result[i].append(next(iterator))
1381            if remainder:
1382                result[i].append(next(iterator))
1383                remainder -= 1
1384        for i, r in enumerate(result):
1385            header = "[filename%s]" % i
1386            disks = "\n".join(["filename=%s" % x for x in r])
1387            job_section_qd = max(1, round((io_depth * len(r)) / num_jobs))
1390            iodepth = "iodepth=%s" % job_section_qd
1391
1392            offset_section = ""
1393            if offset:
1394                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)
1395
1396            numa_opts = self.gen_fio_numa_section(r)
1397
1398            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])
1399
1400        return filename_section
1401
1402    def get_nvme_subsystem_numa(self, bdev_name):
1403        bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir]))
1404        bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"]
1405
1406        # Strip the trailing namespace suffix ("Nvme0n1" -> "Nvme0") to get the
            # controller/bdev name; note this assumes a single-digit namespace index.
1407        nvme_ctrl = bdev_name[:-2]
1408        remote_nvme_ip = list(filter(lambda x: x["params"]["name"] == "%s" % nvme_ctrl, bdev_conf_json_obj))[0]["params"]["traddr"]
1409        return self.get_route_nic_numa(remote_nvme_ip)
1410
1411
1412if __name__ == "__main__":
1413    script_full_dir = os.path.dirname(os.path.realpath(__file__))
1414    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))
1415
1416    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
1417    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
1418                        help='Configuration file.')
1419    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
1420                        help='Results directory.')
1421    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
1422                        help='CSV results filename.')
1423
1424    args = parser.parse_args()
1425
1426    logging.basicConfig(level=logging.INFO,
1427                        format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s')
1428
1429    logging.info("Using config file: %s" % args.config)
1430    with open(args.config, "r") as config:
1431        data = json.load(config)
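        # Top-level config sections consumed below: "general", a single "target"
        # section, any number of "initiator*" sections, and a "fio" section
        # describing the workloads to run.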
1432
1433    initiators = []
1434    fio_cases = []
1435
1436    general_config = data["general"]
1437    target_config = data["target"]
1438    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]
1439
1440    for k, v in data.items():
1441        if "target" in k:
1442            v.update({"results_dir": args.results})
1443            if data[k]["mode"] == "spdk":
1444                target_obj = SPDKTarget(k, data["general"], v)
1445            elif data[k]["mode"] == "kernel":
1446                target_obj = KernelTarget(k, data["general"], v)
1447        elif "initiator" in k:
1448            if data[k]["mode"] == "spdk":
1449                init_obj = SPDKInitiator(k, data["general"], v)
1450            elif data[k]["mode"] == "kernel":
1451                init_obj = KernelInitiator(k, data["general"], v)
1452            initiators.append(init_obj)
1453        elif "fio" in k:
1454            fio_workloads = itertools.product(data[k]["bs"],
1455                                              data[k]["qd"],
1456                                              data[k]["rw"])
1457
1458            fio_run_time = data[k]["run_time"]
1459            fio_ramp_time = data[k]["ramp_time"]
1460            fio_rw_mix_read = data[k]["rwmixread"]
1461            fio_run_num = data[k].get("run_num")
1462            fio_num_jobs = data[k].get("num_jobs")
1463
1464            fio_rate_iops = 0
1465            if "rate_iops" in data[k]:
1466                fio_rate_iops = data[k]["rate_iops"]
1467
1468            fio_offset = False
1469            if "offset" in data[k]:
1470                fio_offset = data[k]["offset"]
1471            fio_offset_inc = 0
1472            if "offset_inc" in data[k]:
1473                fio_offset_inc = data[k]["offset_inc"]
1474        else:
1475            continue
1476
1477    try:
1478        os.mkdir(args.results)
1479    except FileExistsError:
1480        pass
1481
1482    for i in initiators:
1483        target_obj.initiator_info.append(
1484            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
1485        )
1486
1487    # TODO: This try block is definitely too large. It needs to be broken up
1488    # into separate logical blocks to reduce its size.
1489    try:
1490        target_obj.tgt_start()
1491
1492        for i in initiators:
1493            i.match_subsystems(target_obj.subsystem_info_list)
1494            if i.enable_adq:
1495                i.adq_configure_tc()
1496
1497        # Poor man's threading
1498        # Run FIO tests
1499        for block_size, io_depth, rw in fio_workloads:
1500            threads = []
1501            configs = []
1502            power_daemon = None
1503            for i in initiators:
1504                i.init_connect()
1505                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
1506                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops,
1507                                       fio_offset, fio_offset_inc)
1508                configs.append(cfg)
1509
1510            for i, cfg in zip(initiators, configs):
1511                t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
1512                threads.append(t)
1513            if target_obj.enable_sar:
1514                sar_file_prefix = "%s_%s_%s_sar" % (block_size, rw, io_depth)
1515                t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix))
1516                threads.append(t)
1517
1518            if target_obj.enable_pcm:
1519                pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]
1520
1521                pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],))
1522                pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],))
1523                pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],))
1524
1525                threads.append(pcm_cpu_t)
1526                threads.append(pcm_mem_t)
1527                threads.append(pcm_pow_t)
1528
1529            if target_obj.enable_bandwidth:
1530                bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)])
1531                bandwidth_file_name = ".".join([bandwidth_file_name, "csv"])
1532                t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,))
1533                threads.append(t)
1534
1535            if target_obj.enable_dpdk_memory:
1536                t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results,))
1537                threads.append(t)
1538
1539            if target_obj.enable_adq:
1540                ethtool_thread = threading.Thread(target=target_obj.ethtool_after_fio_ramp, args=(fio_ramp_time,))
1541                threads.append(ethtool_thread)
1542
1543            if target_obj.enable_pm:
1544                power_daemon = threading.Thread(target=target_obj.measure_power,
1545                                                args=(args.results, "%s_%s_%s" % (block_size, rw, io_depth), script_full_dir))
1546                threads.append(power_daemon)
1547
1548            for t in threads:
1549                t.start()
1550            for t in threads:
1551                t.join()
1552
1553            for i in initiators:
1554                i.init_disconnect()
1555                i.copy_result_files(args.results)
1556
1557        target_obj.restore_governor()
1558        target_obj.restore_tuned()
1559        target_obj.restore_services()
1560        target_obj.restore_sysctl()
1561        if target_obj.enable_adq:
1562            target_obj.reload_driver("ice")
1563        for i in initiators:
1564            i.restore_governor()
1565            i.restore_tuned()
1566            i.restore_services()
1567            i.restore_sysctl()
1568            if i.enable_adq:
1569                i.reload_driver("ice")
1570        parse_results(args.results, args.csv_filename)
1571    finally:
1572        for i in initiators:
1573            try:
1574                i.stop()
1575            except Exception as err:
1576                logging.warning("Failed to stop initiator %s: %s" % (i.name, err))
1577        target_obj.stop()
1578