#!/usr/bin/env python3
#  SPDX-License-Identifier: BSD-3-Clause
#  Copyright (C) 2018 Intel Corporation
#  All rights reserved.
#

import os
import re
import sys
import argparse
import json
import logging
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid

import paramiko
import pandas as pd
from common import *
from subprocess import check_output, CalledProcessError

sys.path.append(os.path.dirname(__file__) + '/../../../python')

import spdk.rpc as rpc  # noqa
import spdk.rpc.client as rpc_client  # noqa


class Server:
    def __init__(self, name, general_config, server_config):
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]

        self.log = logging.getLogger(self.name)

        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        self.local_nic_info = []
        self._nics_json_obj = {}
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        self.enable_adq = False
        self.adq_priority = None
        if "adq_enable" in server_config and server_config["adq_enable"]:
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1

        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]

        if not re.match("^[A-Za-z0-9]*$", name):
            self.log.error("Please use a name which contains only letters or numbers")
            sys.exit(1)

    @staticmethod
    def get_uncommented_lines(lines):
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                # Compare full addresses to avoid matching a prefix of a longer IP
                if ip == addr["local"]:
                    return nic["ifname"]

    def set_local_nic_info_helper(self):
        pass

    def set_local_nic_info(self, pci_info):
        def extract_network_elements(json_obj):
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def get_nic_numa_node(self, nic_name):
        return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name]))

    def get_numa_cpu_map(self):
        numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"]))
        numa_cpu_json_map = {}

        for cpu in numa_cpu_json_obj["cpus"]:
            cpu_num = int(cpu["cpu"])
            numa_node = int(cpu["node"])
            numa_cpu_json_map.setdefault(numa_node, [])
            numa_cpu_json_map[numa_node].append(cpu_num)

        return numa_cpu_json_map

    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        # Stub; overridden with a local (Target) or SSH-based (Initiator) implementation
        return ""

    def configure_system(self):
        self.load_drivers()
        self.configure_services()
        self.configure_sysctl()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity()

    def load_drivers(self):
        self.log.info("Loading drivers")
        self.exec_cmd(["sudo", "modprobe", "-a",
                       "nvme-%s" % self.transport,
                       "nvmet-%s" % self.transport])
        if self.mode == "kernel" and hasattr(self, "null_block") and self.null_block:
            self.exec_cmd(["sudo", "modprobe", "null_blk",
                           "nr_devices=%s" % self.null_block])

    def configure_adq(self):
        if self.mode == "kernel":
            self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        self.log.info("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log.info("%s loaded!" % module)
            except CalledProcessError as e:
                self.log.error("ERROR: failed to load module %s" % module)
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        self.log.info("Configuring ADQ Traffic classes and filters...")

        if self.mode == "kernel":
            self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return

        num_queues_tc0 = 2  # 2 is the minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        port = "4420"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
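            # The mqprio qdisc splits the NIC queues into two traffic classes:
            # TC0 gets num_queues_tc0 queues and TC1 (the ADQ application class)
            # gets num_queues_tc1 queues placed right after TC0's. "map 0 1"
            # maps priority 0 to TC0 and priority 1 to TC1, and "hw 1 mode
            # channel" offloads the classes to the NIC as hardware channels.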
            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log.info(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log.info(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                             "protocol", "ip", "ingress", "prio", "1", "flower",
                             "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                             "skip_sw", "hw_tc", "1"]
            self.log.info(" ".join(tc_filter_cmd))
            self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log.info("Show tc configuration for %s NIC..." % nic_name)
            tc_qdisc_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log.info("%s" % tc_qdisc_out)
            self.log.info("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log.info(xps_cmd)
            self.exec_cmd(xps_cmd)

    def reload_driver(self, driver):
        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver])
        except CalledProcessError as e:
            self.log.error("ERROR: failed to reload %s module!" % driver)
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_nic(self):
        self.log.info("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                # As a temporary workaround for ADQ, channel packet inspection optimization is turned on during connection establishment.
                # Then turned off before fio ramp_up expires in ethtool_after_fio_ramp().
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-pkt-inspect-optimize", "on"])
            except CalledProcessError as e:
                self.log.error("ERROR: failed to configure NIC port using ethtool!")
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
                self.log.info("Please update your NIC driver and firmware versions and try again.")
            self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        self.log.info("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # The list below is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log.info("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log.info("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        self.log.info("Tuning sysctl settings...")

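        # ADQ with the SPDK target relies on socket busy-polling on reads,
        # so net.core.busy_read is enabled only for that combination below.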
        busy_read = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1

        sysctl_opts = {
            "net.core.busy_poll": 0,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
        }

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        if not self.tuned_profile:
            self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip())

    def configure_cpu_governor(self):
        self.log.info("Setting CPU governor to performance...")

        # This assumes that the same CPU scaling governor is used on every CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def configure_irq_affinity(self):
        self.log.info("Setting IRQ affinity for NICs...")

        irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh")
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            irq_cmd = ["sudo", irq_script_path, nic]
            self.log.info(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        self.log.info("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        self.log.info("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        self.log.info("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log.info("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        self.log.info("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log.info("Reverted CPU governor to %s." % self.governor_restore)


class Target(Server):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Defaults
        self.enable_sar = False
        self.sar_delay = 0
        self.sar_interval = 0
        self.sar_count = 0
        self.enable_pcm = False
        self.pcm_dir = ""
        self.pcm_delay = 0
        self.pcm_interval = 0
        self.pcm_count = 0
        self.enable_bandwidth = 0
        self.bandwidth_count = 0
        self.enable_dpdk_memory = False
        self.dpdk_wait_time = 0
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []
        self.initiator_info = []
        self.enable_pm = False
        self.pm_delay = 0
        self.pm_interval = 0
        self.pm_count = 1

        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "sar_settings" in target_config:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"]
        if "pcm_settings" in target_config:
            self.enable_pcm = True
            self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"]
        if "enable_bandwidth" in target_config:
            self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"]
        if "enable_dpdk_memory" in target_config:
            self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"]
        if "scheduler_settings" in target_config:
            self.scheduler_name = target_config["scheduler_settings"]
        if "zcopy_settings" in target_config:
            self.enable_zcopy = target_config["zcopy_settings"]
        if "results_dir" in target_config:
            self.results_dir = target_config["results_dir"]
        if "pm_settings" in target_config:
            self.enable_pm, self.pm_delay, self.pm_interval, self.pm_count = target_config["pm_settings"]
            # Normalize pm_count: values <= 0 mean "loop indefinitely", so
            # clamp to 1 to avoid blocking forever.
            self.pm_count = self.pm_count if self.pm_count > 0 else 1

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log.info("Changing directory to %s" % change_dir)

        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")

        if change_dir:
            os.chdir(old_cwd)
            self.log.info("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        self.log.info("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, _directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log.info("Done zipping")

    @staticmethod
    def _chunks(input_list, chunks_no):
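        """Yield chunks_no contiguous chunks of input_list whose lengths
        differ by at most one, e.g. a 10-element list split into 3 chunks
        yields chunks of 4, 3 and 3 elements."""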
        div, rem = divmod(len(input_list), chunks_no)
        for i in range(chunks_no):
            si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem)
            yield input_list[si:si + (div + 1 if i < rem else div)]

    def spread_bdevs(self, req_disks):
        # Spread available block devices indexes:
        # - evenly across available initiator systems
        # - evenly across available NIC interfaces for
        #   each initiator
        # Not NUMA aware.
        ip_bdev_map = []
        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))

        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
            self.initiator_info[i]["bdev_range"] = init_chunk
            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
                for c in nic_chunk:
                    ip_bdev_map.append((ip, c))
        return ip_bdev_map

    def measure_sar(self, results_dir, sar_file_prefix):
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))

        self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % self.sar_delay)
        time.sleep(self.sar_delay)
        self.log.info("Starting SAR measurements")

        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count])
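        # sar appends per-CPU "Average:" summary lines to its output; %idle is
        # the last column (index 7 after splitting), so summing it over all
        # CPUs lets us derive the total CPU utilization below.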
        with open(sar_output_file, "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log.info("Summary CPU utilization from SAR:")
                        self.log.info(line)
                    elif "all" in line:
                        self.log.info(line)
                    else:
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum

        with open(sar_cpu_util_file, "w") as f:
            f.write("%0.2f" % sar_cpu_usage)

    def measure_power(self, results_dir, prefix, script_full_dir):
        time.sleep(self.pm_delay)
        self.log.info("Starting power measurements")
        self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir,
                       "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix,
                       "-x", "-c", "%s" % self.pm_count, "-t", "%s" % self.pm_interval, "-r"])

    def ethtool_after_fio_ramp(self, fio_ramp_time):
        time.sleep(fio_ramp_time // 2)
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                           "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        time.sleep(self.pcm_delay)
        cmd = ["%s/build/bin/pcm-memory" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)]
        pcm_memory = subprocess.Popen(cmd)
        time.sleep(self.pcm_count)
        pcm_memory.terminate()

    def measure_pcm(self, results_dir, pcm_file_name):
        time.sleep(self.pcm_delay)
        cmd = ["%s/build/bin/pcm" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count,
               "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
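        # pcm writes a CSV with a two-row header (socket, metric). Load it
        # with pandas, drop the auto-generated "Unnamed:" labels and save the
        # per-socket UPI columns to a separate CSV for easier post-processing.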
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_pcm_power(self, results_dir, pcm_power_file_name):
        time.sleep(self.pcm_delay)
        out = self.exec_cmd(["%s/build/bin/pcm-power" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count])
        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
            fh.write(out)

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name):
        self.log.info("Starting network bandwidth measurements")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)])

    def measure_dpdk_memory(self, results_dir):
        self.log.info("Waiting to generate DPDK memory usage")
        time.sleep(self.dpdk_wait_time)
        self.log.info("Generating DPDK memory usage")
        # env_dpdk_get_mem_stats dumps stats to /tmp/spdk_mem_dump.txt on the
        # target side; self.client is the RPC client set up in SPDKTarget.tgt_start()
        rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)
        os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir))

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(os.uname().release)
        self.log.info("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log.info("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log.info("====zcopy settings:====")
        self.log.info("zcopy enabled: %s" % (self.enable_zcopy))
        self.log.info("====Scheduler settings:====")
        self.log.info("SPDK scheduler: %s" % (self.scheduler_name))


class Initiator(Server):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Required fields
        self.ip = initiator_config["ip"]
        self.target_nic_ips = initiator_config["target_nic_ips"]

        # Defaults
        self.cpus_allowed = None
        self.cpus_allowed_policy = "shared"
        self.spdk_dir = "/tmp/spdk"
        self.fio_bin = "/usr/src/fio/fio"
        self.nvmecli_bin = "nvme"
        self.cpu_frequency = None
        self.subsystem_info_list = []

        if "spdk_dir" in initiator_config:
            self.spdk_dir = initiator_config["spdk_dir"]
        if "fio_bin" in initiator_config:
            self.fio_bin = initiator_config["fio_bin"]
        if "nvmecli_bin" in initiator_config:
            self.nvmecli_bin = initiator_config["nvmecli_bin"]
        if "cpus_allowed" in initiator_config:
            self.cpus_allowed = initiator_config["cpus_allowed"]
        if "cpus_allowed_policy" in initiator_config:
            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
        if "cpu_frequency" in initiator_config:
            self.cpu_frequency = initiator_config["cpu_frequency"]
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.copy_spdk("/tmp/spdk.zip")
        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it again to prevent
        # expansion when sending to the remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout using the get_pty option if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        self.log.info("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log.info("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log.info("Sources unpacked")

    def copy_result_files(self, dest_dir):
        self.log.info("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log.info("Done copying results")

    def match_subsystems(self, target_subsystems):
        subsystems = [subsystem for subsystem in target_subsystems if subsystem[2] in self.target_nic_ips]
        subsystems.sort(key=lambda x: x[1])
        self.log.info("Found matching subsystems on target side:")
        for s in subsystems:
            self.log.info(s)
        self.subsystem_info_list = subsystems

    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def get_route_nic_numa(self, remote_nvme_ip):
        local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip]))
        local_nvme_nic = local_nvme_nic[0]["dev"]
        return self.get_nic_numa_node(local_nvme_nic)

    @staticmethod
    def gen_fio_offset_section(offset_inc, num_jobs):
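        """Generate fio options which give every job its own slice of each
        device: with offset_inc unset (0), the device is split evenly into
        num_jobs slices by using offset_increment."""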
        offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc
        return "\n".join(["size=%s%%" % offset_inc,
                          "offset=0%",
                          "offset_increment=%s%%" % offset_inc])

    def gen_fio_numa_section(self, fio_filenames_list):
        numa_stats = {}
        for nvme in fio_filenames_list:
            nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme))
            numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1

        # Use the most common NUMA node for this chunk to allocate memory and CPUs
        section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0]

        return "\n".join(["numa_cpu_nodes=%s" % section_local_numa,
                          "numa_mem_policy=prefer:%s" % section_local_numa])

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no,
                       num_jobs=None, ramp_time=0, run_time=10, rate_iops=0,
                       offset=False, offset_inc=0):
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""
        if "spdk" in self.mode:
            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
            spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
        else:
            ioengine = self.ioengine
            spdk_conf = ""
            out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'",
                                 "|", "awk", "'{print $1}'"])
            subsystems = [x for x in out.split("\n") if "nvme" in x]

        if self.cpus_allowed is not None:
            self.log.info("Limiting FIO workload execution to specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    cpus_num += len(range(a, b + 1))  # ranges like "0-3" are inclusive
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            self.num_cores = len(subsystems)
            threads = range(0, len(subsystems))

        if "spdk" in self.mode:
            filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs,
                                                          offset, offset_inc)
        else:
            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs,
                                                          offset, offset_inc)

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, init_connect() function
        if hasattr(self, "ioengine") and "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s\n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s\n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s\n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log.info("Created FIO Config:")
        self.log.info(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit(1)
        else:
            self.log.warning("WARNING: you have disabled intel_pstate and are using the default CPU governor.")

    def run_fio(self, fio_config_file, run_num=None):
        job_name, _ = os.path.splitext(fio_config_file)
        self.log.info("Starting FIO run for job: %s" % job_name)
        self.log.info("Using FIO: %s" % self.fio_bin)

        if run_num:
            for i in range(1, run_num + 1):
                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
                try:
                    output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
                                            "--output=%s" % output_filename, "--eta=never"], True)
                    self.log.info(output)
                except CalledProcessError as e:
                    self.log.error("ERROR: Fio process failed!")
                    self.log.info(e.stdout)
        else:
            output_filename = job_name + "_" + self.name + ".json"
            output = self.exec_cmd(["sudo", self.fio_bin,
                                    fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename], True)
            self.log.info(output)
        self.log.info("FIO run finished. Results in: %s" % output_filename)

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(self.exec_cmd(["uname", "-r"]))
        self.log.info("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log.info("====sysctl conf:====")
        sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"])
        self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))


class KernelTarget(Target):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = "nvmetcli"

        if "nvmet_bin" in target_config:
            self.nvmet_bin = target_config["nvmet_bin"]

    def stop(self):
        self.nvmet_command(self.nvmet_bin, "clear")

    def get_nvme_devices(self):
        output = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"])
        output = [x for x in output.split("\n") if "nvme" in x]
        return output

    def nvmet_command(self, nvmet_bin, command):
        return self.exec_cmd([nvmet_bin, *(command.split(" "))])

    def kernel_tgt_gen_subsystem_conf(self, nvme_list):

        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

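        # Each NVMe device gets its own subsystem, listening on a unique port
        # starting at the conventional NVMe-oF port 4420 plus the bdev index.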
        for ip, bdev_num in self.spread_bdevs(len(nvme_list)):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = nvme_list[bdev_num]

            nvmet_cfg["subsystems"].append({
                "allowed_hosts": [],
                "attr": {
                    "allow_any_host": "1",
                    "serial": serial,
                    "version": "1.3"
                },
                "namespaces": [
                    {
                        "device": {
                            "path": bdev_name,
                            "uuid": "%s" % uuid.uuid4()
                        },
                        "enable": 1,
                        "nsid": port
                    }
                ],
                "nqn": nqn
            })

            nvmet_cfg["ports"].append({
                "addr": {
                    "adrfam": "ipv4",
                    "traddr": ip,
                    "trsvcid": port,
                    "trtype": self.transport
                },
                "portid": bdev_num,
                "referrals": [],
                "subsystems": [nqn]
            })

            self.subsystem_info_list.append((port, nqn, ip))
        self.subsys_no = len(self.subsystem_info_list)

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        self.log.info("Configuring kernel NVMeOF Target")

        if self.null_block:
            self.log.info("Configuring with null block device.")
            nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
        else:
            self.log.info("Configuring with NVMe drives.")
            nvme_list = self.get_nvme_devices()

        self.kernel_tgt_gen_subsystem_conf(nvme_list)
        self.subsys_no = len(nvme_list)

        self.nvmet_command(self.nvmet_bin, "clear")
        self.nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log.info("Done configuring kernel NVMeOF Target")


class SPDKTarget(Target):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Required fields
        self.core_mask = target_config["core_mask"]
        self.num_cores = self.get_num_cores(self.core_mask)

        # Defaults
        self.dif_insert_strip = False
        self.null_block_dif_type = 0
        self.num_shared_buffers = 4096
        self.max_queue_depth = 128
        self.bpf_proc = None
        self.bpf_scripts = []
        self.enable_dsa = False
        self.scheduler_core_limit = None

        if "num_shared_buffers" in target_config:
            self.num_shared_buffers = target_config["num_shared_buffers"]
        if "max_queue_depth" in target_config:
            self.max_queue_depth = target_config["max_queue_depth"]
        if "null_block_dif_type" in target_config:
            self.null_block_dif_type = target_config["null_block_dif_type"]
        if "dif_insert_strip" in target_config:
            self.dif_insert_strip = target_config["dif_insert_strip"]
        if "bpf_scripts" in target_config:
            self.bpf_scripts = target_config["bpf_scripts"]
        if "dsa_settings" in target_config:
            self.enable_dsa = target_config["dsa_settings"]
        if "scheduler_core_limit" in target_config:
            self.scheduler_core_limit = target_config["scheduler_core_limit"]

        self.log.info("====DSA settings:====")
        self.log.info("DSA enabled: %s" % (self.enable_dsa))

    def get_nvme_devices_count(self):
        return len(self.get_nvme_devices())

    def get_nvme_devices(self):
        bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")]))
        bdev_bdfs = [bdev["params"]["traddr"] for bdev in bdev_subsys_json_obj["config"]]
        return bdev_bdfs

    @staticmethod
    def get_num_cores(core_mask):
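        """Count the CPUs selected by an SPDK core mask: either a hex bitmask
        (e.g. "0xF" -> 4 cores) or a bracketed core list with inclusive
        ranges (e.g. "[0-3,7]" -> 5 cores)."""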
1003        if "0x" in core_mask:
1004            return bin(int(core_mask, 16)).count("1")
1005        else:
1006            num_cores = 0
1007            core_mask = core_mask.replace("[", "")
1008            core_mask = core_mask.replace("]", "")
1009            for i in core_mask.split(","):
1010                if "-" in i:
1011                    x, y = i.split("-")
1012                    num_cores += len(range(int(x), int(y))) + 1
1013                else:
1014                    num_cores += 1
1015            return num_cores

    def spdk_tgt_configure(self):
        self.log.info("Configuring SPDK NVMeOF target via RPC")

        if self.enable_adq:
            self.adq_configure_tc()

        # Create transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
                                       num_shared_buffers=self.num_shared_buffers,
                                       max_queue_depth=self.max_queue_depth,
                                       dif_insert_or_strip=self.dif_insert_strip,
                                       sock_priority=self.adq_priority)
        self.log.info("SPDK NVMeOF transport layer:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

        self.log.info("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self, null_block_count):
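        # Null bdevs emulate 4096-byte blocks; when DIF protection is
        # requested, an extra 128 bytes of per-block metadata is reserved
        # for the protection information.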
        md_size = 0
        block_size = 4096
        if self.null_block_dif_type != 0:
            md_size = 128

        self.log.info("Adding null block bdevices to config via RPC")
        for i in range(null_block_count):
            self.log.info("Setting bdev protection to: %s" % self.null_block_dif_type)
            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
                                      dif_type=self.null_block_dif_type, md_size=md_size)
        self.log.info("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        self.log.info("Adding NVMe bdevs to config via RPC")

        bdfs = self.get_nvme_devices()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log.error("ERROR: Requested number of disks is more than the %s available" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log.info("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        self.log.info("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = self.get_nvme_devices_count()

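        # As in the kernel target, each bdev is exported through its own
        # subsystem listening on port 4420 + bdev index; every listener is
        # also registered on the discovery subsystem.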
1080            port = str(4420 + bdev_num)
1081            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
1082            serial = "SPDK00%s" % bdev_num
1083            bdev_name = "Nvme%sn1" % bdev_num
1084
1085            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
1086                                           allow_any_host=True, max_namespaces=8)
1087            rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
1088            for nqn_name in [nqn, "discovery"]:
1089                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
1090                                                     nqn=nqn_name,
1091                                                     trtype=self.transport,
1092                                                     traddr=ip,
1093                                                     trsvcid=port,
1094                                                     adrfam="ipv4")
1095            self.subsystem_info_list.append((port, nqn, ip))
1096        self.subsys_no = len(self.subsystem_info_list)
1097
1098        self.log.info("SPDK NVMeOF subsystem configuration:")
1099        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))
1100
1101    def bpf_start(self):
1102        self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts)
1103        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
1104        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
1105        results_path = os.path.join(self.results_dir, "bpf_traces.txt")
1106
1107        with open(self.pid, "r") as fh:
1108            nvmf_pid = str(fh.readline())
1109
1110        cmd = [bpf_script, nvmf_pid, *bpf_traces]
1111        self.log.info(cmd)
1112        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})

    def tgt_start(self):
        if self.null_block:
            self.subsys_no = 1
        else:
            self.subsys_no = self.get_nvme_devices_count()
        self.log.info("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        self.log.info("SPDK NVMeOF Target PID=%s" % proc.pid)
        self.log.info("Waiting for spdk to initialize...")
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")

        rpc.sock.sock_set_default_impl(self.client, impl_name="posix")

        if self.enable_zcopy:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
                                           enable_zerocopy_send_server=True)
            self.log.info("Target socket options:")
            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))

        if self.enable_adq:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
                                           nvme_adminq_poll_period_us=100000, retry_count=4)

        if self.enable_dsa:
            rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None)
            self.log.info("Target DSA accel module enabled")

        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit)
        rpc.framework_start_init(self.client)

        if self.bpf_scripts:
            self.bpf_start()

        self.spdk_tgt_configure()

    def stop(self):
        if self.bpf_proc:
            self.log.info("Stopping BPF Trace script")
            self.bpf_proc.terminate()
            self.bpf_proc.wait()

        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait(timeout=30)
            except Exception as e:
                self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.")
                self.log.info(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
                # Try to clean up RPC socket files if they were not removed
                # because of using 'kill'
                try:
                    os.remove("/var/tmp/spdk.sock")
                    os.remove("/var/tmp/spdk.sock.lock")
                except FileNotFoundError:
                    pass
1183
1184class KernelInitiator(Initiator):
1185    def __init__(self, name, general_config, initiator_config):
1186        super().__init__(name, general_config, initiator_config)
1187
1188        # Defaults
1189        self.extra_params = ""
1190        self.ioengine = "libaio"
1191
1192        if "extra_params" in initiator_config:
1193            self.extra_params = initiator_config["extra_params"]
1194
1195        if "kernel_engine" in initiator_config:
1196            self.ioengine = initiator_config["kernel_engine"]
1197            if "io_uring" in self.ioengine:
1198                self.extra_params = "--nr-poll-queues=8"
1199
1200    def get_connected_nvme_list(self):
1201        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
1202        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
1203                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
1204        return nvme_list
1205
1206    def init_connect(self):
1207        self.log.info("Below connection attempts may result in error messages, this is expected!")
1208        for subsystem in self.subsystem_info_list:
1209            self.log.info("Trying to connect %s %s %s" % subsystem)
1210            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
1211                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
1212            time.sleep(2)
1213
1214        if "io_uring" in self.ioengine:
1215            self.log.info("Setting block layer settings for io_uring.")
1216
1217            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
1218            #       apparently it's not possible for connected subsystems.
1219            #       Results in "error: Invalid argument"
1220            block_sysfs_settings = {
1221                "iostats": "0",
1222                "rq_affinity": "0",
1223                "nomerges": "2"
1224            }
1225
1226            for disk in self.get_connected_nvme_list():
1227                sysfs = os.path.join("/sys/block", disk, "queue")
1228                for k, v in block_sysfs_settings.items():
1229                    sysfs_opt_path = os.path.join(sysfs, k)
1230                    try:
1231                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
1232                    except CalledProcessError as e:
1233                        self.log.warning("Command %s failed with error: %s. %s was not set!" % (e.cmd, e.output, v))
1234                    finally:
1235                        current_value = self.exec_cmd(["sudo", "cat", sysfs_opt_path])
1236                        self.log.info("%s=%s" % (sysfs_opt_path, current_value))
1237
1238    def init_disconnect(self):
1239        for subsystem in self.subsystem_info_list:
1240            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
1241            time.sleep(1)
1242
1243    def get_nvme_subsystem_numa(self, dev_name):
1244        # Strip the namespace suffix (e.g. "n1") from the block device name to get the controller name; assumes a single-digit namespace id
1245        nvme_ctrl = os.path.basename(dev_name)[:-2]
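        # /sys/class/nvme/<ctrl>/address contains e.g. "traddr=192.168.0.1,trsvcid=4420";
        # extract the IPv4 traddr to resolve which local NIC routes to it.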
1246        remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
1247                                   self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl]))
1248        return self.get_route_nic_numa(remote_nvme_ip.group(0))
1249
1250    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
1251        # Generate connected nvme devices names and sort them by used NIC numa node
1252        # to allow better grouping when splitting into fio sections.
1253        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
1254        nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list]
1255        nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))]
1256
1257        filename_section = ""
1258        nvme_per_split = len(nvme_list) // len(threads)
1259        remainder = len(nvme_list) % len(threads)
1260        iterator = iter(nvme_list)
1261        result = []
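        # Round-robin the devices into one bucket per fio section; the first
        # "remainder" buckets receive one extra device (e.g. 5 devices over
        # 2 sections -> 3 + 2).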
1262        for i in range(len(threads)):
1263            result.append([])
1264            for _ in range(nvme_per_split):
1265                result[i].append(next(iterator))
1266            if remainder:
1267                result[i].append(next(iterator))
1268                remainder -= 1
1269        for i, r in enumerate(result):
1270            header = "[filename%s]" % i
1271            disks = "\n".join(["filename=%s" % x for x in r])
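            # Scale the per-section iodepth so that the aggregate queue depth
            # per device stays at the requested io_depth regardless of numjobs.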
1272            job_section_qd = max(1, round((io_depth * len(r)) / num_jobs))
1273            iodepth = "iodepth=%s" % job_section_qd
1276
1277            offset_section = ""
1278            if offset:
1279                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)
1280
1281            numa_opts = self.gen_fio_numa_section(r)
1282
1283            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])
1284
1285        return filename_section
1286
1287
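# Initiator that runs fio with SPDK's bdev plugin; "connecting" only means
# generating a bdev.conf that attaches the remote subsystems as NVMe bdevs.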
1288class SPDKInitiator(Initiator):
1289    def __init__(self, name, general_config, initiator_config):
1290        super().__init__(name, general_config, initiator_config)
1291
1292        if general_config.get("skip_spdk_install", False) is False:
1293            self.install_spdk()
1294
1295        # Required fields
1296        self.num_cores = initiator_config["num_cores"]
1297
1298        # Optional fields
1299        self.enable_data_digest = False
1300        if "enable_data_digest" in initiator_config:
1301            self.enable_data_digest = initiator_config["enable_data_digest"]
1302
1303    def install_spdk(self):
1304        self.log.info("Using fio binary %s" % self.fio_bin)
1305        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
1306        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
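        # "--with-fio" points configure at a compiled fio source tree so the
        # SPDK fio plugins are built; this assumes fio_bin resides inside that
        # source tree.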
1307        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)])
1308        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
1309        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])
1310
1311        self.log.info("SPDK built")
1312        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])
1313
1314    def init_connect(self):
1315        # Not a real "connect" like "nvme connect", because SPDK's fio bdev
1316        # plugin initiates the connection just before starting IO traffic.
1317        # This merely provides an "init_connect" equivalent of the same
1318        # function from the KernelInitiator class.
1319        # Just prepare the bdev.conf JSON file for later use and consider
1320        # that "making a connection".
1321        bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
1322        self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
1323
1324    def init_disconnect(self):
1325        # The SPDK initiator does not need to explicitly disconnect, as this
1326        # is done after the fio bdev plugin finishes IO.
1327        pass
1328
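    # Build a bdev subsystem config consumable by fio's spdk_bdev plugin.
    # For example, with one TCP subsystem the output would look roughly like:
    # {
    #   "subsystems": [{
    #     "subsystem": "bdev",
    #     "config": [{
    #       "method": "bdev_nvme_attach_controller",
    #       "params": {"name": "Nvme0", "trtype": "tcp",
    #                  "traddr": "192.168.0.1", "trsvcid": "4420",
    #                  "subnqn": "nqn.2018-09.io.spdk:cnode1", "adrfam": "IPv4"}
    #     }]
    #   }]
    # }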
1329    def gen_spdk_bdev_conf(self, remote_subsystem_list):
1330        bdev_cfg_section = {
1331            "subsystems": [
1332                {
1333                    "subsystem": "bdev",
1334                    "config": []
1335                }
1336            ]
1337        }
1338
1339        for i, subsys in enumerate(remote_subsystem_list):
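            # Each subsystem entry is a (service port, NQN, address) tuple.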
1340            sub_port, sub_nqn, sub_addr = map(str, subsys)
1341            nvme_ctrl = {
1342                "method": "bdev_nvme_attach_controller",
1343                "params": {
1344                    "name": "Nvme{}".format(i),
1345                    "trtype": self.transport,
1346                    "traddr": sub_addr,
1347                    "trsvcid": sub_port,
1348                    "subnqn": sub_nqn,
1349                    "adrfam": "IPv4"
1350                }
1351            }
1352
1353            if self.enable_adq:
1354                nvme_ctrl["params"].update({"priority": "1"})
1355
1356            if self.enable_data_digest:
1357                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})
1358
1359            bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)
1360
1361        return json.dumps(bdev_cfg_section, indent=2)
1362
1363    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
1364        filename_section = ""
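        # Never create more fio job sections than there are subsystems.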
1365        if len(threads) >= len(subsystems):
1366            threads = range(0, len(subsystems))
1367
1368        # Generate expected NVMe Bdev names and sort them by used NIC numa node
1369        # to allow better grouping when splitting into fio sections.
1370        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
1371        filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames]
1372        filenames = [x for _, x in sorted(zip(filename_numas, filenames))]
1373
1374        nvme_per_split = len(subsystems) // len(threads)
1375        remainder = len(subsystems) % len(threads)
1376        iterator = iter(filenames)
1377        result = []
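        # Same round-robin split as in KernelInitiator: the first "remainder"
        # sections receive one extra bdev.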
1378        for i in range(len(threads)):
1379            result.append([])
1380            for _ in range(nvme_per_split):
1381                result[i].append(next(iterator))
1382            if remainder:
1383                result[i].append(next(iterator))
1384                remainder -= 1
1385        for i, r in enumerate(result):
1386            header = "[filename%s]" % i
1387            disks = "\n".join(["filename=%s" % x for x in r])
1388            job_section_qd = max(1, round((io_depth * len(r)) / num_jobs))
1389            iodepth = "iodepth=%s" % job_section_qd
1392
1393            offset_section = ""
1394            if offset:
1395                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)
1396
1397            numa_opts = self.gen_fio_numa_section(r)
1398
1399            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])
1400
1401        return filename_section
1402
1403    def get_nvme_subsystem_numa(self, bdev_name):
1404        bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir]))
1405        bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"]
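        # bdev.conf was written out by init_connect(), so it maps generated
        # bdev names back to the remote traddr used to attach them.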
1406
1407        # Strip the "n1" namespace suffix to get the controller name; assumes a single-digit namespace id
1408        nvme_ctrl = bdev_name[:-2]
1409        remote_nvme_ip = list(filter(lambda x: x["params"]["name"] == "%s" % nvme_ctrl, bdev_conf_json_obj))[0]["params"]["traddr"]
1410        return self.get_route_nic_numa(remote_nvme_ip)
1411
1412
1413if __name__ == "__main__":
1414    script_full_dir = os.path.dirname(os.path.realpath(__file__))
1415    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))
1416
1417    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
1418    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
1419                        help='Configuration file.')
1420    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
1421                        help='Results directory.')
1422    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
1423                        help='CSV results filename.')
1424
1425    args = parser.parse_args()
1426
1427    logging.basicConfig(level=logging.INFO,
1428                        format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s')
1429
1430    logging.info("Using config file: %s" % args.config)
1431    with open(args.config, "r") as config:
1432        data = json.load(config)
1433
1434    initiators = []
1435    fio_cases = []
1436
1437    general_config = data["general"]
1438    target_config = data["target"]
1439    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]
1440
1441    for k, v in data.items():
1442        if "target" in k:
1443            v.update({"results_dir": args.results})
1444            if data[k]["mode"] == "spdk":
1445                target_obj = SPDKTarget(k, data["general"], v)
1446            elif data[k]["mode"] == "kernel":
1447                target_obj = KernelTarget(k, data["general"], v)
1448        elif "initiator" in k:
1449            if data[k]["mode"] == "spdk":
1450                init_obj = SPDKInitiator(k, data["general"], v)
1451            elif data[k]["mode"] == "kernel":
1452                init_obj = KernelInitiator(k, data["general"], v)
1453            initiators.append(init_obj)
1454        elif "fio" in k:
1455            fio_workloads = itertools.product(data[k]["bs"],
1456                                              data[k]["qd"],
1457                                              data[k]["rw"])
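            # Every (block size, queue depth, readwrite mode) combination
            # becomes a separate fio run.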
1458
1459            fio_run_time = data[k]["run_time"]
1460            fio_ramp_time = data[k]["ramp_time"]
1461            fio_rw_mix_read = data[k]["rwmixread"]
1462            fio_run_num = data[k].get("run_num")
1463            fio_num_jobs = data[k].get("num_jobs")
1464
1465            fio_rate_iops = data[k].get("rate_iops", 0)
1466            fio_offset = data[k].get("offset", False)
1467            fio_offset_inc = data[k].get("offset_inc", 0)
1475        else:
1476            continue
1477
1478    try:
1479        os.mkdir(args.results)
1480    except FileExistsError:
1481        pass
1482
1483    for i in initiators:
1484        target_obj.initiator_info.append(
1485            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
1486        )
1487
1488    # TODO: This try block is definitely too large. Need to break it up into separate
1489    # logical blocks to reduce its size.
1490    try:
1491        target_obj.tgt_start()
1492
1493        for i in initiators:
1494            i.match_subsystems(target_obj.subsystem_info_list)
1495            if i.enable_adq:
1496                i.adq_configure_tc()
1497
1498        # Poor man's threading
1499        # Run FIO tests
1500        for block_size, io_depth, rw in fio_workloads:
1501            threads = []
1502            configs = []
1503            power_daemon = None
1504            for i in initiators:
1505                i.init_connect()
1506                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
1507                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops,
1508                                       fio_offset, fio_offset_inc)
1509                configs.append(cfg)
1510
1511            for i, cfg in zip(initiators, configs):
1512                t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
1513                threads.append(t)
1514            if target_obj.enable_sar:
1515                sar_file_prefix = "%s_%s_%s_sar" % (block_size, rw, io_depth)
1516                t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix))
1517                threads.append(t)
1518
1519            if target_obj.enable_pcm:
1520                pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]
1521
1522                pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],))
1523                pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],))
1524                pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],))
1525
1526                threads.append(pcm_cpu_t)
1527                threads.append(pcm_mem_t)
1528                threads.append(pcm_pow_t)
1529
1530            if target_obj.enable_bandwidth:
1531                bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)])
1532                bandwidth_file_name = ".".join([bandwidth_file_name, "csv"])
1533                t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,))
1534                threads.append(t)
1535
1536            if target_obj.enable_dpdk_memory:
1537                t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results,))
1538                threads.append(t)
1539
1540            if target_obj.enable_adq:
1541                ethtool_thread = threading.Thread(target=target_obj.ethtool_after_fio_ramp, args=(fio_ramp_time,))
1542                threads.append(ethtool_thread)
1543
1544            if target_obj.enable_pm:
1545                power_daemon = threading.Thread(target=target_obj.measure_power,
1546                                                args=(args.results, "%s_%s_%s" % (block_size, rw, io_depth), script_full_dir))
1547                threads.append(power_daemon)
1548
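            # Start fio and all measurement threads together, then wait for
            # every one of them to finish before disconnecting.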
1549            for t in threads:
1550                t.start()
1551            for t in threads:
1552                t.join()
1553
1554            for i in initiators:
1555                i.init_disconnect()
1556                i.copy_result_files(args.results)
1557
1558        target_obj.restore_governor()
1559        target_obj.restore_tuned()
1560        target_obj.restore_services()
1561        target_obj.restore_sysctl()
1562        if target_obj.enable_adq:
1563            target_obj.reload_driver("ice")
1564        for i in initiators:
1565            i.restore_governor()
1566            i.restore_tuned()
1567            i.restore_services()
1568            i.restore_sysctl()
1569            if i.enable_adq:
1570                i.reload_driver("ice")
1571        parse_results(args.results, args.csv_filename)
1572    finally:
1573        for i in initiators:
1574            try:
1575                i.stop()
1576            except Exception as err:
1577                logging.error("Failed to stop initiator %s: %s" % (i.name, err))
1578        target_obj.stop()
1579