xref: /spdk/scripts/perf/nvmf/run_nvmf.py (revision 5fd9561f54daa8eff7f3bcb56c789655bca846b1)
1#!/usr/bin/env python3
2
3from json.decoder import JSONDecodeError
4import os
5import re
6import sys
7import argparse
8import json
9import zipfile
10import threading
11import subprocess
12import itertools
13import configparser
14import time
15import uuid
16from collections import OrderedDict
17
18import paramiko
19import pandas as pd
20from common import *
21
22sys.path.append(os.path.dirname(__file__) + '/../../../python')
23
24import spdk.rpc as rpc  # noqa
25import spdk.rpc.client as rpc_client  # noqa
26
27
class Server:
    """Base class for a host taking part in an NVMe-oF performance test.

    Holds configuration common to target and initiator machines (credentials,
    transport, NIC IPs) and implements the shared system-tuning steps:
    stopping interfering services, sysctl tuning, tuned profile selection,
    CPU governor and NIC IRQ affinity, plus optional ADQ NIC configuration.
    Every tuning step records the previous state in a ``*_restore_*`` member
    so ``restore_*`` methods can revert the host after the test.
    """

    def __init__(self, name, general_config, server_config):
        # Identity and connection settings shared by all server types.
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]

        # Default location of the IRQ-affinity helper scripts (mlnx-tools);
        # may be overridden per server via "irq_scripts_dir" in the config.
        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        self.local_nic_info = []
        self._nics_json_obj = {}  # lazy cache of `ip -j address show` output
        # Pre-test state, captured by configure_* and replayed by restore_*.
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        # ADQ (Application Device Queues) is off unless explicitly enabled.
        self.enable_adq = False
        self.adq_priority = None
        if "adq_enable" in server_config and server_config["adq_enable"]:
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1

        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]

        # Server name is used in result file names, so restrict it to
        # alphanumeric characters only.
        if not re.match("^[A-Za-z0-9]*$", name):
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        """Print *msg* prefixed with this server's name, flushing stdout."""
        print("[%s] %s" % (self.name, msg), flush=True)

    @staticmethod
    def get_uncommented_lines(lines):
        """Return only the non-empty lines that do not start with '#'."""
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        """Return the interface name owning *ip*, or None if no NIC matches.

        The `ip -j address show` output is fetched once and cached in
        ``self._nics_json_obj``; interfaces with no addresses are dropped.
        """
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                # Substring match, so "10.0.0.1" also matches "10.0.0.10" —
                # relies on config IPs being unambiguous.
                if ip in addr["local"]:
                    return nic["ifname"]

    def set_local_nic_info_helper(self):
        # Overridden by subclasses to supply `lshw -json`-style PCI info.
        pass

    def set_local_nic_info(self, pci_info):
        """Populate ``self.local_nic_info`` with network devices from *pci_info*.

        :param pci_info: JSON tree as produced by ``lshw -json``.
        """
        def extract_network_elements(json_obj):
            # Recursively walk lists and "children" subtrees, collecting
            # every node whose "class" contains "network".
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        # Stub; subclasses execute *cmd* locally (Target) or over SSH and
        # return its stdout as a string.
        return ""

    def configure_system(self):
        """Apply all system-level tuning steps, in a fixed order."""
        self.configure_services()
        self.configure_sysctl()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity()

    def configure_adq(self):
        """Load ADQ kernel modules and configure the NIC for ADQ testing."""
        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        """Modprobe the traffic-control modules ADQ depends on."""
        self.log_print("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log_print("%s loaded!" % module)
            # NOTE(review): CalledProcessError is not imported by name in the
            # visible import block — presumably provided by `from common
            # import *`; confirm, otherwise this except clause raises NameError.
            except CalledProcessError as e:
                self.log_print("ERROR: failed to load module %s" % module)
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        """Create ADQ traffic classes and flower filters on each test NIC.

        Sets up an mqprio qdisc with two TCs (TC0 with 2 queues, TC1 with
        one queue per core), an ingress filter steering NVMe-oF traffic
        (port 4420) to TC1, adjusts coalescing, and runs set_xps_rxqs.
        """
        self.log_print("Configuring ADQ Traffic classes and filters...")

        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        # NOTE(review): self.num_cores and self.spdk_dir are defined by
        # subclasses, not by Server itself — this method assumes a subclass.
        num_queues_tc1 = self.num_cores
        # On the target, match the destination port; on initiators, the source.
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        port = "4420"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log_print(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            # Give the driver time to apply the qdisc before adding filters.
            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log_print(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                             "protocol", "ip", "ingress", "prio", "1", "flower",
                             "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                             "skip_sw", "hw_tc", "1"]
            self.log_print(" ".join(tc_filter_cmd))
            self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log_print("Show tc configuration for %s NIC..." % nic_name)
            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log_print("%s" % tc_disk_out)
            self.log_print("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log_print(xps_cmd)
            self.exec_cmd(xps_cmd)

    def reload_driver(self, driver):
        """rmmod + modprobe *driver* to reset it to default settings."""
        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver])
        except CalledProcessError as e:
            self.log_print("ERROR: failed to reload %s module!" % driver)
            self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_nic(self):
        """Enable the ethtool features ADQ needs on every test NIC."""
        self.log_print("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log_print(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                # As temporary workaround for ADQ, channel packet inspection optimization is turned on during connection establishment.
                # Then turned off before fio ramp_up expires in ethtool_after_fio_ramp().
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-pkt-inspect-optimize", "on"])
            except CalledProcessError as e:
                self.log_print("ERROR: failed to configure NIC port using ethtool!")
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
                self.log_print("Please update your NIC driver and firmware versions and try again.")
            self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        """Stop services that would disturb the benchmark; remember their state."""
        self.log_print("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            # `systemctl show` emits KEY=VALUE lines; prepend a section header
            # so configparser can parse it.
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            # Skip units that do not exist on this host.
            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log_print("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log_print("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        """Apply network/VM sysctl tuning; save old values for restore."""
        self.log_print("Tuning sysctl settings...")

        # busy_read polling is only beneficial for SPDK-mode ADQ runs.
        busy_read = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1

        sysctl_opts = {
            "net.core.busy_poll": 0,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
        }

        for opt, value in sysctl_opts.items():
            # Record current value before overwriting it.
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        """Activate the configured tuned-adm profile, saving the current one."""
        if not self.tuned_profile:
            self.log_print("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        # Only capture the active profile if tuned was actually running;
        # otherwise there is nothing meaningful to restore.
        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        """Switch the CPU frequency governor to "performance"."""
        self.log_print("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def configure_irq_affinity(self):
        """Run set_irq_affinity.sh for each test NIC."""
        self.log_print("Setting NIC irq affinity for NICs...")

        irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh")
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            irq_cmd = ["sudo", irq_script_path, nic]
            self.log_print(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        """Start/stop services to return them to their pre-test state."""
        self.log_print("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        """Write back the sysctl values recorded by configure_sysctl()."""
        self.log_print("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        """Revert tuned-adm to the profile recorded by configure_tuned()."""
        self.log_print("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log_print("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        """Restore the CPU governor saved by configure_cpu_governor()."""
        self.log_print("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log_print("Reverted CPU governor to %s." % self.governor_restore)
338
339
class Target(Server):
    """NVMe-oF target host.

    Runs commands locally via subprocess, packages the SPDK sources for the
    initiators, applies the shared system tuning from Server, and implements
    measurement collection (SAR, PCM, bwm-ng, DPDK memory) plus parsing and
    aggregation of fio JSON results into CSV files.
    """

    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Defaults; each may be overridden by the optional config keys below.
        self.enable_sar = False
        self.sar_delay = 0
        self.sar_interval = 0
        self.sar_count = 0
        self.enable_pcm = False
        self.pcm_dir = ""
        self.pcm_delay = 0
        self.pcm_interval = 0
        self.pcm_count = 0
        self.enable_bandwidth = 0
        self.bandwidth_count = 0
        self.enable_dpdk_memory = False
        self.dpdk_wait_time = 0
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        # Target runs locally, so NIC info can be fetched eagerly here.
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []

        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "sar_settings" in target_config:
            # 4-element list: [enable, delay_s, interval_s, count]
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"]
        if "pcm_settings" in target_config:
            self.enable_pcm = True
            self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"]
        if "enable_bandwidth" in target_config:
            self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"]
        if "enable_dpdk_memory" in target_config:
            self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"]
        if "scheduler_settings" in target_config:
            self.scheduler_name = target_config["scheduler_settings"]
        if "zcopy_settings" in target_config:
            self.enable_zcopy = target_config["zcopy_settings"]
        if "results_dir" in target_config:
            self.results_dir = target_config["results_dir"]

        # This script lives in scripts/perf/nvmf, so the SPDK root is 3 up.
        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        # Package SPDK sources for the initiators unless explicitly skipped.
        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        """Return local hardware info as parsed `lshw -json` output."""
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Run *cmd* locally and return its decoded stdout.

        :param cmd: argv list passed to check_output.
        :param stderr_redirect: if True, merge stderr into stdout.
        :param change_dir: optionally chdir there for the command's duration.
        """
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log_print("Changing directory to %s" % change_dir)

        # NOTE(review): check_output is not imported by name in the visible
        # import block — presumably from `from common import *`; confirm.
        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")

        if change_dir:
            os.chdir(old_cwd)
            self.log_print("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        """Zip the whole SPDK tree (following symlinks) into *dest_file*."""
        self.log_print("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, _directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                # Archive paths are relative to the current working directory.
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log_print("Done zipping")

    @staticmethod
    def read_json_stats(file):
        """Parse a fio JSON result file into a flat list of 18 floats.

        Returns [read_iops, read_bw, read avg/min/max lat,
        read p99/p99.9/p99.99/p99.999 lat, then the same 9 values for
        writes]. All latencies are converted to microseconds.
        """
        with open(file, "r") as json_data:
            data = json.load(json_data)
            job_pos = 0  # job_post = 0 because using aggregated results

            # Check if latency is in nano or microseconds to choose correct dict key
            def get_lat_unit(key_prefix, dict_section):
                # key prefix - lat, clat or slat.
                # dict section - portion of json containing latency bucket in question
                # Return dict key to access the bucket and unit as string
                for k, _ in dict_section.items():
                    if k.startswith(key_prefix):
                        return k, k.split("_")[1]

            def get_clat_percentiles(clat_dict_leaf):
                # Pull the four high percentiles used in the report.
                if "percentile" in clat_dict_leaf:
                    p99_lat = float(clat_dict_leaf["percentile"]["99.000000"])
                    p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"])
                    p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"])
                    p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"])

                    return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat]
                else:
                    # Latest fio versions do not provide "percentile" results if no
                    # measurements were done, so just return zeroes
                    return [0, 0, 0, 0]

            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
            read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["read"][clat_key])

            # Normalize nanoseconds to microseconds.
            if "ns" in lat_unit:
                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
            if "ns" in clat_unit:
                read_p99_lat = read_p99_lat / 1000
                read_p99_9_lat = read_p99_9_lat / 1000
                read_p99_99_lat = read_p99_99_lat / 1000
                read_p99_999_lat = read_p99_999_lat / 1000

            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
            write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["write"][clat_key])

            if "ns" in lat_unit:
                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
            if "ns" in clat_unit:
                write_p99_lat = write_p99_lat / 1000
                write_p99_9_lat = write_p99_9_lat / 1000
                write_p99_99_lat = write_p99_99_lat / 1000
                write_p99_999_lat = write_p99_999_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]

    def parse_results(self, results_dir, csv_file):
        """Aggregate per-initiator fio JSON results into CSV summaries.

        For each fio config file found in *results_dir*, averages repeated
        runs per initiator (writing a per-initiator CSV), then combines all
        initiators: throughput values are summed, latencies averaged, and
        read/write columns are merged using the job's rwmixread ratio.
        The aggregate rows are appended to *csv_file*.
        """
        files = os.listdir(results_dir)
        fio_files = filter(lambda x: ".fio" in x, files)
        json_files = [x for x in files if ".json" in x]

        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]

        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]

        header_line = ",".join(["Name", *headers])
        aggr_header_line = ",".join(["Name", *aggr_headers])

        # Create empty results file
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            fh.write(aggr_header_line + "\n")
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # Look in the filename for rwmixread value. Function arguments do
            # not have that information.
            # TODO: Improve this function by directly using workload params instead
            # of regexing through filenames.
            if "read" in job_name:
                rw_mixread = 1
            elif "write" in job_name:
                rw_mixread = 0
            else:
                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100

            # If "_CPU" exists in name - ignore it
            # Initiators for the same job could have different num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if x.startswith(job_name)]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # There may have been more than 1 initiator used in test, need to check that
            # Result files are created so that string after last "_" separator is server name
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than 1 test run for this job, calculate average results for initiator
                i_results = [x for x in job_result_files if i in x]
                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))

                separate_stats = []
                for r in i_results:
                    try:
                        stats = self.read_json_stats(os.path.join(results_dir, r))
                        separate_stats.append(stats)
                        self.log_print(stats)
                    except JSONDecodeError:
                        self.log_print("ERROR: Failed to parse %s results! Results might be incomplete!" % r)

                # Element-wise average across repeated runs.
                init_results = [sum(x) for x in zip(*separate_stats)]
                init_results = [x / len(separate_stats) for x in init_results]
                inits_avg_results.append(init_results)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(init_results)
                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
                    fh.write(header_line + "\n")
                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")

            # Sum results of all initiators running this FIO job.
            # Latency results are an average of latencies from across all initiators.
            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
            for key in inits_avg_results:
                if "lat" in key:
                    inits_avg_results[key] /= len(inits_names)

            # Aggregate separate read/write values into common labels
            # Take rw_mixread into consideration for mixed read/write workloads.
            aggregate_results = OrderedDict()
            for h in aggr_headers:
                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
                if "lat" in h:
                    _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
                else:
                    _ = read_stat + write_stat
                aggregate_results[h] = "{0:.3f}".format(_)

            rows.add(",".join([job_name, *aggregate_results.values()]))

        # Save results to file
        for row in rows:
            with open(os.path.join(results_dir, csv_file), "a") as fh:
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))

    def measure_sar(self, results_dir, sar_file_prefix):
        """Run sar over all CPUs and save raw output plus total CPU usage."""
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))

        self.log_print("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % self.sar_delay)
        time.sleep(self.sar_delay)
        self.log_print("Starting SAR measurements")

        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count])
        with open(os.path.join(results_dir, sar_output_file), "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log_print("Summary CPU utilization from SAR:")
                        self.log_print(line)
                    elif "all" in line:
                        self.log_print(line)
                    else:
                        # Per-CPU "Average" rows; column 8 is %idle in this
                        # sar output layout — TODO confirm across sysstat versions.
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
        # Total utilization = full capacity minus summed idle percentages.
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum

        with open(os.path.join(results_dir, sar_cpu_util_file), "w") as f:
            f.write("%0.2f" % sar_cpu_usage)

    def ethtool_after_fio_ramp(self, fio_ramp_time):
        """Halfway through fio ramp-up, disable the ADQ packet-inspect workaround."""
        time.sleep(fio_ramp_time//2)
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log_print(nic)
            self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                           "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        """Run pcm-memory.x for pcm_count seconds, writing CSV to results_dir."""
        time.sleep(self.pcm_delay)
        cmd = ["%s/pcm-memory.x" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)]
        pcm_memory = subprocess.Popen(cmd)
        # pcm-memory has no sample-count option, so terminate it after a delay.
        time.sleep(self.pcm_count)
        pcm_memory.terminate()

    def measure_pcm(self, results_dir, pcm_file_name):
        """Run pcm.x and extract socket UPI columns into a separate CSV."""
        time.sleep(self.pcm_delay)
        cmd = ["%s/pcm.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        # pcm CSV has a two-row header; drop pandas' "Unnamed:" placeholders.
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_pcm_power(self, results_dir, pcm_power_file_name):
        """Run pcm-power.x and save its raw output."""
        time.sleep(self.pcm_delay)
        out = self.exec_cmd(["%s/pcm-power.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count])
        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
            fh.write(out)

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name):
        """Sample network bandwidth with bwm-ng into a CSV file."""
        self.log_print("INFO: starting network bandwidth measure")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)])

    def measure_dpdk_memory(self, results_dir):
        """Trigger a DPDK memory dump via RPC and move it to results_dir."""
        self.log_print("INFO: waiting to generate DPDK memory usage")
        time.sleep(self.dpdk_wait_time)
        self.log_print("INFO: generating DPDK memory usage")
        # NOTE(review): bare attribute access — this does not invoke the RPC
        # (no call, no client argument). Looks like a missing
        # `rpc.env.env_dpdk_get_mem_stats(<client>)` call; verify against the
        # SPDK rpc API, otherwise the rename below depends on a stale dump.
        rpc.env.env_dpdk_get_mem_stats
        os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir))

    def sys_config(self):
        """Log kernel, sysctl, cpupower and SPDK settings for the report."""
        self.log_print("====Kernel release:====")
        self.log_print(os.uname().release)
        self.log_print("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log_print('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log_print("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log_print('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log_print("====Cpu power info:====")
        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log_print("====zcopy settings:====")
        self.log_print("zcopy enabled: %s" % (self.enable_zcopy))
        self.log_print("====Scheduler settings:====")
        self.log_print("SPDK scheduler: %s" % (self.scheduler_name))
678
679
class Initiator(Server):
    """A remote host that generates NVMe-oF traffic with fio.

    All commands are executed on the remote machine over a paramiko SSH
    session. Subclasses (KernelInitiator, SPDKInitiator) supply the
    I/O-engine-specific parts of the fio configuration.
    """

    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Required fields
        self.ip = initiator_config["ip"]
        self.target_nic_ips = initiator_config["target_nic_ips"]

        # Defaults
        self.cpus_allowed = None
        self.cpus_allowed_policy = "shared"
        self.spdk_dir = "/tmp/spdk"
        self.fio_bin = "/usr/src/fio/fio"
        self.nvmecli_bin = "nvme"
        self.cpu_frequency = None
        self.subsystem_info_list = []

        if "spdk_dir" in initiator_config:
            self.spdk_dir = initiator_config["spdk_dir"]
        if "fio_bin" in initiator_config:
            self.fio_bin = initiator_config["fio_bin"]
        if "nvmecli_bin" in initiator_config:
            self.nvmecli_bin = initiator_config["nvmecli_bin"]
        if "cpus_allowed" in initiator_config:
            self.cpus_allowed = initiator_config["cpus_allowed"]
        if "cpus_allowed_policy" in initiator_config:
            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
        if "cpu_frequency" in initiator_config:
            self.cpu_frequency = initiator_config["cpu_frequency"]
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.copy_spdk("/tmp/spdk.zip")
        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        # NIC information is collected on the remote host using lshw.
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        """Close the SSH connection to the initiator host."""
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Execute a command on the remote host and return its stdout.

        :param cmd: command and arguments as a list of strings
        :param stderr_redirect: merge stderr into stdout (uses a pty)
        :param change_dir: optional directory to cd into before running
        :raises CalledProcessError: when the remote command exits non-zero
        """
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it again to prevent
        # expansion when sending to remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout thanks using get_pty option if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        """Copy a local file to the initiator host over SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        """Copy a file from the initiator host to a local path over SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        """Transfer the zipped SPDK sources and unpack them in self.spdk_dir."""
        self.log_print("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log_print("Sources unpacked")

    def copy_result_files(self, dest_dir):
        """Fetch all files from the remote nvmf_perf directory into dest_dir."""
        self.log_print("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log_print("Done copying results")

    def discover_subsystems(self, address_list, subsys_no):
        """Probe target addresses with "nvme discover" and record matches.

        Scans ports 4420 .. 4420+subsys_no-1 on every address in address_list
        and stores the discovered (trsvcid, nqn, traddr) tuples, sorted by
        NQN, in self.subsystem_info_list.
        """
        nvme_discover_output = ""
        # Renamed the inner loop variable so it no longer shadows the
        # subsys_no parameter.
        for ip, port_offset in itertools.product(address_list, range(0, subsys_no)):
            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + port_offset))
            nvme_discover_cmd = ["sudo",
                                 "%s" % self.nvmecli_bin,
                                 "discover", "-t", "%s" % self.transport,
                                 "-s", "%s" % (4420 + port_offset),
                                 "-a", "%s" % ip]

            try:
                stdout = self.exec_cmd(nvme_discover_cmd)
                if stdout:
                    nvme_discover_output = nvme_discover_output + stdout
            except CalledProcessError:
                # Do nothing. In case of discovering remote subsystems of kernel target
                # we expect "nvme discover" to fail a bunch of times because we basically
                # scan ports.
                pass

        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                                nvme_discover_output)  # from nvme discovery output
        # Keep only unique entries that point back at the requested addresses.
        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
        subsystems = list(set(subsystems))
        subsystems.sort(key=lambda x: x[1])
        self.log_print("Found matching subsystems on target side:")
        for s in subsystems:
            self.log_print(s)
        self.subsystem_info_list = subsystems

    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10, rate_iops=0):
        """Generate a fio job file and store it on the initiator host.

        Returns the remote path of the created fio config file.
        """
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""
        if "spdk" in self.mode:
            bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
            self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
            spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
        else:
            ioengine = self.ioengine
            spdk_conf = ""
            out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'",
                                 "|", "awk", "'{print $1}'"])
            subsystems = [x for x in out.split("\n") if "nvme" in x]

        if self.cpus_allowed is not None:
            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    # Bug fix: cpus_allowed ranges are inclusive ("0-3" means
                    # 4 CPUs), so count b - a + 1 cores, consistent with
                    # SPDKTarget.get_num_cores().
                    cpus_num += len(range(a, b + 1))
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            self.num_cores = len(subsystems)
            threads = range(0, len(subsystems))

        if "spdk" in self.mode:
            filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs)
        else:
            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs)

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, kernel_init_connect() function
        if hasattr(self, "ioengine") and "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log_print("Created FIO Config:")
        self.log_print(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        """Pin the CPU frequency using the userspace cpufreq governor.

        Requires that the intel_pstate driver is disabled; exits the script
        otherwise. No-op when cpu_frequency was not configured.
        """
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit()
        else:
            self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.")

    def run_fio(self, fio_config_file, run_num=None):
        """Run fio with the given config file, saving JSON output per run.

        :param fio_config_file: remote path of the fio job file
        :param run_num: number of repetitions; a single run when falsy
        """
        job_name, _ = os.path.splitext(fio_config_file)
        self.log_print("Starting FIO run for job: %s" % job_name)
        self.log_print("Using FIO: %s" % self.fio_bin)

        if run_num:
            for i in range(1, run_num + 1):
                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
                try:
                    output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
                                            "--output=%s" % output_filename, "--eta=never"], True)
                    self.log_print(output)
                except subprocess.CalledProcessError as e:
                    self.log_print("ERROR: Fio process failed!")
                    self.log_print(e.stdout)
        else:
            output_filename = job_name + "_" + self.name + ".json"
            # Bug fix: the option was built as "--output" % output_filename,
            # which raises TypeError (no %s placeholder in the format string).
            output = self.exec_cmd(["sudo", self.fio_bin,
                                    fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename], True)
            self.log_print(output)
        self.log_print("FIO run finished. Results in: %s" % output_filename)

    def sys_config(self):
        """Log remote system configuration relevant to the benchmark."""
        self.log_print("====Kernel release:====")
        self.log_print(self.exec_cmd(["uname", "-r"]))
        self.log_print("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log_print("====sysctl conf:====")
        sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"])
        self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log_print("====Cpu power info:====")
        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
959        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
960
961
class KernelTarget(Target):
    """NVMe-oF target implemented with the Linux kernel nvmet subsystem.

    The target layout is generated as a JSON file ("kernel.conf") and applied
    with nvmetcli ("restore"); "clear" tears the configuration down.
    """

    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = "nvmetcli"

        if "nvmet_bin" in target_config:
            self.nvmet_bin = target_config["nvmet_bin"]

    def stop(self):
        # Remove the entire kernel target configuration (ports, subsystems).
        nvmet_command(self.nvmet_bin, "clear")

    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):
        """Write the nvmet JSON configuration to "kernel.conf".

        Devices from nvme_list are split evenly between the IPs in
        address_list; each device gets its own subsystem and its own
        listener port (trsvcid counting up from 4420).
        """

        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        # Split disks between NIC IP's
        disks_per_ip = int(len(nvme_list) / len(address_list))
        disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))]

        # Add remaining drives
        for i, disk in enumerate(nvme_list[disks_per_ip * len(address_list):]):
            disk_chunks[i].append(disk)

        subsys_no = 1
        port_no = 0
        for ip, chunk in zip(address_list, disk_chunks):
            for disk in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % subsys_no
                nvmet_cfg["subsystems"].append({
                    "allowed_hosts": [],
                    "attr": {
                        "allow_any_host": "1",
                        "serial": "SPDK00%s" % subsys_no,
                        "version": "1.3"
                    },
                    "namespaces": [
                        {
                            "device": {
                                "path": disk,
                                "uuid": "%s" % uuid.uuid4()
                            },
                            "enable": 1,
                            "nsid": subsys_no
                        }
                    ],
                    "nqn": nqn
                })

                # One dedicated listener port per subsystem.
                nvmet_cfg["ports"].append({
                    "addr": {
                        "adrfam": "ipv4",
                        "traddr": ip,
                        "trsvcid": "%s" % (4420 + port_no),
                        "trtype": "%s" % self.transport
                    },
                    "portid": subsys_no,
                    "referrals": [],
                    "subsystems": [nqn]
                })
                subsys_no += 1
                port_no += 1
                # NOTE(review): port_no was incremented just above, so the value
                # stored here is a 1-based port index rather than the trsvcid
                # (4420 + port_no - 1) configured for this listener — confirm
                # that consumers of subsystem_info_list expect this.
                self.subsystem_info_list.append([port_no, nqn, ip])

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        """Generate the config and (re)start the kernel NVMe-oF target.

        Exports null block devices when self.null_block is set, otherwise
        all local NVMe drives. Sets self.subsys_no to the number of exported
        devices.
        """
        self.log_print("Configuring kernel NVMeOF Target")

        if self.null_block:
            print("Configuring with null block device.")
            null_blk_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
            self.kernel_tgt_gen_subsystem_conf(null_blk_list, self.nic_ips)
            self.subsys_no = len(null_blk_list)
        else:
            print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()
            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
            self.subsys_no = len(nvme_list)

        # Start from a clean state, then load the generated configuration.
        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log_print("Done configuring kernel NVMeOF Target")
