xref: /spdk/scripts/perf/nvmf/run_nvmf.py (revision 6e8e184bca3b27096663dcc22f4b8b179a46a82b)
1#!/usr/bin/env python3
2
3from json.decoder import JSONDecodeError
4import os
5import re
6import sys
7import argparse
8import json
9import zipfile
10import threading
11import subprocess
12import itertools
13import configparser
14import time
15import uuid
16from collections import OrderedDict
17
18import paramiko
19import pandas as pd
20from common import *
21
22sys.path.append(os.path.dirname(__file__) + '/../../../python')
23
24import spdk.rpc as rpc  # noqa
25import spdk.rpc.client as rpc_client  # noqa
26
27
class Server:
    """Base class for a machine taking part in an NVMe-oF performance test.

    Encapsulates system tuning shared by the target and the initiators:
    systemd services, sysctl settings, tuned-adm profile, CPU frequency
    governor, NIC IRQ affinity and (optionally) ADQ traffic classes.
    Every setting that gets changed is recorded in a ``*_restore_*``
    member so it can be reverted after the test run.

    Subclasses are expected to override :meth:`exec_cmd` (local vs. remote
    execution) and :meth:`set_local_nic_info_helper`.
    """

    def __init__(self, name, general_config, server_config):
        """Store connection and tuning options from the test configuration.

        :param name: server label; must contain only letters and digits
        :param general_config: dict with the "general" config section
                               (username, password, transport, ...)
        :param server_config: dict with this server's config section
                              (nic_ips, mode, optional adq/tuned options)
        """
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]

        # Default location of the Mellanox OFED IRQ affinity helper scripts;
        # can be overridden per server via "irq_scripts_dir" in the config.
        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        self.local_nic_info = []
        self._nics_json_obj = {}  # lazy cache of "ip -j address show" output
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        self.enable_adq = False
        self.adq_priority = None
        if "adq_enable" in server_config and server_config["adq_enable"]:
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1

        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]

        # Restrict server names to alphanumerics; the name is embedded into
        # log lines and result file names later on.
        if not re.match("^[A-Za-z0-9]*$", name):
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        """Print a message prefixed with this server's name, unbuffered."""
        print("[%s] %s" % (self.name, msg), flush=True)

    @staticmethod
    def get_uncommented_lines(lines):
        """Return only non-empty lines which do not start with '#'."""
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        """Return the interface name owning the given IP address.

        Lazily caches parsed "ip -j address show" output (interfaces with
        no addresses are filtered out). Returns None when no interface
        has the address assigned.
        """
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                # Compare for equality, not substring containment:
                # "10.0.0.1" must not match an interface holding "10.0.0.10".
                if ip == addr["local"]:
                    return nic["ifname"]

    def set_local_nic_info_helper(self):
        """Return raw hardware info for NIC discovery; overridden in subclasses."""
        pass

    def set_local_nic_info(self, pci_info):
        """Extract network devices from lshw-style JSON and cache them.

        :param pci_info: parsed JSON tree as produced by "lshw -json"
        """
        def extract_network_elements(json_obj):
            # Recursively walk the lshw tree and collect nodes whose
            # "class" contains "network".
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Execute a command on this server; base implementation is a no-op.

        Subclasses override this to run either locally (subprocess) or
        remotely (SSH). Returns the command's output as a string.
        """
        return ""

    def configure_system(self):
        """Apply all system-level tuning steps in the required order."""
        self.configure_services()
        self.configure_sysctl()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity()

    def configure_adq(self):
        """Set up ADQ (module loading + NIC settings); SPDK mode only."""
        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        """Load kernel modules required for ADQ traffic-class filtering."""
        self.log_print("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log_print("%s loaded!" % module)
            except CalledProcessError as e:
                self.log_print("ERROR: failed to load module %s" % module)
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        """Create ADQ traffic classes and flower filters on each test NIC.

        NOTE: relies on self.num_cores and self.spdk_dir, which are defined
        by subclasses, not by this base class.
        """
        self.log_print("Configuring ADQ Traffic classes and filters...")

        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        port = "4420"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            # Two traffic classes: TC0 (default) and TC1 (NVMe-oF traffic).
            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log_print(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log_print(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            # Steer NVMe-oF TCP traffic (port 4420) into hardware TC1.
            tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                             "protocol", "ip", "ingress", "prio", "1", "flower",
                             "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                             "skip_sw", "hw_tc", "1"]
            self.log_print(" ".join(tc_filter_cmd))
            self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log_print("Show tc configuration for %s NIC..." % nic_name)
            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log_print("%s" % tc_disk_out)
            self.log_print("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log_print(xps_cmd)
            self.exec_cmd(xps_cmd)

    def adq_configure_nic(self):
        """Reload the ice driver and enable ADQ-related NIC features."""
        self.log_print("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        try:
            self.exec_cmd(["sudo", "rmmod", "ice"])
            self.exec_cmd(["sudo", "modprobe", "ice"])
        except CalledProcessError as e:
            self.log_print("ERROR: failed to reload ice module!")
            self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log_print(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                # As temporary workaround for ADQ, channel packet inspection optimization is turned on during connection establishment.
                # Then turned off before fio ramp_up expires in ethtool_after_fio_ramp().
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-pkt-inspect-optimize", "on"])
            except CalledProcessError as e:
                self.log_print("ERROR: failed to configure NIC port using ethtool!")
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
                self.log_print("Please update your NIC driver and firmware versions and try again.")
            self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        """Stop services which may interfere with the test; record prior state."""
        self.log_print("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            # systemctl output is key=value pairs; wrap in a [section] header
            # so configparser can parse it.
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log_print("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log_print("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        """Apply networking sysctl tuning; record previous values for restore."""
        self.log_print("Tuning sysctl settings...")

        busy_read = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1

        sysctl_opts = {
            "net.core.busy_poll": 0,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
        }

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        """Activate the configured tuned-adm profile; record prior profile/mode."""
        if not self.tuned_profile:
            self.log_print("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        """Switch the CPU scaling governor to "performance"."""
        self.log_print("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def configure_irq_affinity(self):
        """Run set_irq_affinity.sh for every NIC used in the test."""
        self.log_print("Setting NIC irq affinity for NICs...")

        irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh")
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            irq_cmd = ["sudo", irq_script_path, nic]
            self.log_print(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        """Restore services stopped by configure_services()/configure_tuned()."""
        self.log_print("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        """Restore sysctl values recorded by configure_sysctl()."""
        self.log_print("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        """Restore the tuned-adm profile recorded by configure_tuned()."""
        self.log_print("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log_print("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        """Restore the CPU governor recorded by configure_cpu_governor()."""
        self.log_print("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log_print("Reverted CPU governor to %s." % self.governor_restore)
335
336
class Target(Server):
    """Machine hosting the NVMe-oF target.

    Commands are executed locally via subprocess (see exec_cmd). Besides
    target setup, this class owns result-file parsing and the measurement
    helpers (SAR, PCM, network bandwidth, DPDK memory) that run on the
    target host during a test.
    """

    def __init__(self, name, general_config, target_config):
        """Read target options from the config, tune the local system and
        (unless skip_spdk_install is set) zip SPDK sources for initiators.
        """
        super().__init__(name, general_config, target_config)

        # Defaults
        self.enable_sar = False
        self.sar_delay = 0
        self.sar_interval = 0
        self.sar_count = 0
        self.enable_pcm = False
        self.pcm_dir = ""
        self.pcm_delay = 0
        self.pcm_interval = 0
        self.pcm_count = 0
        self.enable_bandwidth = 0
        self.bandwidth_count = 0
        self.enable_dpdk_memory = False
        self.dpdk_wait_time = 0
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        # Cache local interface info; unlike the base class this is done
        # eagerly and without filtering out address-less interfaces.
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []

        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "sar_settings" in target_config:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"]
        if "pcm_settings" in target_config:
            self.enable_pcm = True
            self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"]
        if "enable_bandwidth" in target_config:
            self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"]
        if "enable_dpdk_memory" in target_config:
            self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"]
        if "scheduler_settings" in target_config:
            self.scheduler_name = target_config["scheduler_settings"]
        if "zcopy_settings" in target_config:
            self.enable_zcopy = target_config["zcopy_settings"]
        if "results_dir" in target_config:
            self.results_dir = target_config["results_dir"]

        # SPDK root is three directories up from this script's location.
        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        """Return hardware info for the local machine from "lshw -json"."""
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Run a command locally and return its decoded stdout.

        :param cmd: command and arguments as a list
        :param stderr_redirect: if True, merge stderr into stdout
        :param change_dir: optionally chdir there for the duration of the call
        """
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log_print("Changing directory to %s" % change_dir)

        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")

        if change_dir:
            os.chdir(old_cwd)
            self.log_print("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        """Zip the SPDK source tree into dest_file for shipping to initiators."""
        self.log_print("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, _directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log_print("Done zipping")

    @staticmethod
    def read_json_stats(file):
        """Parse a single fio JSON output file into a flat stats list.

        Returns 18 floats: read iops/bw/avg/min/max latency and p99,
        p99.9, p99.99, p99.999 percentiles, followed by the same nine
        values for writes. All latencies are converted to microseconds.
        """
        with open(file, "r") as json_data:
            data = json.load(json_data)
            job_pos = 0  # job_pos = 0 because using aggregated results

            # Check if latency is in nano or microseconds to choose correct dict key
            def get_lat_unit(key_prefix, dict_section):
                # key prefix - lat, clat or slat.
                # dict section - portion of json containing latency bucket in question
                # Return dict key to access the bucket and unit as string
                for k, _ in dict_section.items():
                    if k.startswith(key_prefix):
                        return k, k.split("_")[1]

            def get_clat_percentiles(clat_dict_leaf):
                if "percentile" in clat_dict_leaf:
                    p99_lat = float(clat_dict_leaf["percentile"]["99.000000"])
                    p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"])
                    p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"])
                    p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"])

                    return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat]
                else:
                    # Latest fio versions do not provide "percentile" results if no
                    # measurements were done, so just return zeroes
                    return [0, 0, 0, 0]

            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
            read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["read"][clat_key])

            # Normalize nanoseconds to microseconds.
            if "ns" in lat_unit:
                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
            if "ns" in clat_unit:
                read_p99_lat = read_p99_lat / 1000
                read_p99_9_lat = read_p99_9_lat / 1000
                read_p99_99_lat = read_p99_99_lat / 1000
                read_p99_999_lat = read_p99_999_lat / 1000

            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
            write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["write"][clat_key])

            if "ns" in lat_unit:
                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
            if "ns" in clat_unit:
                write_p99_lat = write_p99_lat / 1000
                write_p99_9_lat = write_p99_9_lat / 1000
                write_p99_99_lat = write_p99_99_lat / 1000
                write_p99_999_lat = write_p99_999_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]

    def parse_results(self, results_dir, csv_file):
        """Aggregate per-initiator fio JSON results into CSV summaries.

        Writes one per-initiator CSV per fio job plus a single aggregated
        csv_file combining all initiators, weighting latency by rwmixread.
        """
        files = os.listdir(results_dir)
        fio_files = filter(lambda x: ".fio" in x, files)
        json_files = [x for x in files if ".json" in x]

        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]

        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]

        header_line = ",".join(["Name", *headers])
        aggr_header_line = ",".join(["Name", *aggr_headers])

        # Create empty results file
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            fh.write(aggr_header_line + "\n")
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # Look in the filename for rwmixread value. Function arguments do
            # not have that information.
            # TODO: Improve this function by directly using workload params instead
            # of regexing through filenames.
            if "read" in job_name:
                rw_mixread = 1
            elif "write" in job_name:
                rw_mixread = 0
            else:
                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100

            # If "_CPU" exists in name - ignore it
            # Initiators for the same job could have different num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if x.startswith(job_name)]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # There may have been more than 1 initiator used in test, need to check that
            # Result files are created so that string after last "_" separator is server name
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than 1 test run for this job, calculate average results for initiator
                i_results = [x for x in job_result_files if i in x]
                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))

                separate_stats = []
                for r in i_results:
                    try:
                        stats = self.read_json_stats(os.path.join(results_dir, r))
                        separate_stats.append(stats)
                        self.log_print(stats)
                    except JSONDecodeError:
                        self.log_print("ERROR: Failed to parse %s results! Results might be incomplete!" % r)

                # Element-wise average over all runs for this initiator.
                init_results = [sum(x) for x in zip(*separate_stats)]
                init_results = [x / len(separate_stats) for x in init_results]
                inits_avg_results.append(init_results)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(init_results)
                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
                    fh.write(header_line + "\n")
                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")

            # Sum results of all initiators running this FIO job.
            # Latency results are an average of latencies from across all initiators.
            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
            for key in inits_avg_results:
                if "lat" in key:
                    inits_avg_results[key] /= len(inits_names)

            # Aggregate separate read/write values into common labels
            # Take rw_mixread into consideration for mixed read/write workloads.
            aggregate_results = OrderedDict()
            for h in aggr_headers:
                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
                if "lat" in h:
                    _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
                else:
                    _ = read_stat + write_stat
                aggregate_results[h] = "{0:.3f}".format(_)

            rows.add(",".join([job_name, *aggregate_results.values()]))

        # Save results to file
        for row in rows:
            with open(os.path.join(results_dir, csv_file), "a") as fh:
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))

    def measure_sar(self, results_dir, sar_file_name):
        """Collect SAR CPU statistics and append total CPU usage to the file.

        Sleeps sar_delay seconds first, then samples all CPUs
        sar_count times at sar_interval-second intervals.
        """
        self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay)
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        time.sleep(self.sar_delay)
        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count])
        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log_print("Summary CPU utilization from SAR:")
                        self.log_print(line)
                    elif "all" in line:
                        self.log_print(line)
                    else:
                        # Per-CPU average row; column 8 is assumed to be
                        # %idle - TODO confirm against the local sar version.
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum
        with open(os.path.join(results_dir, sar_file_name), "a") as f:
            f.write("Total CPU used: " + str(sar_cpu_usage))

    def ethtool_after_fio_ramp(self, fio_ramp_time):
        """Halfway through fio ramp time, disable the ADQ packet-inspection
        workaround enabled in adq_configure_nic()."""
        time.sleep(fio_ramp_time//2)
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log_print(nic)
            self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                           "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        """Run pcm-memory.x for pcm_count seconds, writing CSV output."""
        time.sleep(self.pcm_delay)
        cmd = ["%s/pcm-memory.x" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)]
        pcm_memory = subprocess.Popen(cmd)
        time.sleep(self.pcm_count)
        pcm_memory.terminate()

    def measure_pcm(self, results_dir, pcm_file_name):
        """Run pcm.x, then extract UPI socket columns into a separate CSV."""
        time.sleep(self.pcm_delay)
        cmd = ["%s/pcm.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        # Drop pandas' auto-generated "Unnamed:" labels from the header.
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_pcm_power(self, results_dir, pcm_power_file_name):
        """Run pcm-power.x and save its output to a file."""
        time.sleep(self.pcm_delay)
        out = self.exec_cmd(["%s/pcm-power.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count])
        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
            fh.write(out)

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name):
        """Sample network bandwidth with bwm-ng into a CSV file."""
        self.log_print("INFO: starting network bandwidth measure")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)])

    def measure_dpdk_memory(self, results_dir):
        """Collect the SPDK/DPDK memory dump and move it to results_dir."""
        self.log_print("INFO: waiting to generate DPDK memory usage")
        time.sleep(self.dpdk_wait_time)
        self.log_print("INFO: generating DPDK memory usage")
        # NOTE(review): this is a bare attribute reference, not a call - it
        # does not issue the env_dpdk_get_mem_stats RPC, yet the next line
        # expects /tmp/spdk_mem_dump.txt to exist. Verify whether the RPC
        # should actually be invoked here with an RPC client.
        rpc.env.env_dpdk_get_mem_stats
        os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir))

    def sys_config(self):
        """Log kernel, sysctl, CPU power and SPDK scheduler information."""
        self.log_print("====Kernel release:====")
        self.log_print(os.uname().release)
        self.log_print("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log_print('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log_print("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log_print('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log_print("====Cpu power info:====")
        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log_print("====zcopy settings:====")
        self.log_print("zcopy enabled: %s" % (self.enable_zcopy))
        self.log_print("====Scheduler settings:====")
        self.log_print("SPDK scheduler: %s" % (self.scheduler_name))
669
670
class Initiator(Server):
    """Base class for workload-generator (initiator) hosts.

    Opens a persistent SSH session to the initiator machine and uses it to
    deploy SPDK sources, discover target subsystems, generate FIO config
    files and run FIO. Kernel- and SPDK-specific pieces are provided by the
    KernelInitiator and SPDKInitiator subclasses.
    """

    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Required fields
        self.ip = initiator_config["ip"]
        self.target_nic_ips = initiator_config["target_nic_ips"]

        # Defaults
        self.cpus_allowed = None
        self.cpus_allowed_policy = "shared"
        self.spdk_dir = "/tmp/spdk"
        self.fio_bin = "/usr/src/fio/fio"
        self.nvmecli_bin = "nvme"
        self.cpu_frequency = None
        self.subsystem_info_list = []

        if "spdk_dir" in initiator_config:
            self.spdk_dir = initiator_config["spdk_dir"]
        if "fio_bin" in initiator_config:
            self.fio_bin = initiator_config["fio_bin"]
        if "nvmecli_bin" in initiator_config:
            self.nvmecli_bin = initiator_config["nvmecli_bin"]
        if "cpus_allowed" in initiator_config:
            self.cpus_allowed = initiator_config["cpus_allowed"]
        if "cpus_allowed_policy" in initiator_config:
            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
        if "cpu_frequency" in initiator_config:
            self.cpu_frequency = initiator_config["cpu_frequency"]
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        # Persistent SSH session used by exec_cmd()/put_file()/get_file().
        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.copy_spdk("/tmp/spdk.zip")
        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        """Return NIC hardware info of this host as parsed "lshw -json" output."""
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        """Close the SSH session to the initiator."""
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Run a command on the initiator over SSH and return its stdout.

        :param cmd: command as a list of arguments
        :param stderr_redirect: if True, merge stderr into stdout (uses a pty)
        :param change_dir: optional remote directory to cd into first
        :raises subprocess.CalledProcessError: when the remote command exits non-zero
        """
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted, # (e.g. when calling sysctl) quote it again to prevent expansion
        # when sending to remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout thanks using get_pty option if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            # Fully-qualified exception class instead of a bare name that
            # depended on "from common import *" exporting it.
            raise subprocess.CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        """Copy a local file to the initiator via SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        """Copy a file from the initiator to the local host via SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        """Transfer a zipped SPDK source tree to the initiator and unpack it
        into self.spdk_dir."""
        self.log_print("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log_print("Sources unpacked")

    def copy_result_files(self, dest_dir):
        """Fetch all files from the initiator's nvmf_perf directory into
        dest_dir (created if missing)."""
        self.log_print("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log_print("Done copying results")

    def discover_subsystems(self, address_list, subsys_no):
        """Scan target addresses with "nvme discover" on ports 4420..4420+subsys_no-1
        and store matching (trsvcid, nqn, traddr) tuples in subsystem_info_list.

        Failed discover attempts are expected (kernel targets use one port per
        subsystem, so most probed ports do not answer) and are ignored.
        """
        num_nvmes = range(0, subsys_no)
        nvme_discover_output = ""
        # Renamed loop variable: the original shadowed the subsys_no parameter.
        for ip, port_off in itertools.product(address_list, num_nvmes):
            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + port_off))
            nvme_discover_cmd = ["sudo",
                                 "%s" % self.nvmecli_bin,
                                 "discover", "-t", "%s" % self.transport,
                                 "-s", "%s" % (4420 + port_off),
                                 "-a", "%s" % ip]

            try:
                stdout = self.exec_cmd(nvme_discover_cmd)
                if stdout:
                    nvme_discover_output = nvme_discover_output + stdout
            except subprocess.CalledProcessError:
                # Do nothing. In case of discovering remote subsystems of kernel target
                # we expect "nvme discover" to fail a bunch of times because we basically
                # scan ports.
                pass

        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                                nvme_discover_output)  # from nvme discovery output
        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
        subsystems = list(set(subsystems))
        subsystems.sort(key=lambda x: x[1])
        self.log_print("Found matching subsystems on target side:")
        for s in subsystems:
            self.log_print(s)
        self.subsystem_info_list = subsystems

    def gen_fio_filename_conf(self, *args, **kwargs):
        """Generate the per-job [filenameX] sections of the fio config.

        Logic implemented in SPDKInitiator and KernelInitiator classes."""
        pass

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10, rate_iops=0):
        """Generate a FIO config file on the initiator and return its remote path.

        The [global] section is rendered from a template; the per-job
        filename sections come from the subclass's gen_fio_filename_conf().
        Thread count is derived from cpus_allowed, an existing num_cores
        attribute, or the number of connected subsystems, in that order.
        """
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""
        if "spdk" in self.mode:
            bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
            self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
            spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
        else:
            ioengine = self.ioengine
            spdk_conf = ""
            out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'",
                                 "|", "awk", "'{print $1}'"])
            subsystems = [x for x in out.split("\n") if "nvme" in x]

        if self.cpus_allowed is not None:
            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    # Bug fix: fio's "a-b" cpus_allowed range is inclusive, so
                    # e.g. "0-3" means 4 CPUs; len(range(a, b)) undercounted by one.
                    cpus_num += len(range(a, b + 1))
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            self.num_cores = len(subsystems)
            threads = range(0, len(subsystems))

        if "spdk" in self.mode:
            filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs)
        else:
            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs)

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, kernel_init_connect() function
        if hasattr(self, "ioengine") and "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log_print("Created FIO Config:")
        self.log_print(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        """Pin the CPU frequency with cpupower when cpu_frequency is configured.
        Requires the userspace governor (i.e. intel_pstate disabled)."""
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit()
        else:
            self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.")

    def run_fio(self, fio_config_file, run_num=None):
        """Run FIO on the initiator with the given remote config file.

        With run_num, repeat the job that many times, writing numbered JSON
        result files; otherwise run once. Results are written next to the
        config file.
        """
        job_name, _ = os.path.splitext(fio_config_file)
        self.log_print("Starting FIO run for job: %s" % job_name)
        self.log_print("Using FIO: %s" % self.fio_bin)

        if run_num:
            for i in range(1, run_num + 1):
                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
                try:
                    output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
                                            "--output=%s" % output_filename, "--eta=never"], True)
                    self.log_print(output)
                except subprocess.CalledProcessError as e:
                    self.log_print("ERROR: Fio process failed!")
                    self.log_print(e.stdout)
        else:
            output_filename = job_name + "_" + self.name + ".json"
            # Bug fix: the original built the option as "--output" % output_filename,
            # which raises TypeError; the intended argument is "--output=<file>".
            output = self.exec_cmd(["sudo", self.fio_bin,
                                    fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename], True)
            self.log_print(output)
        self.log_print("FIO run finished. Results in: %s" % output_filename)

    def sys_config(self):
        """Log remote system configuration: kernel release and cmdline,
        sysctl.conf contents and cpupower info."""
        self.log_print("====Kernel release:====")
        self.log_print(self.exec_cmd(["uname", "-r"]))
        self.log_print("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log_print("====sysctl conf:====")
        sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"])
        self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log_print("====Cpu power info:====")
        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
951
952
class KernelTarget(Target):
    """NVMe-oF target backed by the Linux kernel nvmet driver,
    configured through nvmetcli with a generated JSON config."""

    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = "nvmetcli"

        if "nvmet_bin" in target_config:
            self.nvmet_bin = target_config["nvmet_bin"]

    def stop(self):
        """Tear down the kernel target by clearing the nvmet configuration."""
        nvmet_command(self.nvmet_bin, "clear")

    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):
        """Write an nvmetcli JSON config ("kernel.conf") exposing each device
        in nvme_list as its own subsystem, one listening port per subsystem
        (4420, 4421, ...), with devices split between the given NIC IPs.
        Also records (trsvcid, nqn, ip) entries in subsystem_info_list.
        """
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        # Split disks between NIC IP's
        disks_per_ip = int(len(nvme_list) / len(address_list))
        disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))]

        # Add remaining drives
        for i, disk in enumerate(nvme_list[disks_per_ip * len(address_list):]):
            disk_chunks[i].append(disk)

        subsys_no = 1
        port_no = 0
        for ip, chunk in zip(address_list, disk_chunks):
            for disk in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % subsys_no
                # Actual service id this subsystem will listen on.
                trsvcid = "%s" % (4420 + port_no)
                nvmet_cfg["subsystems"].append({
                    "allowed_hosts": [],
                    "attr": {
                        "allow_any_host": "1",
                        "serial": "SPDK00%s" % subsys_no,
                        "version": "1.3"
                    },
                    "namespaces": [
                        {
                            "device": {
                                "path": disk,
                                "uuid": "%s" % uuid.uuid4()
                            },
                            "enable": 1,
                            "nsid": subsys_no
                        }
                    ],
                    "nqn": nqn
                })

                nvmet_cfg["ports"].append({
                    "addr": {
                        "adrfam": "ipv4",
                        "traddr": ip,
                        "trsvcid": trsvcid,
                        "trtype": "%s" % self.transport
                    },
                    "portid": subsys_no,
                    "referrals": [],
                    "subsystems": [nqn]
                })

                # Bug fix: record the real trsvcid (e.g. "4420") rather than the
                # already-incremented port counter. This matches the
                # [port, nqn, ip] layout used by SPDKTarget and consumed by
                # KernelInitiator.kernel_init_connect() as "-s <svcid>".
                self.subsystem_info_list.append([trsvcid, nqn, ip])
                subsys_no += 1
                port_no += 1

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        """Configure and start the kernel NVMe-oF target.

        Uses null_block devices when requested, otherwise all local NVMe
        drives; the generated config is loaded via "nvmetcli restore"."""
        self.log_print("Configuring kernel NVMeOF Target")

        if self.null_block:
            print("Configuring with null block device.")
            null_blk_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
            self.kernel_tgt_gen_subsystem_conf(null_blk_list, self.nic_ips)
            self.subsys_no = len(null_blk_list)
        else:
            print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()
            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
            self.subsys_no = len(nvme_list)

        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log_print("Done configuring kernel NVMeOF Target")
