#!/usr/bin/env python3
#  SPDX-License-Identifier: BSD-3-Clause
#  Copyright (C) 2018 Intel Corporation
#  All rights reserved.
#

import os
import re
import sys
import argparse
import json
import logging
import zipfile
import threading
import subprocess
import itertools
import configparser
import time
import uuid

import paramiko
import pandas as pd
from common import *
from subprocess import CalledProcessError

sys.path.append(os.path.dirname(__file__) + '/../../../python')

import spdk.rpc as rpc  # noqa
import spdk.rpc.client as rpc_client  # noqa

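# Helpers for automated NVMe-oF performance testing: a Target (kernel nvmet or
# SPDK nvmf_tgt) exposes NVMe subsystems, and one or more Initiators (kernel
# nvme-cli or the SPDK bdev fio plugin) connect to them and run fio workloads.
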
class Server:
    def __init__(self, name, general_config, server_config):
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]

        self.log = logging.getLogger(self.name)

        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        self.local_nic_info = []
        self._nics_json_obj = {}
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        self.enable_adq = False
        self.adq_priority = None
        if "adq_enable" in server_config and server_config["adq_enable"]:
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1

        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]

        if not re.match("^[A-Za-z0-9]*$", name):
            self.log.error("Please use a name which contains only letters or numbers")
            sys.exit(1)

    @staticmethod
    def get_uncommented_lines(lines):
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                # Compare complete addresses; a substring test would also match
                # longer addresses sharing the same prefix (e.g. "10.0.0.1" in "10.0.0.11").
                if ip == addr["local"]:
                    return nic["ifname"]

    def set_local_nic_info_helper(self):
        pass

    def set_local_nic_info(self, pci_info):
        def extract_network_elements(json_obj):
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def get_nic_numa_node(self, nic_name):
        return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name]))

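    # Builds a NUMA node -> CPU list mapping from "lscpu -b -e=NODE,CPU -J",
    # e.g. {0: [0, 1, 2, 3], 1: [4, 5, 6, 7]} on a hypothetical 2-node, 8-CPU host.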
    def get_numa_cpu_map(self):
        numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"]))
        numa_cpu_json_map = {}

        for cpu in numa_cpu_json_obj["cpus"]:
            cpu_num = int(cpu["cpu"])
            numa_node = int(cpu["node"])
            numa_cpu_json_map.setdefault(numa_node, [])
            numa_cpu_json_map[numa_node].append(cpu_num)

        return numa_cpu_json_map

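    # Base implementation is a no-op stub; Target.exec_cmd() runs commands
    # locally via subprocess, while Initiator.exec_cmd() runs them on the
    # remote host over the paramiko SSH connection.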
    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        return ""

    def configure_system(self):
        self.load_drivers()
        self.configure_services()
        self.configure_sysctl()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity()

    def load_drivers(self):
        self.log.info("Loading drivers")
        self.exec_cmd(["sudo", "modprobe", "-a",
                       "nvme-%s" % self.transport,
                       "nvmet-%s" % self.transport])
        if self.mode == "kernel" and hasattr(self, "null_block") and self.null_block:
            self.exec_cmd(["sudo", "modprobe", "null_blk",
                           "nr_devices=%s" % self.null_block])

    def configure_adq(self):
        if self.mode == "kernel":
            self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        self.log.info("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log.info("%s loaded!" % module)
            except CalledProcessError as e:
                self.log.error("ERROR: failed to load module %s" % module)
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        self.log.info("Configuring ADQ Traffic classes and filters...")

        if self.mode == "kernel":
            self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        port = "4420"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

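        # For each NIC this builds an mqprio qdisc with 2 traffic classes:
        # TC0 gets num_queues_tc0 queues starting at offset 0 and TC1 gets
        # num_queues_tc1 queues right after them. With num_cores=4 the
        # resulting command would be, e.g.:
        #   tc qdisc add dev <nic> root mqprio num_tc 2 map 0 1 \
        #      queues 2@0 4@2 hw 1 mode channel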
        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log.info(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log.info(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                             "protocol", "ip", "ingress", "prio", "1", "flower",
                             "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                             "skip_sw", "hw_tc", "1"]
            self.log.info(" ".join(tc_filter_cmd))
            self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log.info("Show tc configuration for %s NIC..." % nic_name)
            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log.info("%s" % tc_disk_out)
            self.log.info("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log.info(xps_cmd)
            self.exec_cmd(xps_cmd)

    def reload_driver(self, driver):
        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver])
        except CalledProcessError as e:
            self.log.error("ERROR: failed to reload %s module!" % driver)
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_nic(self):
        self.log.info("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                # As a temporary workaround for ADQ, channel packet inspection optimization is turned on
                # during connection establishment, then turned off before fio ramp_time expires in
                # ethtool_after_fio_ramp().
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-pkt-inspect-optimize", "on"])
            except CalledProcessError as e:
                self.log.error("ERROR: failed to configure NIC port using ethtool!")
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
                self.log.info("Please update your NIC driver and firmware versions and try again.")
            self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        self.log.info("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log.info("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log.info("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        self.log.info("Tuning sysctl settings...")

        busy_read = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1

        sysctl_opts = {
            "net.core.busy_poll": 0,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
        }

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        if not self.tuned_profile:
            self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip())

    def configure_cpu_governor(self):
        self.log.info("Setting CPU governor to performance...")

        # This assumes that the same CPU scaling governor is set on every CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def configure_irq_affinity(self):
        self.log.info("Setting IRQ affinity for NICs...")

        irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh")
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            irq_cmd = ["sudo", irq_script_path, nic]
            self.log.info(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        self.log.info("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        self.log.info("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        self.log.info("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log.info("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        self.log.info("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log.info("Reverted CPU governor to %s." % self.governor_restore)

    def restore_settings(self):
        self.restore_governor()
        self.restore_tuned()
        self.restore_services()
        self.restore_sysctl()
        if self.enable_adq:
            self.reload_driver("ice")


class Target(Server):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Defaults
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []
        self.initiator_info = []
        self.nvme_allowlist = []
        self.nvme_blocklist = []

        # Target-side measurement options
        self.enable_pm = True
        self.enable_sar = True
        self.enable_pcm = True
        self.enable_bw = True
        self.enable_dpdk_memory = True

        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "scheduler_settings" in target_config:
            self.scheduler_name = target_config["scheduler_settings"]
        if "zcopy_settings" in target_config:
            self.enable_zcopy = target_config["zcopy_settings"]
        if "results_dir" in target_config:
            self.results_dir = target_config["results_dir"]
        if "blocklist" in target_config:
            self.nvme_blocklist = target_config["blocklist"]
        if "allowlist" in target_config:
            self.nvme_allowlist = target_config["allowlist"]
            # Blocklist takes precedence, remove common elements from allowlist
            self.nvme_allowlist = list(set(self.nvme_allowlist) - set(self.nvme_blocklist))
        if "enable_pm" in target_config:
            self.enable_pm = target_config["enable_pm"]
        if "enable_sar" in target_config:
            self.enable_sar = target_config["enable_sar"]
        if "enable_pcm" in target_config:
            self.enable_pcm = target_config["enable_pcm"]
        if "enable_bandwidth" in target_config:
            self.enable_bw = target_config["enable_bandwidth"]
        if "enable_dpdk_memory" in target_config:
            self.enable_dpdk_memory = target_config["enable_dpdk_memory"]

        self.log.info("Items now on allowlist: %s" % self.nvme_allowlist)
        self.log.info("Items now on blocklist: %s" % self.nvme_blocklist)

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log.info("Changing directory to %s" % change_dir)

        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")

        if change_dir:
            os.chdir(old_cwd)
            self.log.info("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        self.log.info("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, _directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log.info("Done zipping")

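    # Splits input_list into chunks_no contiguous chunks whose lengths differ
    # by at most one, e.g. _chunks(list(range(7)), 3) would yield
    # [0, 1, 2], [3, 4] and [5, 6].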
    @staticmethod
    def _chunks(input_list, chunks_no):
        div, rem = divmod(len(input_list), chunks_no)
        for i in range(chunks_no):
            si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem)
            yield input_list[si:si + (div + 1 if i < rem else div)]

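    # Example: with 8 bdevs and 2 initiators, each with 2 target NIC IPs,
    # initiator 0 would get bdevs 0-3 (two per IP) and initiator 1 bdevs 4-7.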
    def spread_bdevs(self, req_disks):
        # Spread available block devices indexes:
        # - evenly across available initiator systems
        # - evenly across available NIC interfaces for
        #   each initiator
        # Not NUMA aware.
        ip_bdev_map = []
        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))

        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
            self.initiator_info[i]["bdev_range"] = init_chunk
            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
                for c in nic_chunk:
                    ip_bdev_map.append((ip, c))
        return ip_bdev_map

    def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time):
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))

        self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Starting SAR measurements")

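        # Sum the %idle column (assumed to be the 8th field of sysstat's
        # per-CPU "Average:" lines) for all CPUs, then derive total CPU
        # utilization as cpu_count * 100% minus the summed idle time.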
        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % 1, "%s" % run_time])
        # sar_output_file and sar_cpu_util_file already contain results_dir,
        # so do not join the paths a second time here.
        with open(sar_output_file, "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log.info("Summary CPU utilization from SAR:")
                        self.log.info(line)
                    elif "all" in line:
                        self.log.info(line)
                    else:
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum

        with open(sar_cpu_util_file, "w") as f:
            f.write("%0.2f" % sar_cpu_usage)

    def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time):
        time.sleep(ramp_time)
        self.log.info("Starting power measurements")
        self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir,
                       "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix,
                       "-x", "-c", "%s" % run_time, "-t", "%s" % 1, "-r"])

    def ethtool_after_fio_ramp(self, fio_ramp_time):
        time.sleep(fio_ramp_time // 2)
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                           "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization

    def measure_pcm_memory(self, results_dir, pcm_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        cmd = ["pcm-memory", "1", "-csv=%s/%s" % (results_dir, pcm_file_name)]
        pcm_memory = subprocess.Popen(cmd)
        time.sleep(run_time)
        pcm_memory.terminate()

    def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        cmd = ["pcm", "1", "-i=%s" % run_time,
               "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_pcm_power(self, results_dir, pcm_power_file_name, ramp_time, run_time):
        time.sleep(ramp_time)
        out = self.exec_cmd(["pcm-power", "1", "-i=%s" % run_time])
        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
            fh.write(out)
        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
        #       Improve this so that an additional file containing the measurement averages is generated too.

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name, ramp_time, run_time):
        self.log.info("Waiting %d seconds for ramp-up to finish before measuring bandwidth stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Starting network bandwidth measurements")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", "%s" % run_time])
        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
        #       Improve this so that an additional file containing the measurement averages is generated too.
        # TODO: Monitor only those interfaces which are currently used to run the workload.

    def measure_dpdk_memory(self, results_dir, dump_file_name, ramp_time):
        self.log.info("Waiting to generate DPDK memory usage")
        time.sleep(ramp_time)
        self.log.info("Generating DPDK memory usage")
        tmp_dump_file = rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)["filename"]
        os.rename(tmp_dump_file, "%s/%s" % (results_dir, dump_file_name))

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(os.uname().release)
        self.log.info("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log.info("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log.info("====zcopy settings:====")
        self.log.info("zcopy enabled: %s" % (self.enable_zcopy))
        self.log.info("====Scheduler settings:====")
        self.log.info("SPDK scheduler: %s" % (self.scheduler_name))


class Initiator(Server):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Required fields
        self.ip = initiator_config["ip"]
        self.target_nic_ips = initiator_config["target_nic_ips"]

        # Defaults
        self.cpus_allowed = None
        self.cpus_allowed_policy = "shared"
        self.spdk_dir = "/tmp/spdk"
        self.fio_bin = "/usr/src/fio/fio"
        self.nvmecli_bin = "nvme"
        self.cpu_frequency = None
        self.subsystem_info_list = []

        if "spdk_dir" in initiator_config:
            self.spdk_dir = initiator_config["spdk_dir"]
        if "fio_bin" in initiator_config:
            self.fio_bin = initiator_config["fio_bin"]
        if "nvmecli_bin" in initiator_config:
            self.nvmecli_bin = initiator_config["nvmecli_bin"]
        if "cpus_allowed" in initiator_config:
            self.cpus_allowed = initiator_config["cpus_allowed"]
        if "cpus_allowed_policy" in initiator_config:
            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
        if "cpu_frequency" in initiator_config:
            self.cpu_frequency = initiator_config["cpu_frequency"]
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.copy_spdk("/tmp/spdk.zip")
        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        self.restore_settings()
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it again to prevent
        # expansion when sending to the remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout by using the get_pty option if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        self.log.info("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log.info("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log.info("Sources unpacked")

    def copy_result_files(self, dest_dir):
        self.log.info("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log.info("Done copying results")

    def match_subsystems(self, target_subsystems):
        subsystems = [subsystem for subsystem in target_subsystems if subsystem[2] in self.target_nic_ips]
        subsystems.sort(key=lambda x: x[1])
        self.log.info("Found matching subsystems on target side:")
        for s in subsystems:
            self.log.info(s)
        self.subsystem_info_list = subsystems

    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def get_route_nic_numa(self, remote_nvme_ip):
        local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip]))
        local_nvme_nic = local_nvme_nic[0]["dev"]
        return self.get_nic_numa_node(local_nvme_nic)

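    # Generates the fio section options used to spread num_jobs jobs across a
    # device without overlap; with offset_inc=0 and num_jobs=4 this would
    # produce "size=25%", "offset=0%" and "offset_increment=25%".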
    @staticmethod
    def gen_fio_offset_section(offset_inc, num_jobs):
        offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc
        return "\n".join(["size=%s%%" % offset_inc,
                          "offset=0%",
                          "offset_increment=%s%%" % offset_inc])

    def gen_fio_numa_section(self, fio_filenames_list):
        numa_stats = {}
        for nvme in fio_filenames_list:
            nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme))
            numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1

        # Use the most common NUMA node for this chunk to allocate memory and CPUs
        section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0]

        return "\n".join(["numa_cpu_nodes=%s" % section_local_numa,
                          "numa_mem_policy=prefer:%s" % section_local_numa])

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no,
                       num_jobs=None, ramp_time=0, run_time=10, rate_iops=0,
                       offset=False, offset_inc=0):
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""
        if "spdk" in self.mode:
            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
            spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
        else:
            ioengine = self.ioengine
            spdk_conf = ""
            out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'",
                                 "|", "awk", "'{print $1}'"])
            subsystems = [x for x in out.split("\n") if "nvme" in x]

        if self.cpus_allowed is not None:
            self.log.info("Limiting FIO workload execution to specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    # Ranges are inclusive, so "0-3" means 4 CPUs
                    cpus_num += len(range(a, b + 1))
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            self.num_cores = len(subsystems)
            threads = range(0, len(subsystems))

        if "spdk" in self.mode:
            filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs,
                                                          offset, offset_inc)
        else:
            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs,
                                                          offset, offset_inc)

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, init_connect() function
        if hasattr(self, "ioengine") and "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

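        # Config file name encodes the workload, e.g. "4k_128_randrw_m_70_16CPU.fio"
        # for bs=4k, iodepth=128, rw=randrw, rwmixread=70 on 16 cores.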
        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log.info("Created FIO Config:")
        self.log.info(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit(1)
        else:
            self.log.warning("WARNING: intel_pstate is disabled and the default CPU governor is in use.")

    def run_fio(self, fio_config_file, run_num=1):
        job_name, _ = os.path.splitext(fio_config_file)
        self.log.info("Starting FIO run for job: %s" % job_name)
        self.log.info("Using FIO: %s" % self.fio_bin)

        output_filename = job_name + "_run_" + str(run_num) + "_" + self.name + ".json"
        try:
            output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename, "--eta=never"], True)
            self.log.info(output)
            self.log.info("FIO run finished. Results in: %s" % output_filename)
        except subprocess.CalledProcessError as e:
            self.log.error("ERROR: Fio process failed!")
            self.log.error(e.stdout)

    def sys_config(self):
        self.log.info("====Kernel release:====")
        self.log.info(self.exec_cmd(["uname", "-r"]))
        self.log.info("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log.info("====sysctl conf:====")
        sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"])
        self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))


class KernelTarget(Target):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = "nvmetcli"

        if "nvmet_bin" in target_config:
            self.nvmet_bin = target_config["nvmet_bin"]

    def stop(self):
        self.nvmet_command(self.nvmet_bin, "clear")
        self.restore_settings()

    def get_nvme_device_bdf(self, nvme_dev_path):
        nvme_name = os.path.basename(nvme_dev_path)
        return self.exec_cmd(["cat", "/sys/block/%s/device/address" % nvme_name]).strip()

    def get_nvme_devices(self):
        dev_list = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]).split("\n")
        nvme_list = []
        for dev in dev_list:
            if "nvme" not in dev:
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                nvme_list.append(dev)
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_allowlist:
                nvme_list.append(dev)
        # Return the filtered list, not the raw lsblk output
        return nvme_list

    def nvmet_command(self, nvmet_bin, command):
        return self.exec_cmd([nvmet_bin, *(command.split(" "))])

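    # Builds the JSON configuration consumed by "nvmetcli restore": one
    # subsystem with a single namespace per NVMe device, plus one port per
    # subsystem, and dumps it to kernel.conf in the current directory.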
    def kernel_tgt_gen_subsystem_conf(self, nvme_list):

        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        for ip, bdev_num in self.spread_bdevs(len(nvme_list)):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = nvme_list[bdev_num]

            nvmet_cfg["subsystems"].append({
                "allowed_hosts": [],
                "attr": {
                    "allow_any_host": "1",
                    "serial": serial,
                    "version": "1.3"
                },
                "namespaces": [
                    {
                        "device": {
                            "path": bdev_name,
                            "uuid": "%s" % uuid.uuid4()
                        },
                        "enable": 1,
                        "nsid": port
                    }
                ],
                "nqn": nqn
            })

            nvmet_cfg["ports"].append({
                "addr": {
                    "adrfam": "ipv4",
                    "traddr": ip,
                    "trsvcid": port,
                    "trtype": self.transport
                },
                "portid": bdev_num,
                "referrals": [],
                "subsystems": [nqn]
            })

            self.subsystem_info_list.append((port, nqn, ip))
        self.subsys_no = len(self.subsystem_info_list)

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        self.log.info("Configuring kernel NVMeOF Target")

        if self.null_block:
            self.log.info("Configuring with null block device.")
            nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
        else:
            self.log.info("Configuring with NVMe drives.")
            nvme_list = self.get_nvme_devices()

        self.kernel_tgt_gen_subsystem_conf(nvme_list)
        self.subsys_no = len(nvme_list)

        self.nvmet_command(self.nvmet_bin, "clear")
        self.nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log.info("Done configuring kernel NVMeOF Target")


class SPDKTarget(Target):
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Required fields
        self.core_mask = target_config["core_mask"]
        self.num_cores = self.get_num_cores(self.core_mask)

        # Defaults
        self.dif_insert_strip = False
        self.null_block_dif_type = 0
        self.num_shared_buffers = 4096
        self.max_queue_depth = 128
        self.bpf_proc = None
        self.bpf_scripts = []
        self.enable_dsa = False
        self.scheduler_core_limit = None

        if "num_shared_buffers" in target_config:
            self.num_shared_buffers = target_config["num_shared_buffers"]
        if "max_queue_depth" in target_config:
            self.max_queue_depth = target_config["max_queue_depth"]
        if "null_block_dif_type" in target_config:
            self.null_block_dif_type = target_config["null_block_dif_type"]
        if "dif_insert_strip" in target_config:
            self.dif_insert_strip = target_config["dif_insert_strip"]
        if "bpf_scripts" in target_config:
            self.bpf_scripts = target_config["bpf_scripts"]
        if "dsa_settings" in target_config:
            self.enable_dsa = target_config["dsa_settings"]
        if "scheduler_core_limit" in target_config:
            self.scheduler_core_limit = target_config["scheduler_core_limit"]

        self.log.info("====DSA settings:====")
        self.log.info("DSA enabled: %s" % (self.enable_dsa))

    def get_nvme_devices_count(self):
        return len(self.get_nvme_devices())

    def get_nvme_devices(self):
        bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")]))
        bdev_bdfs = []
        for bdev in bdev_subsys_json_obj["config"]:
            bdev_traddr = bdev["params"]["traddr"]
            if bdev_traddr in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                bdev_bdfs.append(bdev_traddr)
                continue
            if bdev_traddr in self.nvme_allowlist:
                bdev_bdfs.append(bdev_traddr)
        return bdev_bdfs

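    # Accepts either a hex mask or a bracketed core list, e.g. "0xF" -> 4
    # cores, "[0-3,7]" -> 5 cores.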
    @staticmethod
    def get_num_cores(core_mask):
        if "0x" in core_mask:
            return bin(int(core_mask, 16)).count("1")
        else:
            num_cores = 0
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")
            for i in core_mask.split(","):
                if "-" in i:
                    x, y = i.split("-")
                    num_cores += len(range(int(x), int(y))) + 1
                else:
                    num_cores += 1
            return num_cores

    def spdk_tgt_configure(self):
        self.log.info("Configuring SPDK NVMeOF target via RPC")

        if self.enable_adq:
            self.adq_configure_tc()

        # Create transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
                                       num_shared_buffers=self.num_shared_buffers,
                                       max_queue_depth=self.max_queue_depth,
                                       dif_insert_or_strip=self.dif_insert_strip,
                                       sock_priority=self.adq_priority)
        self.log.info("SPDK NVMeOF transport layer:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

        self.log.info("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self, null_block_count):
        md_size = 0
        block_size = 4096
        if self.null_block_dif_type != 0:
            md_size = 128

        self.log.info("Adding null block bdevices to config via RPC")
        for i in range(null_block_count):
            self.log.info("Setting bdev protection to: %s" % self.null_block_dif_type)
            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
                                      dif_type=self.null_block_dif_type, md_size=md_size)
        self.log.info("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        self.log.info("Adding NVMe bdevs to config via RPC")

        bdfs = self.get_nvme_devices()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log.error("ERROR: Requested number of disks exceeds the %s available" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log.info("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        self.log.info("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = self.get_nvme_devices_count()

        for ip, bdev_num in self.spread_bdevs(req_num_disks):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = "Nvme%sn1" % bdev_num

            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                           allow_any_host=True, max_namespaces=8)
            rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
            for nqn_name in [nqn, "discovery"]:
                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
                                                     nqn=nqn_name,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid=port,
                                                     adrfam="ipv4")
            self.subsystem_info_list.append((port, nqn, ip))
        self.subsys_no = len(self.subsystem_info_list)

        self.log.info("SPDK NVMeOF subsystem configuration:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def bpf_start(self):
        self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts)
        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
        results_path = os.path.join(self.results_dir, "bpf_traces.txt")

        with open(self.pid, "r") as fh:
            nvmf_pid = str(fh.readline())

        cmd = [bpf_script, nvmf_pid, *bpf_traces]
        self.log.info(cmd)
        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})

    def tgt_start(self):
        if self.null_block:
            self.subsys_no = 1
        else:
            self.subsys_no = self.get_nvme_devices_count()
        self.log.info("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        self.log.info("SPDK NVMeOF Target PID=%s" % proc.pid)
        self.log.info("Waiting for spdk to initialize...")
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")

        rpc.sock.sock_set_default_impl(self.client, impl_name="posix")

        if self.enable_zcopy:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
                                           enable_zerocopy_send_server=True)
            self.log.info("Target socket options:")
            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))

        if self.enable_adq:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
                                           nvme_adminq_poll_period_us=100000, retry_count=4)

        if self.enable_dsa:
            rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None)
            self.log.info("Target DSA accel module enabled")

        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit)
        rpc.framework_start_init(self.client)

        if self.bpf_scripts:
            self.bpf_start()

        self.spdk_tgt_configure()

    def stop(self):
        if self.bpf_proc:
            self.log.info("Stopping BPF Trace script")
            self.bpf_proc.terminate()
            self.bpf_proc.wait()

        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait(timeout=30)
            except Exception as e:
                self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.")
                self.log.info(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
                # Try to clean up RPC socket files if they were not removed
                # because of using 'kill'
                try:
                    os.remove("/var/tmp/spdk.sock")
                    os.remove("/var/tmp/spdk.sock.lock")
                except FileNotFoundError:
                    pass
        self.restore_settings()


class KernelInitiator(Initiator):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Defaults
        self.extra_params = ""
        self.ioengine = "libaio"

        if "extra_params" in initiator_config:
            self.extra_params = initiator_config["extra_params"]

        if "kernel_engine" in initiator_config:
            self.ioengine = initiator_config["kernel_engine"]
            if "io_uring" in self.ioengine:
                self.extra_params = "--nr-poll-queues=8"

1230    def get_connected_nvme_list(self):
1231        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
1232        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
1233                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
1234        return nvme_list
1235
1236    def init_connect(self):
1237        self.log.info("Below connection attempts may result in error messages, this is expected!")
1238        for subsystem in self.subsystem_info_list:
1239            self.log.info("Trying to connect %s %s %s" % subsystem)
1240            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
1241                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
1242            time.sleep(2)
1243
1244        if "io_uring" in self.ioengine:
1245            self.log.info("Setting block layer settings for io_uring.")
1246
1247            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
1248            #       apparently it's not possible for connected subsystems.
1249            #       Results in "error: Invalid argument"
1250            block_sysfs_settings = {
1251                "iostats": "0",
1252                "rq_affinity": "0",
1253                "nomerges": "2"
1254            }
1255
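            # iostats=0 skips per-I/O accounting, rq_affinity=0 disables steering
            # completions back to the submitting CPU group, and nomerges=2 turns off
            # request merging, keeping the polled I/O path as short as possible.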
            for disk in self.get_connected_nvme_list():
                sysfs = os.path.join("/sys/block", disk, "queue")
                for k, v in block_sysfs_settings.items():
                    sysfs_opt_path = os.path.join(sysfs, k)
                    try:
                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
                    except CalledProcessError as e:
                        self.log.warning("Command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
                    finally:
                        current_value = self.exec_cmd(["sudo", "cat", sysfs_opt_path])
                        self.log.info("%s=%s" % (sysfs_opt_path, current_value))

    def init_disconnect(self):
        for subsystem in self.subsystem_info_list:
            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
            time.sleep(1)

    def get_nvme_subsystem_numa(self, dev_name):
        # Remove the last two characters to get the controller name instead of the namespace name
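        # e.g. "/dev/nvme0n1" -> "nvme0"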
        nvme_ctrl = os.path.basename(dev_name)[:-2]
        remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
                                   self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl]))
        return self.get_route_nic_numa(remote_nvme_ip.group(0))

    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
        # Generate the connected NVMe device names and sort them by the NUMA node of the
        # NIC used to reach them, to allow better grouping when splitting into fio sections.
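        # e.g. with devices on NUMA nodes [1, 0, 1, 0], the node 0 devices sort
        # first, so consecutive fio sections mostly share a node (illustrative).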
        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
        nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list]
        nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))]

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
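        # A single generated section might look like this (illustrative values;
        # with 2 disks, io_depth=128, num_jobs=2: iodepth=round(128*2/2)=128):
        #   [filename0]
        #   filename=/dev/nvme0n1
        #   filename=/dev/nvme1n1
        #   iodepth=128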
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = self.gen_fio_numa_section(r)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section


class SPDKInitiator(Initiator):
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.install_spdk()

        # Required fields
        self.num_cores = initiator_config["num_cores"]

        # Optional fields
        self.enable_data_digest = False
        if "enable_data_digest" in initiator_config:
            self.enable_data_digest = initiator_config["enable_data_digest"]

    def install_spdk(self):
        self.log.info("Using fio binary %s" % self.fio_bin)
        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)])
        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])

        self.log.info("SPDK built")
        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])

    def init_connect(self):
        # Not a real "connect" like "nvme connect", because SPDK's fio bdev plugin
        # initiates the connection just before starting IO traffic. This is only an
        # "init_connect" counterpart to the same method in the KernelInitiator
        # class: prepare the bdev.conf JSON file for later use and consider that
        # "making a connection".
        bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
        self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])

    def init_disconnect(self):
        # The SPDK initiator does not need to explicitly disconnect, as this is
        # done automatically after the fio bdev plugin finishes IO.
        pass

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        bdev_cfg_section = {
            "subsystems": [
                {
                    "subsystem": "bdev",
                    "config": []
                }
            ]
        }

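        # Each remote subsystem becomes one bdev_nvme_attach_controller entry,
        # e.g. (illustrative values):
        #   {"method": "bdev_nvme_attach_controller",
        #    "params": {"name": "Nvme0", "trtype": "tcp", "traddr": "10.0.0.1",
        #               "trsvcid": "4420", "subnqn": "nqn.2018-09.io.spdk:cnode1",
        #               "adrfam": "IPv4"}}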
        for i, subsys in enumerate(remote_subsystem_list):
            sub_port, sub_nqn, sub_addr = map(str, subsys)
            nvme_ctrl = {
                "method": "bdev_nvme_attach_controller",
                "params": {
                    "name": "Nvme{}".format(i),
                    "trtype": self.transport,
                    "traddr": sub_addr,
                    "trsvcid": sub_port,
                    "subnqn": sub_nqn,
                    "adrfam": "IPv4"
                }
            }

            if self.enable_adq:
                nvme_ctrl["params"].update({"priority": "1"})

            if self.enable_data_digest:
                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})

            bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)

        return json.dumps(bdev_cfg_section, indent=2)

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
        filename_section = ""
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        # Generate the expected NVMe bdev names and sort them by the NUMA node of the
        # NIC used to reach them, to allow better grouping when splitting into fio sections.
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames]
        filenames = [x for _, x in sorted(zip(filename_numas, filenames))]

        nvme_per_split = int(len(subsystems) / len(threads))
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = self.gen_fio_numa_section(r)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section

    def get_nvme_subsystem_numa(self, bdev_name):
        bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir]))
        bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"]

        # Remove the last two characters to get the controller name instead of the namespace name
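        # e.g. "Nvme0n1" -> "Nvme0"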
        nvme_ctrl = bdev_name[:-2]
        remote_nvme_ip = list(filter(lambda x: x["params"]["name"] == nvme_ctrl, bdev_conf_json_obj))[0]["params"]["traddr"]
        return self.get_route_nic_numa(remote_nvme_ip)


if __name__ == "__main__":
    script_full_dir = os.path.dirname(os.path.realpath(__file__))
    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
                        help='Configuration file.')
    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
                        help='Results directory.')
    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
                        help='CSV results filename.')
    parser.add_argument('-f', '--force', default=False, action='store_true',
                        dest='force', help="""Force the script to continue and try to use all
                        available NVMe devices during the test.
                        WARNING: Might result in data loss on the used NVMe drives""")

    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO,
                        format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s')

    logging.info("Using config file: %s" % args.config)
    with open(args.config, "r") as config:
        data = json.load(config)

    initiators = []
    fio_cases = []

    general_config = data["general"]
    target_config = data["target"]
    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]

    if "null_block_devices" not in data["target"] and \
        (args.force is False and
            "allowlist" not in data["target"] and
            "blocklist" not in data["target"]):
        # TODO: Also check that allowlist or blocklist are not empty.
        logging.warning("""WARNING: This script requires an allowlist or blocklist to be defined.
        You can choose to use all available NVMe drives on your system, which may potentially
        lead to data loss. If you wish to proceed with all attached NVMes, use the "-f" option.""")
        exit(1)

    for k, v in data.items():
        if "target" in k:
            v.update({"results_dir": args.results})
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(k, data["general"], v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(k, data["general"], v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else 1
            fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None

            fio_rate_iops = 0
            if "rate_iops" in data[k]:
                fio_rate_iops = data[k]["rate_iops"]

            fio_offset = False
            if "offset" in data[k]:
                fio_offset = data[k]["offset"]
            fio_offset_inc = 0
            if "offset_inc" in data[k]:
                fio_offset_inc = data[k]["offset_inc"]
        else:
            continue

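    # A minimal config.json sketch covering the keys read above (illustrative
    # values, not defaults):
    #   {
    #     "general": {"username": "user", "password": "...", "transport": "tcp"},
    #     "target": {"mode": "spdk", "nic_ips": ["10.0.0.1"], ...},
    #     "initiator1": {"mode": "kernel", "nic_ips": ["10.0.0.2"], ...},
    #     "fio": {"bs": ["4k"], "qd": [128], "rw": ["randrw"], "rwmixread": 70,
    #             "run_time": 30, "ramp_time": 10, "run_num": 3}
    #   }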
    try:
        os.mkdir(args.results)
    except FileExistsError:
        pass

    for i in initiators:
        target_obj.initiator_info.append(
            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
        )

    # TODO: This try block is definitely too large. Needs to be broken up into
    # separate logical blocks to reduce its size.
    try:
        target_obj.tgt_start()

        for i in initiators:
            i.match_subsystems(target_obj.subsystem_info_list)
            if i.enable_adq:
                i.adq_configure_tc()

        # Poor man's threading
        # Run FIO tests
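        # Each initiator's fio run and every enabled target-side measurement (sar,
        # pcm, bandwidth, DPDK memory, power, ethtool) runs in its own thread; all
        # threads for a run are started together and joined before the next run.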
        for block_size, io_depth, rw in fio_workloads:
            configs = []
            for i in initiators:
                i.init_connect()
                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops,
                                       fio_offset, fio_offset_inc)
                configs.append(cfg)

            for run_no in range(1, fio_run_num+1):
                threads = []
                power_daemon = None
                measurements_prefix = "%s_%s_%s_m_%s_run_%s" % (block_size, io_depth, rw, fio_rw_mix_read, run_no)

                for i, cfg in zip(initiators, configs):
                    t = threading.Thread(target=i.run_fio, args=(cfg, run_no))
                    threads.append(t)
                if target_obj.enable_sar:
                    sar_file_prefix = measurements_prefix + "_sar"
                    t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix, fio_ramp_time, fio_run_time))
                    threads.append(t)

                if target_obj.enable_pcm:
                    pcm_fnames = ["%s_%s.csv" % (measurements_prefix, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]

                    pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm,
                                                 args=(args.results, pcm_fnames[0], fio_ramp_time, fio_run_time))
                    pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory,
                                                 args=(args.results, pcm_fnames[1], fio_ramp_time, fio_run_time))
                    pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power,
                                                 args=(args.results, pcm_fnames[2], fio_ramp_time, fio_run_time))

                    threads.append(pcm_cpu_t)
                    threads.append(pcm_mem_t)
                    threads.append(pcm_pow_t)

                if target_obj.enable_bw:
                    bandwidth_file_name = measurements_prefix + "_bandwidth.csv"
                    t = threading.Thread(target=target_obj.measure_network_bandwidth,
                                         args=(args.results, bandwidth_file_name, fio_ramp_time, fio_run_time))
                    threads.append(t)

                if target_obj.enable_dpdk_memory:
                    dpdk_mem_file_name = measurements_prefix + "_dpdk_mem.txt"
                    t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results, dpdk_mem_file_name, fio_ramp_time))
                    threads.append(t)

                if target_obj.enable_pm:
                    power_daemon = threading.Thread(target=target_obj.measure_power,
                                                    args=(args.results, measurements_prefix, script_full_dir,
                                                          fio_ramp_time, fio_run_time))
                    threads.append(power_daemon)

                if target_obj.enable_adq:
                    ethtool_thread = threading.Thread(target=target_obj.ethtool_after_fio_ramp, args=(fio_ramp_time,))
                    threads.append(ethtool_thread)

                for t in threads:
                    t.start()
                for t in threads:
                    t.join()

            for i in initiators:
                i.init_disconnect()
                i.copy_result_files(args.results)
        try:
            parse_results(args.results, args.csv_filename)
        except Exception:
            logging.exception("There was an error while parsing the results")
    finally:
        for i in initiators:
            try:
                i.stop()
            except Exception as err:
                logging.error("Failed to stop initiator %s: %s" % (i.name, err))
        target_obj.stop()