1054
1055
class SPDKTarget(Target):
    """NVMe-oF target running the SPDK nvmf_tgt application.

    The target process is launched locally with --wait-for-rpc and then
    configured at runtime through the JSON-RPC socket (/var/tmp/spdk.sock).
    """

    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Required fields
        self.core_mask = target_config["core_mask"]
        self.num_cores = self.get_num_cores(self.core_mask)

        # Defaults
        self.dif_insert_strip = False
        self.null_block_dif_type = 0
        self.num_shared_buffers = 4096
        self.max_queue_depth = 128
        self.bpf_proc = None
        self.bpf_scripts = []
        self.enable_idxd = False

        if "num_shared_buffers" in target_config:
            self.num_shared_buffers = target_config["num_shared_buffers"]
        if "max_queue_depth" in target_config:
            self.max_queue_depth = target_config["max_queue_depth"]
        if "null_block_dif_type" in target_config:
            self.null_block_dif_type = target_config["null_block_dif_type"]
        if "dif_insert_strip" in target_config:
            self.dif_insert_strip = target_config["dif_insert_strip"]
        if "bpf_scripts" in target_config:
            self.bpf_scripts = target_config["bpf_scripts"]
        if "idxd_settings" in target_config:
            self.enable_idxd = target_config["idxd_settings"]

        self.log_print("====IDXD settings:====")
        self.log_print("IDXD enabled: %s" % (self.enable_idxd))

    @staticmethod
    def get_num_cores(core_mask):
        """Return the number of CPU cores described by core_mask.

        Accepts either a hex bitmask (e.g. "0xFF") or a bracketed list with
        ranges (e.g. "[0,2,4-7]").
        """
        if "0x" in core_mask:
            # Hex mask: every set bit is one core.
            return bin(int(core_mask, 16)).count("1")
        else:
            num_cores = 0
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")
            for i in core_mask.split(","):
                if "-" in i:
                    x, y = i.split("-")
                    # Ranges are inclusive, hence the +1.
                    num_cores += len(range(int(x), int(y))) + 1
                else:
                    num_cores += 1
            return num_cores

    def spdk_tgt_configure(self):
        """Configure the running nvmf_tgt via RPC: transport, bdevs, subsystems."""
        self.log_print("Configuring SPDK NVMeOF target via RPC")

        if self.enable_adq:
            self.adq_configure_tc()

        # Create transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
                                       num_shared_buffers=self.num_shared_buffers,
                                       max_queue_depth=self.max_queue_depth,
                                       dif_insert_or_strip=self.dif_insert_strip,
                                       sock_priority=self.adq_priority)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

        self.log_print("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self, null_block_count):
        """Create null_block_count null bdevs named NvmeXn1 via RPC.

        When null_block_dif_type is non-zero the bdevs carry 128 bytes of
        metadata per 4096-byte block for DIF.
        """
        md_size = 0
        block_size = 4096
        if self.null_block_dif_type != 0:
            md_size = 128

        self.log_print("Adding null block bdevices to config via RPC")
        for i in range(null_block_count):
            self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type)
            # Names follow the "NvmeXn1" convention so the rest of the script
            # can treat null bdevs like NVMe namespaces.
            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
                                      dif_type=self.null_block_dif_type, md_size=md_size)
        self.log_print("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        """Attach local PCIe NVMe drives as SPDK bdevs via RPC.

        :param req_num_disks: optionally limit the number of drives used;
            exits when more drives are requested than available.
        """
        self.log_print("Adding NVMe bdevs to config via RPC")

        bdfs = get_nvme_devices_bdf()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create NVMe-oF subsystems and listeners for the attached bdevs.

        Bdevs are distributed between the given NIC IPs; unlike the kernel
        target, all listeners share the single service port 4420.
        """
        self.log_print("Adding subsystems to config")
        port = "4420"
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        # Distribute bdevs between provided NICs
        num_disks = range(0, req_num_disks)
        if len(num_disks) == 1:
            disks_per_ip = 1
        else:
            disks_per_ip = int(len(num_disks) / len(ips))
        disk_chunks = [[*num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i]] for i in range(0, len(ips))]

        # Add remaining drives
        for i, disk in enumerate(num_disks[disks_per_ip * len(ips):]):
            disk_chunks[i].append(disk)

        # Create subsystems, add bdevs to namespaces, add listeners
        for ip, chunk in zip(ips, disk_chunks):
            for c in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
                serial = "SPDK00%s" % c
                bdev_name = "Nvme%sn1" % c
                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                               allow_any_host=True, max_namespaces=8)
                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)

                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
                                                     nqn=nqn,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid=port,
                                                     adrfam="ipv4")

                self.subsystem_info_list.append([port, nqn, ip])
        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def bpf_start(self):
        """Start bpftrace scripts attached to the running nvmf_tgt process.

        Output is collected in <results_dir>/bpf_traces.txt via BPF_OUTFILE.
        """
        self.log_print("Starting BPF Trace scripts: %s" % self.bpf_scripts)
        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
        results_path = os.path.join(self.results_dir, "bpf_traces.txt")

        # self.pid is the path of the pid file written by tgt_start().
        with open(self.pid, "r") as fh:
            nvmf_pid = str(fh.readline())

        cmd = [bpf_script, nvmf_pid, *bpf_traces]
        self.log_print(cmd)
        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})

    def tgt_start(self):
        """Launch nvmf_tgt, wait for its RPC socket, and configure it.

        Applies socket/ADQ/IDXD options while the app is paused in
        --wait-for-rpc mode, then starts the framework and builds the
        NVMe-oF configuration.
        """
        if self.null_block:
            self.subsys_no = 1
        else:
            self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
        # self.pid holds the pid *file* path, not the numeric PID.
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        # NOTE(review): this logs the pid-file path, not the process PID
        # (that would be proc.pid) — confirm the intended message.
        self.log_print("SPDK NVMeOF Target PID=%s" % self.pid)
        self.log_print("Waiting for spdk to initialize...")
        # Poll until the app creates its RPC socket.
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")

        if self.enable_zcopy:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
                                           enable_zerocopy_send_server=True)
            self.log_print("Target socket options:")
            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))

        if self.enable_adq:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
                                           nvme_adminq_poll_period_us=100000, retry_count=4)

        if self.enable_idxd:
            rpc.idxd.idxd_scan_accel_engine(self.client, config_kernel_mode=None)
            self.log_print("Target IDXD accel engine enabled")

        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name)
        rpc.framework_start_init(self.client)

        if self.bpf_scripts:
            self.bpf_start()

        self.spdk_tgt_configure()

    def stop(self):
        """Stop the bpftrace helper (if any) and terminate the nvmf_tgt process."""
        if self.bpf_proc:
            self.log_print("Stopping BPF Trace script")
            self.bpf_proc.terminate()
            self.bpf_proc.wait()

        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait()
            except Exception as e:
                # Fall back to SIGKILL when graceful termination fails.
                self.log_print(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
1272
1273
class KernelInitiator(Initiator):
    """Initiator that attaches subsystems with the kernel nvme driver.

    fio is run against the resulting /dev/nvmeXnY block devices using either
    libaio (default) or io_uring.
    """

    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Defaults (optionally overridden by the config file)
        self.extra_params = initiator_config.get("extra_params", "")
        self.ioengine = initiator_config.get("kernel_engine", "libaio")
        if "io_uring" in self.ioengine:
            # io_uring needs kernel-side poll queues on connect.
            self.extra_params = "--nr-poll-queues=8"

    def get_connected_nvme_list(self):
        """Return device basenames of connected SPDK/Linux target namespaces."""
        devices = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))["Devices"]
        return [os.path.basename(dev["DevicePath"]) for dev in devices
                if "SPDK" in dev["ModelNumber"] or "Linux" in dev["ModelNumber"]]

    def kernel_init_connect(self):
        """Connect to every discovered subsystem with nvme-cli.

        For io_uring, additionally tune the block layer sysfs knobs of each
        newly attached device.
        """
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for svc, nqn, addr in self.subsystem_info_list:
            self.log_print("Trying to connect %s %s %s" % (svc, nqn, addr))
            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
                           "-s", svc, "-n", nqn, "-a", addr, self.extra_params])
            time.sleep(2)

        if "io_uring" in self.ioengine:
            self.log_print("Setting block layer settings for io_uring.")

            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
            #       apparently it's not possible for connected subsystems.
            #       Results in "error: Invalid argument"
            block_sysfs_settings = {
                "iostats": "0",
                "rq_affinity": "0",
                "nomerges": "2"
            }

            for disk in self.get_connected_nvme_list():
                queue_dir = os.path.join("/sys/block", disk, "queue")
                for opt, value in block_sysfs_settings.items():
                    opt_path = os.path.join(queue_dir, opt)
                    try:
                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (value, opt_path)], stderr_redirect=True)
                    except subprocess.CalledProcessError as e:
                        self.log_print("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, value))
                    finally:
                        # Log the effective value regardless of write success.
                        current = self.exec_cmd(["sudo", "cat", "%s" % (opt_path)])
                        self.log_print("%s=%s" % (opt_path, current))

    def kernel_init_disconnect(self):
        """Disconnect every subsystem connected by kernel_init_connect()."""
        for subsystem in self.subsystem_info_list:
            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
            time.sleep(1)

    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1):
        """Build fio [filename...] sections from connected kernel NVMe devices.

        Devices are spread across the given thread slots; leftover devices go
        to the earliest slots. Each section's iodepth scales with the number
        of devices it received.
        """
        devices = [os.path.join("/dev", d) for d in self.get_connected_nvme_list()]

        base_count = int(len(devices) / len(threads))
        extra = len(devices) % len(threads)
        dev_iter = iter(devices)

        groups = []
        for _ in range(len(threads)):
            group = []
            groups.append(group)
            for _ in range(base_count):
                group.append(next(dev_iter))
                if extra:
                    # Hand one leftover device to this slot as well.
                    group.append(next(dev_iter))
                    extra -= 1

        section = ""
        for idx, group in enumerate(groups):
            qd = round((io_depth * len(group)) / num_jobs) or 1
            section = "\n".join([
                section,
                "[filename%s]" % idx,
                "\n".join("filename=%s" % dev for dev in group),
                "iodepth=%s" % qd,
            ])

        return section