1045
1046
class SPDKTarget(Target):
    """NVMe-oF target running the SPDK nvmf_tgt application, configured
    over JSON-RPC (transport, bdevs, subsystems, listeners)."""

    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Required fields
        self.core_mask = target_config["core_mask"]
        self.num_cores = self.get_num_cores(self.core_mask)

        # Defaults
        self.dif_insert_strip = False
        self.null_block_dif_type = 0
        self.num_shared_buffers = 4096
        self.max_queue_depth = 128
        self.bpf_proc = None
        self.bpf_scripts = []
        self.enable_idxd = False

        if "num_shared_buffers" in target_config:
            self.num_shared_buffers = target_config["num_shared_buffers"]
        if "max_queue_depth" in target_config:
            self.max_queue_depth = target_config["max_queue_depth"]
        if "null_block_dif_type" in target_config:
            self.null_block_dif_type = target_config["null_block_dif_type"]
        if "dif_insert_strip" in target_config:
            self.dif_insert_strip = target_config["dif_insert_strip"]
        if "bpf_scripts" in target_config:
            self.bpf_scripts = target_config["bpf_scripts"]
        if "idxd_settings" in target_config:
            self.enable_idxd = target_config["idxd_settings"]

        self.log_print("====IDXD settings:====")
        self.log_print("IDXD enabled: %s" % (self.enable_idxd))

    @staticmethod
    def get_num_cores(core_mask):
        """Count CPU cores in an SPDK core mask.

        Accepts either a hex mask ("0xFF") or a bracketed list/range form
        ("[0-3,8]"); ranges are inclusive on both ends."""
        if "0x" in core_mask:
            # Hex mask: number of set bits == number of cores.
            return bin(int(core_mask, 16)).count("1")
        else:
            num_cores = 0
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")
            for i in core_mask.split(","):
                if "-" in i:
                    x, y = i.split("-")
                    # "x-y" is inclusive, hence the +1.
                    num_cores += len(range(int(x), int(y))) + 1
                else:
                    num_cores += 1
            return num_cores

    def spdk_tgt_configure(self):
        """Configure the running nvmf_tgt via RPC: create the transport,
        then add bdevs (null or NVMe) and subsystems with listeners."""
        self.log_print("Configuring SPDK NVMeOF target via RPC")

        if self.enable_adq:
            self.adq_configure_tc()

        # Create transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
                                       num_shared_buffers=self.num_shared_buffers,
                                       max_queue_depth=self.max_queue_depth,
                                       dif_insert_or_strip=self.dif_insert_strip,
                                       sock_priority=self.adq_priority)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

        self.log_print("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self, null_block_count):
        """Create null_block_count null bdevs named NvmeXn1 via RPC.
        When a DIF type is configured, add 128B of metadata per 4K block."""
        md_size = 0
        block_size = 4096
        if self.null_block_dif_type != 0:
            md_size = 128

        self.log_print("Adding null block bdevices to config via RPC")
        for i in range(null_block_count):
            self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type)
            # 102400 blocks of (block_size + md_size) bytes each.
            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
                                      dif_type=self.null_block_dif_type, md_size=md_size)
        self.log_print("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        """Attach local PCIe NVMe drives as bdevs (NvmeX) via RPC.

        :param req_num_disks: optional cap on how many drives to attach;
            exits when more drives are requested than are present."""
        self.log_print("Adding NVMe bdevs to config via RPC")

        bdfs = get_nvme_devices_bdf()
        # RPC traddr format uses dots, not colons, in the BDF.
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create one subsystem + namespace + listener per bdev via RPC,
        distributing bdevs across the given NIC IPs; all listeners share
        port 4420. Records (port, nqn, ip) in subsystem_info_list."""
        self.log_print("Adding subsystems to config")
        port = "4420"
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        # Distribute bdevs between provided NICs
        num_disks = range(0, req_num_disks)
        if len(num_disks) == 1:
            disks_per_ip = 1
        else:
            disks_per_ip = int(len(num_disks) / len(ips))
        disk_chunks = [[*num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i]] for i in range(0, len(ips))]

        # Add remaining drives
        for i, disk in enumerate(num_disks[disks_per_ip * len(ips):]):
            disk_chunks[i].append(disk)

        # Create subsystems, add bdevs to namespaces, add listeners
        for ip, chunk in zip(ips, disk_chunks):
            for c in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
                serial = "SPDK00%s" % c
                bdev_name = "Nvme%sn1" % c
                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                               allow_any_host=True, max_namespaces=8)
                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)

                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
                                                     nqn=nqn,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid=port,
                                                     adrfam="ipv4")

                self.subsystem_info_list.append([port, nqn, ip])
        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def bpf_start(self):
        """Launch the bpftrace helper script against the running nvmf_tgt PID;
        trace output goes to bpf_traces.txt in the results directory."""
        self.log_print("Starting BPF Trace scripts: %s" % self.bpf_scripts)
        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
        results_path = os.path.join(self.results_dir, "bpf_traces.txt")

        with open(self.pid, "r") as fh:
            nvmf_pid = str(fh.readline())

        cmd = [bpf_script, nvmf_pid, *bpf_traces]
        self.log_print(cmd)
        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})

    def tgt_start(self):
        """Start nvmf_tgt, wait for its RPC socket, apply socket/ADQ/IDXD and
        scheduler options, optionally start BPF tracing, then configure the
        target (transport, bdevs, subsystems)."""
        if self.null_block:
            self.subsys_no = 1
        else:
            self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        # --wait-for-rpc: app pauses before subsystem init so the options
        # below can be set; framework_start_init resumes it later.
        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        self.log_print("SPDK NVMeOF Target PID=%s" % self.pid)
        self.log_print("Waiting for spdk to initialize...")
        # Busy-wait until the app creates its RPC socket.
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")

        if self.enable_zcopy:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
                                           enable_zerocopy_send_server=True)
            self.log_print("Target socket options:")
            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))

        if self.enable_adq:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
                                           nvme_adminq_poll_period_us=100000, retry_count=4)

        if self.enable_idxd:
            rpc.idxd.idxd_scan_accel_engine(self.client, config_kernel_mode=None)
            self.log_print("Target IDXD accel engine enabled")

        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name)
        rpc.framework_start_init(self.client)

        if self.bpf_scripts:
            self.bpf_start()

        self.spdk_tgt_configure()

    def stop(self):
        """Stop BPF tracing (if running) and terminate the nvmf_tgt process,
        escalating to kill if graceful termination fails."""
        if self.bpf_proc:
            self.log_print("Stopping BPF Trace script")
            self.bpf_proc.terminate()
            self.bpf_proc.wait()

        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait()
            except Exception as e:
                self.log_print(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
1263
1264
class KernelInitiator(Initiator):
    """Initiator host that attaches to the target with the kernel NVMe-oF
    host driver (nvme-cli) and drives I/O with fio (libaio or io_uring)."""

    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Defaults; "kernel_engine" may switch fio to io_uring, which also
        # forces polling queues on connect via --nr-poll-queues.
        self.extra_params = initiator_config.get("extra_params", "")
        self.ioengine = "libaio"

        if "kernel_engine" in initiator_config:
            self.ioengine = initiator_config["kernel_engine"]
            if "io_uring" in self.ioengine:
                self.extra_params = "--nr-poll-queues=8"

    def get_connected_nvme_list(self):
        """Return device names (e.g. "nvme0n1") of connected controllers whose
        model string marks them as SPDK or Linux (kernel nvmet) namespaces."""
        devices = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))["Devices"]
        return [os.path.basename(dev["DevicePath"]) for dev in devices
                if "SPDK" in dev["ModelNumber"] or "Linux" in dev["ModelNumber"]]

    def kernel_init_connect(self):
        """Connect to each discovered subsystem with nvme-cli; when io_uring
        is used, additionally tune block-layer sysfs knobs on new devices."""
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for subsystem in self.subsystem_info_list:
            self.log_print("Trying to connect %s %s %s" % subsystem)
            connect_cmd = ["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2],
                           self.extra_params]
            self.exec_cmd(connect_cmd)
            time.sleep(2)

        if "io_uring" in self.ioengine:
            self.log_print("Setting block layer settings for io_uring.")

            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
            #       apparently it's not possible for connected subsystems.
            #       Results in "error: Invalid argument"
            sysfs_settings = {
                "iostats": "0",
                "rq_affinity": "0",
                "nomerges": "2"
            }

            for device in self.get_connected_nvme_list():
                queue_dir = os.path.join("/sys/block", device, "queue")
                for setting, value in sysfs_settings.items():
                    opt_path = os.path.join(queue_dir, setting)
                    try:
                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (value, opt_path)], stderr_redirect=True)
                    except subprocess.CalledProcessError as err:
                        self.log_print("Warning: command %s failed due to error %s. %s was not set!" % (err.cmd, err.output, value))
                    finally:
                        # Always log the effective value, even if the write failed.
                        current = self.exec_cmd(["sudo", "cat", "%s" % (opt_path)])
                        self.log_print("%s=%s" % (opt_path, current))

    def kernel_init_disconnect(self):
        """Disconnect from every subsystem connected via kernel_init_connect()."""
        for subsystem in self.subsystem_info_list:
            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
            time.sleep(1)

    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1):
        """Build the fio [filenameX] job sections, distributing connected
        /dev/nvme* devices over the requested thread slots.

        Each section's iodepth is scaled by its device count and divided
        by num_jobs (minimum 1)."""
        devices = [os.path.join("/dev", dev) for dev in self.get_connected_nvme_list()]

        base, leftover = divmod(len(devices), len(threads))

        # Carve the device list into per-thread chunks: each slot takes
        # `base` devices plus up to `base` of the leftover devices while
        # any remain (same consumption order as the original iterator code).
        chunks = []
        start = 0
        for _ in range(len(threads)):
            bonus = min(base, leftover)
            leftover -= bonus
            size = base + bonus
            chunks.append(devices[start:start + size])
            start += size

        sections = [""]
        for idx, chunk in enumerate(chunks):
            sections.append("[filename%s]" % idx)
            sections.append("\n".join(["filename=%s" % dev for dev in chunk]))
            job_qd = round((io_depth * len(chunk)) / num_jobs) or 1
            sections.append("iodepth=%s" % job_qd)

        return "\n".join(sections)
1349
1350
class SPDKInitiator(Initiator):
    """Initiator host that generates I/O with SPDK's fio bdev plugin."""

    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Build SPDK on the initiator unless explicitly told to skip.
        # Only a missing key or a literal False triggers the build; any other
        # value (including None) skips it, matching the original check.
        if general_config.get("skip_spdk_install", False) is False:
            self.install_spdk()

        # Required fields
        self.num_cores = initiator_config["num_cores"]

        # Optional fields
        self.enable_data_digest = initiator_config.get("enable_data_digest", False)

    def install_spdk(self):
        """Clean, configure and build SPDK (with the fio plugin) on this host,
        then run scripts/setup.sh to bind devices."""
        self.log_print("Using fio binary %s" % self.fio_bin)
        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
        # "cd dir && ..." relies on exec_cmd joining the list into one shell
        # command line on the remote host.
        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)])
        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])

        self.log_print("SPDK built")
        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        """Return a JSON bdev subsystem config string that attaches one NVMe-oF
        controller per (port, nqn, address) tuple in remote_subsystem_list."""
        bdev_cfg_section = {
            "subsystems": [
                {
                    "subsystem": "bdev",
                    "config": []
                }
            ]
        }

        for i, subsys in enumerate(remote_subsystem_list):
            sub_port, sub_nqn, sub_addr = map(str, subsys)
            nvme_ctrl = {
                "method": "bdev_nvme_attach_controller",
                "params": {
                    "name": "Nvme{}".format(i),
                    "trtype": self.transport,
                    "traddr": sub_addr,
                    "trsvcid": sub_port,
                    "subnqn": sub_nqn,
                    "adrfam": "IPv4"
                }
            }

            if self.enable_adq:
                nvme_ctrl["params"].update({"priority": "1"})

            if self.enable_data_digest:
                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})

            bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)

        return json.dumps(bdev_cfg_section, indent=2)

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1):
        """Generate fio "[filenameN]" sections mapping NvmeXn1 bdevs to jobs.

        Bdevs are spread as evenly as possible over the requested threads;
        never more job sections than subsystems are emitted. Each section's
        iodepth is scaled by the number of bdevs it owns divided by num_jobs
        (minimum 1).
        """
        filename_section = ""
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        nvme_per_split = int(len(subsystems) / len(threads))
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            # Hand out at most one leftover bdev per section.
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section
1437
1438
if __name__ == "__main__":
    # Default configuration file is "config.json" located next to this script.
    script_full_dir = os.path.dirname(os.path.realpath(__file__))
    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
                        help='Configuration file.')
    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
                        help='Results directory.')
    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
                        help='CSV results filename.')

    args = parser.parse_args()

    print("Using config file: %s" % args.config)
    # May raise FileNotFoundError / json.JSONDecodeError on a bad config path.
    with open(args.config, "r") as config:
        data = json.load(config)

    initiators = []
    fio_cases = []

    # Convenience views into the parsed config.
    # NOTE(review): general_config, target_config, fio_cases and
    # initiator_configs appear unused later in this script — verify against the
    # full file before removing.
    general_config = data["general"]
    target_config = data["target"]
    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]