1358
1359
class SPDKInitiator(Initiator):
    """Initiator host which runs fio through the SPDK bdev fio plugin
    instead of kernel /dev/nvme block devices."""

    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Build and set up SPDK on the initiator unless the config opts out
        # with "skip_spdk_install": true.
        if general_config.get("skip_spdk_install", False) is False:
            self.install_spdk()

        # Required fields
        self.num_cores = initiator_config["num_cores"]

        # Optional fields
        self.enable_data_digest = initiator_config.get("enable_data_digest", False)

    def install_spdk(self):
        """Build SPDK (with RDMA and the fio plugin) from a clean tree and
        run scripts/setup.sh on the initiator system."""
        self.log_print("Using fio binary %s" % self.fio_bin)
        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)])
        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])

        self.log_print("SPDK built")
        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        """Return a JSON bdev-subsystem config string with one
        bdev_nvme_attach_controller entry per remote subsystem.

        remote_subsystem_list: iterable of (port, nqn, address) tuples.
        """
        bdev_cfg_section = {
            "subsystems": [
                {
                    "subsystem": "bdev",
                    "config": []
                }
            ]
        }

        for i, subsys in enumerate(remote_subsystem_list):
            sub_port, sub_nqn, sub_addr = map(str, subsys)
            nvme_ctrl = {
                "method": "bdev_nvme_attach_controller",
                "params": {
                    "name": "Nvme{}".format(i),
                    "trtype": self.transport,
                    "traddr": sub_addr,
                    "trsvcid": sub_port,
                    "subnqn": sub_nqn,
                    "adrfam": "IPv4"
                }
            }

            if self.enable_adq:
                nvme_ctrl["params"].update({"priority": "1"})

            if self.enable_data_digest:
                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})

            bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)

        return json.dumps(bdev_cfg_section, indent=2)

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1):
        """Generate fio "[filenameX]" job sections splitting NvmeXn1 bdev
        names evenly between job sections; iodepth is scaled by each
        section's disk count and divided between num_jobs."""
        filename_section = ""
        # Never create more job sections than there are subsystems.
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        nvme_per_split = int(len(subsystems) / len(threads))
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            # Spread leftover disks one per job section.
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            # Clamp iodepth to at least 1 so fio does not reject the job.
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section
1446
1447
if __name__ == "__main__":
    script_full_dir = os.path.dirname(os.path.realpath(__file__))
    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
                        help='Configuration file.')
    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
                        help='Results directory.')
    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
                        help='CSV results filename.')

    args = parser.parse_args()

    print("Using config file: %s" % args.config)
    with open(args.config, "r") as config:
        data = json.load(config)

    initiators = []
    fio_cases = []

    general_config = data["general"]
    target_config = data["target"]
    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]

    # Instantiate target and initiator objects and pick up the fio workload
    # matrix from the matching config sections.
    for k, v in data.items():
        if "target" in k:
            v.update({"results_dir": args.results})
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(k, data["general"], v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(k, data["general"], v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k].get("run_num")
            fio_num_jobs = data[k].get("num_jobs")

            fio_rate_iops = data[k].get("rate_iops", 0)
        else:
            continue

    try:
        os.mkdir(args.results)
    except FileExistsError:
        pass

    # TODO: This try block is definitely too large. Need to break this up into separate
    # logical blocks to reduce size.
    try:
        target_obj.tgt_start()

        for i in initiators:
            i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no)
            if i.enable_adq:
                i.adq_configure_tc()

        # Poor mans threading
        # Run FIO tests
        for block_size, io_depth, rw in fio_workloads:
            threads = []
            configs = []
            for i in initiators:
                if i.mode == "kernel":
                    i.kernel_init_connect()

                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops)
                configs.append(cfg)

            for i, cfg in zip(initiators, configs):
                t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
                threads.append(t)
            if target_obj.enable_sar:
                sar_file_prefix = "%s_%s_%s_sar" % (block_size, rw, io_depth)
                t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix))
                threads.append(t)

            if target_obj.enable_pcm:
                pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]

                pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],))
                pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],))
                pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],))

                threads.append(pcm_cpu_t)
                threads.append(pcm_mem_t)
                threads.append(pcm_pow_t)

            if target_obj.enable_bandwidth:
                bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)])
                bandwidth_file_name = ".".join([bandwidth_file_name, "csv"])
                t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,))
                threads.append(t)

            if target_obj.enable_dpdk_memory:
                # args must be a 1-tuple; without the trailing comma the
                # results-path string itself gets unpacked into
                # single-character positional arguments.
                t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results,))
                threads.append(t)

            if target_obj.enable_adq:
                ethtool_thread = threading.Thread(target=target_obj.ethtool_after_fio_ramp, args=(fio_ramp_time,))
                threads.append(ethtool_thread)

            for t in threads:
                t.start()
            for t in threads:
                t.join()

            for i in initiators:
                if i.mode == "kernel":
                    i.kernel_init_disconnect()
                i.copy_result_files(args.results)

        # Restore the pre-test system configuration on target and initiators.
        target_obj.restore_governor()
        target_obj.restore_tuned()
        target_obj.restore_services()
        target_obj.restore_sysctl()
        if target_obj.enable_adq:
            target_obj.reload_driver("ice")
        for i in initiators:
            i.restore_governor()
            i.restore_tuned()
            i.restore_services()
            i.restore_sysctl()
            if i.enable_adq:
                i.reload_driver("ice")
        target_obj.parse_results(args.results, args.csv_filename)
    finally:
        for i in initiators:
            try:
                i.stop()
            except Exception as err:
                # Best-effort cleanup: report the failure but keep stopping
                # the remaining initiators and the target.
                print("Error occurred while stopping initiator %s: %s" % (i.name, err))
        target_obj.stop()
1595