1463
1464    for k, v in data.items():
1465        if "target" in k:
1466            v.update({"results_dir": args.results})
1467            if data[k]["mode"] == "spdk":
1468                target_obj = SPDKTarget(k, data["general"], v)
1469            elif data[k]["mode"] == "kernel":
1470                target_obj = KernelTarget(k, data["general"], v)
1471        elif "initiator" in k:
1472            if data[k]["mode"] == "spdk":
1473                init_obj = SPDKInitiator(k, data["general"], v)
1474            elif data[k]["mode"] == "kernel":
1475                init_obj = KernelInitiator(k, data["general"], v)
1476            initiators.append(init_obj)
1477        elif "fio" in k:
1478            fio_workloads = itertools.product(data[k]["bs"],
1479                                              data[k]["qd"],
1480                                              data[k]["rw"])
1481
1482            fio_run_time = data[k]["run_time"]
1483            fio_ramp_time = data[k]["ramp_time"]
1484            fio_rw_mix_read = data[k]["rwmixread"]
1485            fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None
1486            fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None
1487
1488            fio_rate_iops = 0
1489            if "rate_iops" in data[k]:
1490                fio_rate_iops = data[k]["rate_iops"]
1491        else:
1492            continue
1493
1494    try:
1495        os.mkdir(args.results)
1496    except FileExistsError:
1497        pass
1498
1499    # TODO: This try block is definietly too large. Need to break this up into separate
1500    # logical blocks to reduce size.
1501    try:
1502        target_obj.tgt_start()
1503
1504        for i in initiators:
1505            i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no)
1506            if i.enable_adq:
1507                i.adq_configure_tc()
1508
1509        # Poor mans threading
1510        # Run FIO tests
1511        for block_size, io_depth, rw in fio_workloads:
1512            threads = []
1513            configs = []
1514            for i in initiators:
1515                if i.mode == "kernel":
1516                    i.kernel_init_connect()
1517
1518                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
1519                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops)
1520                configs.append(cfg)
1521
1522            for i, cfg in zip(initiators, configs):
1523                t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
1524                threads.append(t)
1525            if target_obj.enable_sar:
1526                sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"])
1527                sar_file_name = ".".join([sar_file_name, "txt"])
1528                t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_name))
1529                threads.append(t)
1530
1531            if target_obj.enable_pcm:
1532                pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]
1533
1534                pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],))
1535                pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],))
1536                pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],))
1537
1538                threads.append(pcm_cpu_t)
1539                threads.append(pcm_mem_t)
1540                threads.append(pcm_pow_t)
1541
1542            if target_obj.enable_bandwidth:
1543                bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)])
1544                bandwidth_file_name = ".".join([bandwidth_file_name, "csv"])
1545                t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,))
1546                threads.append(t)
1547
1548            if target_obj.enable_dpdk_memory:
1549                t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results))
1550                threads.append(t)
1551
1552            if target_obj.enable_adq:
1553                ethtool_thread = threading.Thread(target=target_obj.ethtool_after_fio_ramp, args=(fio_ramp_time,))
1554                threads.append(ethtool_thread)
1555
1556            for t in threads:
1557                t.start()
1558            for t in threads:
1559                t.join()
1560
1561            for i in initiators:
1562                if i.mode == "kernel":
1563                    i.kernel_init_disconnect()
1564                i.copy_result_files(args.results)
1565
1566        target_obj.restore_governor()
1567        target_obj.restore_tuned()
1568        target_obj.restore_services()
1569        target_obj.restore_sysctl()
1570        for i in initiators:
1571            i.restore_governor()
1572            i.restore_tuned()
1573            i.restore_services()
1574            i.restore_sysctl()
1575        target_obj.parse_results(args.results, args.csv_filename)
1576    finally:
1577        for i in initiators:
1578            try:
1579                i.stop()
1580            except Exception as err:
1581                pass
1582        target_obj.stop()
1583