xref: /spdk/scripts/perf/nvmf/run_nvmf.py (revision 307b8c112ffd90a26d53dd15fad67bd9038ef526)
1#!/usr/bin/env python3
2
3from json.decoder import JSONDecodeError
4import os
5import re
6import sys
7import argparse
8import json
9import zipfile
10import threading
11import subprocess
12import itertools
13import configparser
14import time
15import uuid
16from collections import OrderedDict
17
18import paramiko
19import pandas as pd
20from common import *
21
22sys.path.append(os.path.dirname(__file__) + '/../../../python')
23
24import spdk.rpc as rpc  # noqa
25import spdk.rpc.client as rpc_client  # noqa
26
27
28class Server:
29    def __init__(self, name, general_config, server_config):
30        self.name = name
31        self.username = general_config["username"]
32        self.password = general_config["password"]
33        self.transport = general_config["transport"].lower()
34        self.nic_ips = server_config["nic_ips"]
35        self.mode = server_config["mode"]
36
37        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
38        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
39            self.irq_scripts_dir = server_config["irq_scripts_dir"]
40
41        self.local_nic_info = []
42        self._nics_json_obj = {}
43        self.svc_restore_dict = {}
44        self.sysctl_restore_dict = {}
45        self.tuned_restore_dict = {}
46        self.governor_restore = ""
47        self.tuned_profile = ""
48
49        self.enable_adq = False
50        self.adq_priority = None
51        if "adq_enable" in server_config and server_config["adq_enable"]:
52            self.enable_adq = server_config["adq_enable"]
53            self.adq_priority = 1
54
55        if "tuned_profile" in server_config:
56            self.tuned_profile = server_config["tuned_profile"]
57
58        if not re.match("^[A-Za-z0-9]*$", name):
59            self.log_print("Please use a name which contains only letters or numbers")
60            sys.exit(1)
61
62    def log_print(self, msg):
63        print("[%s] %s" % (self.name, msg), flush=True)
64
65    @staticmethod
66    def get_uncommented_lines(lines):
67        return [line for line in lines if line and not line.startswith('#')]
68
69    def get_nic_name_by_ip(self, ip):
70        if not self._nics_json_obj:
71            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
72            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
73        for nic in self._nics_json_obj:
74            for addr in nic["addr_info"]:
75                if ip in addr["local"]:
76                    return nic["ifname"]
77
78    def set_local_nic_info_helper(self):
79        pass
80
81    def set_local_nic_info(self, pci_info):
82        def extract_network_elements(json_obj):
83            nic_list = []
84            if isinstance(json_obj, list):
85                for x in json_obj:
86                    nic_list.extend(extract_network_elements(x))
87            elif isinstance(json_obj, dict):
88                if "children" in json_obj:
89                    nic_list.extend(extract_network_elements(json_obj["children"]))
90                if "class" in json_obj.keys() and "network" in json_obj["class"]:
91                    nic_list.append(json_obj)
92            return nic_list
93
94        self.local_nic_info = extract_network_elements(pci_info)
95
96    def get_nic_numa_node(self, nic_name):
97        return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name]))
98
99    def get_numa_cpu_map(self):
100        numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"]))
101        numa_cpu_json_map = {}
102
103        for cpu in numa_cpu_json_obj["cpus"]:
104            cpu_num = int(cpu["cpu"])
105            numa_node = int(cpu["node"])
106            numa_cpu_json_map.setdefault(numa_node, [])
107            numa_cpu_json_map[numa_node].append(cpu_num)
108
109        return numa_cpu_json_map
110
    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Execute *cmd* and return its output as a string.

        Base implementation is a stub returning empty output; subclasses
        override it to actually run the command (Target runs locally).
        """
        return ""
114
115    def configure_system(self):
116        self.load_drivers()
117        self.configure_services()
118        self.configure_sysctl()
119        self.configure_tuned()
120        self.configure_cpu_governor()
121        self.configure_irq_affinity()
122
123    def load_drivers(self):
124        self.log_print("Loading drivers")
125        self.exec_cmd(["sudo", "modprobe", "-a",
126                       "nvme-%s" % self.transport,
127                       "nvmet-%s" % self.transport])
128        if self.mode == "kernel" and hasattr(self, "null_block") and self.null_block:
129            self.exec_cmd(["sudo", "modprobe", "null_blk",
130                           "nr_devices=%s" % self.null_block])
131
132    def configure_adq(self):
133        if self.mode == "kernel":
134            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
135            return
136        self.adq_load_modules()
137        self.adq_configure_nic()
138
139    def adq_load_modules(self):
140        self.log_print("Modprobing ADQ-related Linux modules...")
141        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
142        for module in adq_module_deps:
143            try:
144                self.exec_cmd(["sudo", "modprobe", module])
145                self.log_print("%s loaded!" % module)
146            except CalledProcessError as e:
147                self.log_print("ERROR: failed to load module %s" % module)
148                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
149
    def adq_configure_tc(self):
        """Create ADQ traffic classes and flower filters on every test NIC.

        TC0 gets the minimum 2 queues; TC1 gets one queue per SPDK core and
        receives all NVMe-oF TCP traffic (port 4420) via a hardware-offloaded
        flower filter. Afterwards interrupt coalescing is tuned and the
        set_xps_rxqs helper pins XPS queues.

        NOTE(review): relies on self.num_cores and self.spdk_dir, which are
        set by subclasses rather than Server itself — confirm on new callers.
        """
        self.log_print("Configuring ADQ Traffic classes and filters...")

        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        # Target matches on destination port; initiators match on source port.
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        port = "4420"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            # Two TCs in channel mode: TC0 -> first num_queues_tc0 queues,
            # TC1 -> the next num_queues_tc1 queues.
            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log_print(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            # Give the driver time to apply the qdisc before adding filters.
            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log_print(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                             "protocol", "ip", "ingress", "prio", "1", "flower",
                             "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                             "skip_sw", "hw_tc", "1"]
            self.log_print(" ".join(tc_filter_cmd))
            self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log_print("Show tc configuration for %s NIC..." % nic_name)
            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log_print("%s" % tc_disk_out)
            self.log_print("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log_print(xps_cmd)
            self.exec_cmd(xps_cmd)
200
201    def reload_driver(self, driver):
202        try:
203            self.exec_cmd(["sudo", "rmmod", driver])
204            self.exec_cmd(["sudo", "modprobe", driver])
205        except CalledProcessError as e:
206            self.log_print("ERROR: failed to reload %s module!" % driver)
207            self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
208
209    def adq_configure_nic(self):
210        self.log_print("Configuring NIC port settings for ADQ testing...")
211
212        # Reload the driver first, to make sure any previous settings are re-set.
213        self.reload_driver("ice")
214
215        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
216        for nic in nic_names:
217            self.log_print(nic)
218            try:
219                self.exec_cmd(["sudo", "ethtool", "-K", nic,
220                               "hw-tc-offload", "on"])  # Enable hardware TC offload
221                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
222                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
223                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
224                # As temporary workaround for ADQ, channel packet inspection optimization is turned on during connection establishment.
225                # Then turned off before fio ramp_up expires in ethtool_after_fio_ramp().
226                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
227                               "channel-pkt-inspect-optimize", "on"])
228            except CalledProcessError as e:
229                self.log_print("ERROR: failed to configure NIC port using ethtool!")
230                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
231                self.log_print("Please update your NIC driver and firmware versions and try again.")
232            self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
233            self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))
234
    def configure_services(self):
        """Stop services known to disturb performance tests, recording each
        service's prior state in self.svc_restore_dict for restore_services().
        """
        self.log_print("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            # Prepend a section header so systemctl's key=value output can be
            # parsed with configparser.
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            # Skip units that are not installed on this system.
            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log_print("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log_print("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])
262
263    def configure_sysctl(self):
264        self.log_print("Tuning sysctl settings...")
265
266        busy_read = 0
267        if self.enable_adq and self.mode == "spdk":
268            busy_read = 1
269
270        sysctl_opts = {
271            "net.core.busy_poll": 0,
272            "net.core.busy_read": busy_read,
273            "net.core.somaxconn": 4096,
274            "net.core.netdev_max_backlog": 8192,
275            "net.ipv4.tcp_max_syn_backlog": 16384,
276            "net.core.rmem_max": 268435456,
277            "net.core.wmem_max": 268435456,
278            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
279            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
280            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
281            "net.ipv4.route.flush": 1,
282            "vm.overcommit_memory": 1,
283        }
284
285        for opt, value in sysctl_opts.items():
286            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
287            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())
288
    def configure_tuned(self):
        """Activate the tuned-adm profile named in the config, recording the
        service state and previously active profile/mode for restore_tuned().
        """
        if not self.tuned_profile:
            self.log_print("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        # Prepend a section header so systemctl's key=value output can be
        # parsed with configparser.
        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        # Only an already-running tuned has a current profile worth saving.
        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))
316
317    def configure_cpu_governor(self):
318        self.log_print("Setting CPU governor to performance...")
319
320        # This assumes that there is the same CPU scaling governor on each CPU
321        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
322        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])
323
324    def configure_irq_affinity(self):
325        self.log_print("Setting NIC irq affinity for NICs...")
326
327        irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh")
328        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
329        for nic in nic_names:
330            irq_cmd = ["sudo", irq_script_path, nic]
331            self.log_print(irq_cmd)
332            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)
333
334    def restore_services(self):
335        self.log_print("Restoring services...")
336        for service, state in self.svc_restore_dict.items():
337            cmd = "stop" if state == "inactive" else "start"
338            self.exec_cmd(["sudo", "systemctl", cmd, service])
339
340    def restore_sysctl(self):
341        self.log_print("Restoring sysctl settings...")
342        for opt, value in self.sysctl_restore_dict.items():
343            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())
344
345    def restore_tuned(self):
346        self.log_print("Restoring tuned-adm settings...")
347
348        if not self.tuned_restore_dict:
349            return
350
351        if self.tuned_restore_dict["mode"] == "auto":
352            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
353            self.log_print("Reverted tuned-adm to auto_profile.")
354        else:
355            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
356            self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])
357
358    def restore_governor(self):
359        self.log_print("Restoring CPU governor setting...")
360        if self.governor_restore:
361            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
362            self.log_print("Reverted CPU governor to %s." % self.governor_restore)
363
364
365class Target(Server):
    def __init__(self, name, general_config, target_config):
        """Build the NVMe-oF target system and immediately configure the host.

        Besides parsing the target config section, this constructor has side
        effects: it caches the NIC list, optionally zips the SPDK sources for
        distribution, runs the full system tuning sequence and (with ADQ
        enabled) the ADQ setup.
        """
        super().__init__(name, general_config, target_config)

        # Defaults
        self.enable_sar = False
        self.sar_delay = 0
        self.sar_interval = 0
        self.sar_count = 0
        self.enable_pcm = False
        self.pcm_dir = ""
        self.pcm_delay = 0
        self.pcm_interval = 0
        self.pcm_count = 0
        self.enable_bandwidth = 0
        self.bandwidth_count = 0
        self.enable_dpdk_memory = False
        self.dpdk_wait_time = 0
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        # Cache the local NIC/address list once for get_nic_name_by_ip().
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []
        self.initiator_info = []
        self.enable_pm = True

        # Multi-value settings arrive as JSON lists and are unpacked here.
        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "sar_settings" in target_config:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"]
        if "pcm_settings" in target_config:
            self.enable_pcm = True
            self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"]
        if "enable_bandwidth" in target_config:
            self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"]
        if "enable_dpdk_memory" in target_config:
            self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"]
        if "scheduler_settings" in target_config:
            self.scheduler_name = target_config["scheduler_settings"]
        if "zcopy_settings" in target_config:
            self.enable_zcopy = target_config["zcopy_settings"]
        if "results_dir" in target_config:
            self.results_dir = target_config["results_dir"]
        if "enable_pm" in target_config:
            self.enable_pm = target_config["enable_pm"]

        # SPDK tree root is derived from this script's location
        # (scripts/perf/nvmf -> repository root).
        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        # Package sources for the initiators unless explicitly skipped.
        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        # sys_config() is defined outside this chunk — presumably logs the
        # system configuration; confirm in full source.
        self.sys_config()
422
423    def set_local_nic_info_helper(self):
424        return json.loads(self.exec_cmd(["lshw", "-json"]))
425
426    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
427        stderr_opt = None
428        if stderr_redirect:
429            stderr_opt = subprocess.STDOUT
430        if change_dir:
431            old_cwd = os.getcwd()
432            os.chdir(change_dir)
433            self.log_print("Changing directory to %s" % change_dir)
434
435        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")
436
437        if change_dir:
438            os.chdir(old_cwd)
439            self.log_print("Changing directory to %s" % old_cwd)
440        return out
441
442    def zip_spdk_sources(self, spdk_dir, dest_file):
443        self.log_print("Zipping SPDK source directory")
444        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
445        for root, _directories, files in os.walk(spdk_dir, followlinks=True):
446            for file in files:
447                fh.write(os.path.relpath(os.path.join(root, file)))
448        fh.close()
449        self.log_print("Done zipping")
450
451    @staticmethod
452    def _chunks(input_list, chunks_no):
453        div, rem = divmod(len(input_list), chunks_no)
454        for i in range(chunks_no):
455            si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem)
456            yield input_list[si:si + (div + 1 if i < rem else div)]
457
    def spread_bdevs(self, req_disks):
        """Distribute *req_disks* bdev indexes across initiators and NICs.

        Also records each initiator's assigned index range in
        self.initiator_info[i]["bdev_range"].

        Returns:
            List of (target NIC IP, bdev index) pairs.
        """
        # Spread available block devices indexes:
        # - evenly across available initiator systems
        # - evenly across available NIC interfaces for
        #   each initiator
        # Not NUMA aware.
        ip_bdev_map = []
        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))

        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
            self.initiator_info[i]["bdev_range"] = init_chunk
            # Second-level split: this initiator's range over its NIC IPs.
            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
                for c in nic_chunk:
                    ip_bdev_map.append((ip, c))
        return ip_bdev_map
474
    @staticmethod
    def read_json_stats(file):
        """Extract aggregated read/write stats from a fio JSON output file.

        Latencies are normalized to microseconds (fio may report ns or us
        depending on version).

        Returns:
            List of 18 floats: read iops, bw, avg/min/max lat,
            p99/p99.9/p99.99/p99.999 lat, then the same nine for writes.
        """
        with open(file, "r") as json_data:
            data = json.load(json_data)
            job_pos = 0  # job_post = 0 because using aggregated results

            # Check if latency is in nano or microseconds to choose correct dict key
            def get_lat_unit(key_prefix, dict_section):
                # key prefix - lat, clat or slat.
                # dict section - portion of json containing latency bucket in question
                # Return dict key to access the bucket and unit as string
                for k, _ in dict_section.items():
                    if k.startswith(key_prefix):
                        return k, k.split("_")[1]

            def get_clat_percentiles(clat_dict_leaf):
                if "percentile" in clat_dict_leaf:
                    p99_lat = float(clat_dict_leaf["percentile"]["99.000000"])
                    p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"])
                    p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"])
                    p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"])

                    return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat]
                else:
                    # Latest fio versions do not provide "percentile" results if no
                    # measurements were done, so just return zeroes
                    return [0, 0, 0, 0]

            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
            read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["read"][clat_key])

            # Convert nanoseconds to microseconds where needed.
            if "ns" in lat_unit:
                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
            if "ns" in clat_unit:
                read_p99_lat = read_p99_lat / 1000
                read_p99_9_lat = read_p99_9_lat / 1000
                read_p99_99_lat = read_p99_99_lat / 1000
                read_p99_999_lat = read_p99_999_lat / 1000

            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
            write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["write"][clat_key])

            if "ns" in lat_unit:
                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
            if "ns" in clat_unit:
                write_p99_lat = write_p99_lat / 1000
                write_p99_9_lat = write_p99_9_lat / 1000
                write_p99_99_lat = write_p99_99_lat / 1000
                write_p99_999_lat = write_p99_999_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]
543
    def parse_results(self, results_dir, csv_file):
        """Aggregate per-run fio JSON results into per-initiator CSV files
        plus one summary CSV (*csv_file*) with one aggregated row per job.

        Matching of result files to jobs and the rwmixread value is driven by
        filename conventions (see inline comments).
        """
        files = os.listdir(results_dir)
        fio_files = filter(lambda x: ".fio" in x, files)
        json_files = [x for x in files if ".json" in x]

        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]

        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]

        header_line = ",".join(["Name", *headers])
        aggr_header_line = ",".join(["Name", *aggr_headers])

        # Create empty results file
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            fh.write(aggr_header_line + "\n")
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # Look in the filename for rwmixread value. Function arguments do
            # not have that information.
            # TODO: Improve this function by directly using workload params instead
            # of regexing through filenames.
            if "read" in job_name:
                rw_mixread = 1
            elif "write" in job_name:
                rw_mixread = 0
            else:
                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100

            # If "_CPU" exists in name - ignore it
            # Initiators for the same job could have different num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if x.startswith(job_name)]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # There may have been more than 1 initiator used in test, need to check that
            # Result files are created so that string after last "_" separator is server name
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than 1 test run for this job, calculate average results for initiator
                i_results = [x for x in job_result_files if i in x]
                # Per-initiator CSV name: drop the "run_N_" prefix of run 0.
                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))

                separate_stats = []
                for r in i_results:
                    try:
                        stats = self.read_json_stats(os.path.join(results_dir, r))
                        separate_stats.append(stats)
                        self.log_print(stats)
                    except JSONDecodeError:
                        self.log_print("ERROR: Failed to parse %s results! Results might be incomplete!" % r)

                # Element-wise average over this initiator's runs.
                init_results = [sum(x) for x in zip(*separate_stats)]
                init_results = [x / len(separate_stats) for x in init_results]
                inits_avg_results.append(init_results)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(init_results)
                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
                    fh.write(header_line + "\n")
                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")

            # Sum results of all initiators running this FIO job.
            # Latency results are an average of latencies from across all initiators.
            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
            for key in inits_avg_results:
                if "lat" in key:
                    inits_avg_results[key] /= len(inits_names)

            # Aggregate separate read/write values into common labels
            # Take rw_mixread into consideration for mixed read/write workloads.
            aggregate_results = OrderedDict()
            for h in aggr_headers:
                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
                if "lat" in h:
                    _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
                else:
                    _ = read_stat + write_stat
                aggregate_results[h] = "{0:.3f}".format(_)

            rows.add(",".join([job_name, *aggregate_results.values()]))

        # Save results to file
        for row in rows:
            with open(os.path.join(results_dir, csv_file), "a") as fh:
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))
643
644    def measure_sar(self, results_dir, sar_file_prefix):
645        cpu_number = os.cpu_count()
646        sar_idle_sum = 0
647        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
648        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))
649
650        self.log_print("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % self.sar_delay)
651        time.sleep(self.sar_delay)
652        self.log_print("Starting SAR measurements")
653
654        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count])
655        with open(os.path.join(results_dir, sar_output_file), "w") as fh:
656            for line in out.split("\n"):
657                if "Average" in line:
658                    if "CPU" in line:
659                        self.log_print("Summary CPU utilization from SAR:")
660                        self.log_print(line)
661                    elif "all" in line:
662                        self.log_print(line)
663                    else:
664                        sar_idle_sum += float(line.split()[7])
665            fh.write(out)
666        sar_cpu_usage = cpu_number * 100 - sar_idle_sum
667
668        with open(os.path.join(results_dir, sar_cpu_util_file), "w") as f:
669            f.write("%0.2f" % sar_cpu_usage)
670
671    def measure_power(self, results_dir, prefix, script_full_dir):
672        return subprocess.Popen(["%s/../pm/collect-bmc-pm" % script_full_dir, "-d", results_dir, "-l", "-p", prefix, "-x"])
673
674    def ethtool_after_fio_ramp(self, fio_ramp_time):
675        time.sleep(fio_ramp_time//2)
676        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
677        for nic in nic_names:
678            self.log_print(nic)
679            self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
680                           "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization
681
682    def measure_pcm_memory(self, results_dir, pcm_file_name):
683        time.sleep(self.pcm_delay)
684        cmd = ["%s/build/bin/pcm-memory" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)]
685        pcm_memory = subprocess.Popen(cmd)
686        time.sleep(self.pcm_count)
687        pcm_memory.terminate()
688
689    def measure_pcm(self, results_dir, pcm_file_name):
690        time.sleep(self.pcm_delay)
691        cmd = ["%s/build/bin/pcm" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count,
692               "-csv=%s/%s" % (results_dir, pcm_file_name)]
693        subprocess.run(cmd)
694        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
695        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
696        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
697        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
698        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)
699
700    def measure_pcm_power(self, results_dir, pcm_power_file_name):
701        time.sleep(self.pcm_delay)
702        out = self.exec_cmd(["%s/build/bin/pcm-power" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count])
703        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
704            fh.write(out)
705
706    def measure_network_bandwidth(self, results_dir, bandwidth_file_name):
707        self.log_print("INFO: starting network bandwidth measure")
708        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
709                       "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)])
710
711    def measure_dpdk_memory(self, results_dir):
712        self.log_print("INFO: waiting to generate DPDK memory usage")
713        time.sleep(self.dpdk_wait_time)
714        self.log_print("INFO: generating DPDK memory usage")
715        rpc.env.env_dpdk_get_mem_stats
716        os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir))
717
718    def sys_config(self):
719        self.log_print("====Kernel release:====")
720        self.log_print(os.uname().release)
721        self.log_print("====Kernel command line:====")
722        with open('/proc/cmdline') as f:
723            cmdline = f.readlines()
724            self.log_print('\n'.join(self.get_uncommented_lines(cmdline)))
725        self.log_print("====sysctl conf:====")
726        with open('/etc/sysctl.conf') as f:
727            sysctl = f.readlines()
728            self.log_print('\n'.join(self.get_uncommented_lines(sysctl)))
729        self.log_print("====Cpu power info:====")
730        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
731        self.log_print("====zcopy settings:====")
732        self.log_print("zcopy enabled: %s" % (self.enable_zcopy))
733        self.log_print("====Scheduler settings:====")
734        self.log_print("SPDK scheduler: %s" % (self.scheduler_name))
735
736
737class Initiator(Server):
    def __init__(self, name, general_config, initiator_config):
        """Set up an initiator host: parse its config, open an SSH session,
        prepare the remote working directory and (unless skipped) deploy the
        SPDK sources there.

        Required config keys: "ip", "target_nic_ips".
        """
        super().__init__(name, general_config, initiator_config)

        # Required fields
        self.ip = initiator_config["ip"]
        self.target_nic_ips = initiator_config["target_nic_ips"]

        # Defaults
        self.cpus_allowed = None
        self.cpus_allowed_policy = "shared"
        self.spdk_dir = "/tmp/spdk"
        self.fio_bin = "/usr/src/fio/fio"
        self.nvmecli_bin = "nvme"
        self.cpu_frequency = None
        self.subsystem_info_list = []

        # Optional config overrides for the defaults above.
        if "spdk_dir" in initiator_config:
            self.spdk_dir = initiator_config["spdk_dir"]
        if "fio_bin" in initiator_config:
            self.fio_bin = initiator_config["fio_bin"]
        if "nvmecli_bin" in initiator_config:
            self.nvmecli_bin = initiator_config["nvmecli_bin"]
        if "cpus_allowed" in initiator_config:
            self.cpus_allowed = initiator_config["cpus_allowed"]
        if "cpus_allowed_policy" in initiator_config:
            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
        if "cpu_frequency" in initiator_config:
            self.cpu_frequency = initiator_config["cpu_frequency"]
        # Environment variable wins over the config-file spdk_dir.
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        # Start from a clean remote working directory.
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.copy_spdk("/tmp/spdk.zip")
        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()
784
785    def set_local_nic_info_helper(self):
786        return json.loads(self.exec_cmd(["lshw", "-json"]))
787
    def stop(self):
        # Tear down the SSH connection to the initiator host.
        self.ssh_connection.close()
790
791    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
792        if change_dir:
793            cmd = ["cd", change_dir, ";", *cmd]
794
795        # In case one of the command elements contains whitespace and is not
796        # already quoted, # (e.g. when calling sysctl) quote it again to prevent expansion
797        # when sending to remote system.
798        for i, c in enumerate(cmd):
799            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
800                cmd[i] = '"%s"' % c
801        cmd = " ".join(cmd)
802
803        # Redirect stderr to stdout thanks using get_pty option if needed
804        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
805        out = stdout.read().decode(encoding="utf-8")
806
807        # Check the return code
808        rc = stdout.channel.recv_exit_status()
809        if rc:
810            raise CalledProcessError(int(rc), cmd, out)
811
812        return out
813
814    def put_file(self, local, remote_dest):
815        ftp = self.ssh_connection.open_sftp()
816        ftp.put(local, remote_dest)
817        ftp.close()
818
819    def get_file(self, remote, local_dest):
820        ftp = self.ssh_connection.open_sftp()
821        ftp.get(remote, local_dest)
822        ftp.close()
823
    def copy_spdk(self, local_spdk_zip):
        """Upload the zipped SPDK sources to the initiator and unpack them
        into self.spdk_dir."""
        self.log_print("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")
        # -q: quiet, -o: overwrite existing files without prompting.
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log_print("Sources unpacked")
830
831    def copy_result_files(self, dest_dir):
832        self.log_print("Copying results")
833
834        if not os.path.exists(dest_dir):
835            os.mkdir(dest_dir)
836
837        # Get list of result files from initiator and copy them back to target
838        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")
839
840        for file in file_list:
841            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
842                          os.path.join(dest_dir, file))
843        self.log_print("Done copying results")
844
845    def discover_subsystems(self, address_list, subsys_no):
846        num_nvmes = range(0, subsys_no)
847        nvme_discover_output = ""
848        for ip, subsys_no in itertools.product(address_list, num_nvmes):
849            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no))
850            nvme_discover_cmd = ["sudo",
851                                 "%s" % self.nvmecli_bin,
852                                 "discover", "-t", "%s" % self.transport,
853                                 "-s", "%s" % (4420 + subsys_no),
854                                 "-a", "%s" % ip]
855
856            try:
857                stdout = self.exec_cmd(nvme_discover_cmd)
858                if stdout:
859                    nvme_discover_output = nvme_discover_output + stdout
860            except CalledProcessError:
861                # Do nothing. In case of discovering remote subsystems of kernel target
862                # we expect "nvme discover" to fail a bunch of times because we basically
863                # scan ports.
864                pass
865
866        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
867                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
868                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
869                                nvme_discover_output)  # from nvme discovery output
870        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
871        subsystems = filter(lambda x: "discovery" not in x[1], subsystems)
872        subsystems = list(set(subsystems))
873        subsystems.sort(key=lambda x: x[1])
874        self.log_print("Found matching subsystems on target side:")
875        for s in subsystems:
876            self.log_print(s)
877        self.subsystem_info_list = subsystems
878
    def gen_fio_filename_conf(self, *args, **kwargs):
        # Intentionally a no-op here: the real logic lives in the
        # SPDKInitiator and KernelInitiator subclasses.
        pass
882
883    def get_route_nic_numa(self, remote_nvme_ip):
884        local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip]))
885        local_nvme_nic = local_nvme_nic[0]["dev"]
886        return self.get_nic_numa_node(local_nvme_nic)
887
888    @staticmethod
889    def gen_fio_offset_section(offset_inc, num_jobs):
890        offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc
891        return "\n".join(["size=%s%%" % offset_inc,
892                          "offset=0%",
893                          "offset_increment=%s%%" % offset_inc])
894
895    def gen_fio_numa_section(self, fio_filenames_list):
896        numa_stats = {}
897        for nvme in fio_filenames_list:
898            nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme))
899            numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1
900
901        # Use the most common NUMA node for this chunk to allocate memory and CPUs
902        section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0]
903
904        return "\n".join(["numa_cpu_nodes=%s" % section_local_numa,
905                          "numa_mem_policy=prefer:%s" % section_local_numa])
906
907    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no,
908                       num_jobs=None, ramp_time=0, run_time=10, rate_iops=0,
909                       offset=False, offset_inc=0):
910        fio_conf_template = """
911[global]
912ioengine={ioengine}
913{spdk_conf}
914thread=1
915group_reporting=1
916direct=1
917percentile_list=50:90:99:99.5:99.9:99.99:99.999
918
919norandommap=1
920rw={rw}
921rwmixread={rwmixread}
922bs={block_size}
923time_based=1
924ramp_time={ramp_time}
925runtime={run_time}
926rate_iops={rate_iops}
927"""
928        if "spdk" in self.mode:
929            bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
930            self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
931            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
932            spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
933        else:
934            ioengine = self.ioengine
935            spdk_conf = ""
936            out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'",
937                                 "|", "awk", "'{print $1}'"])
938            subsystems = [x for x in out.split("\n") if "nvme" in x]
939
940        if self.cpus_allowed is not None:
941            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
942            cpus_num = 0
943            cpus = self.cpus_allowed.split(",")
944            for cpu in cpus:
945                if "-" in cpu:
946                    a, b = cpu.split("-")
947                    a = int(a)
948                    b = int(b)
949                    cpus_num += len(range(a, b))
950                else:
951                    cpus_num += 1
952            self.num_cores = cpus_num
953            threads = range(0, self.num_cores)
954        elif hasattr(self, 'num_cores'):
955            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
956            threads = range(0, int(self.num_cores))
957        else:
958            self.num_cores = len(subsystems)
959            threads = range(0, len(subsystems))
960
961        if "spdk" in self.mode:
962            filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs,
963                                                          offset, offset_inc)
964        else:
965            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs,
966                                                          offset, offset_inc)
967
968        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
969                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
970                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)
971
972        # TODO: hipri disabled for now, as it causes fio errors:
973        # io_u error on file /dev/nvme2n1: Operation not supported
974        # See comment in KernelInitiator class, kernel_init_connect() function
975        if hasattr(self, "ioengine") and "io_uring" in self.ioengine:
976            fio_config = fio_config + """
977fixedbufs=1
978registerfiles=1
979#hipri=1
980"""
981        if num_jobs:
982            fio_config = fio_config + "numjobs=%s \n" % num_jobs
983        if self.cpus_allowed is not None:
984            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
985            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
986        fio_config = fio_config + filename_section
987
988        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
989        if hasattr(self, "num_cores"):
990            fio_config_filename += "_%sCPU" % self.num_cores
991        fio_config_filename += ".fio"
992
993        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
994        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
995        self.log_print("Created FIO Config:")
996        self.log_print(fio_config)
997
998        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)
999
1000    def set_cpu_frequency(self):
1001        if self.cpu_frequency is not None:
1002            try:
1003                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
1004                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
1005                self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
1006            except Exception:
1007                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
1008                sys.exit()
1009        else:
1010            self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.")
1011
1012    def run_fio(self, fio_config_file, run_num=None):
1013        job_name, _ = os.path.splitext(fio_config_file)
1014        self.log_print("Starting FIO run for job: %s" % job_name)
1015        self.log_print("Using FIO: %s" % self.fio_bin)
1016
1017        if run_num:
1018            for i in range(1, run_num + 1):
1019                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
1020                try:
1021                    output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
1022                                            "--output=%s" % output_filename, "--eta=never"], True)
1023                    self.log_print(output)
1024                except subprocess.CalledProcessError as e:
1025                    self.log_print("ERROR: Fio process failed!")
1026                    self.log_print(e.stdout)
1027        else:
1028            output_filename = job_name + "_" + self.name + ".json"
1029            output = self.exec_cmd(["sudo", self.fio_bin,
1030                                    fio_config_file, "--output-format=json",
1031                                    "--output" % output_filename], True)
1032            self.log_print(output)
1033        self.log_print("FIO run finished. Results in: %s" % output_filename)
1034
1035    def sys_config(self):
1036        self.log_print("====Kernel release:====")
1037        self.log_print(self.exec_cmd(["uname", "-r"]))
1038        self.log_print("====Kernel command line:====")
1039        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
1040        self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
1041        self.log_print("====sysctl conf:====")
1042        sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"])
1043        self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
1044        self.log_print("====Cpu power info:====")
1045        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
1046
1047
1048class KernelTarget(Target):
1049    def __init__(self, name, general_config, target_config):
1050        super().__init__(name, general_config, target_config)
1051        # Defaults
1052        self.nvmet_bin = "nvmetcli"
1053
1054        if "nvmet_bin" in target_config:
1055            self.nvmet_bin = target_config["nvmet_bin"]
1056
    def stop(self):
        # Tear down the kernel target by clearing its entire nvmet configuration.
        nvmet_command(self.nvmet_bin, "clear")
1059
    def kernel_tgt_gen_subsystem_conf(self, nvme_list):
        """Write an nvmetcli JSON config ("kernel.conf") exposing each device
        in nvme_list as its own subsystem/port pair, and record the
        (port, nqn, ip) triples in self.subsystem_info_list."""
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        # spread_bdevs assigns each bdev index to one of the target IPs.
        for ip, bdev_num in self.spread_bdevs(len(nvme_list)):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = nvme_list[bdev_num]

            nvmet_cfg["subsystems"].append({
                "allowed_hosts": [],
                "attr": {
                    "allow_any_host": "1",
                    "serial": serial,
                    "version": "1.3"
                },
                "namespaces": [
                    {
                        "device": {
                            "path": bdev_name,
                            "uuid": "%s" % uuid.uuid4()
                        },
                        "enable": 1,
                        # NOTE(review): nsid is set to the port number (4420+),
                        # not a small per-subsystem index - confirm intended.
                        "nsid": port
                    }
                ],
                "nqn": nqn
            })

            nvmet_cfg["ports"].append({
                "addr": {
                    "adrfam": "ipv4",
                    "traddr": ip,
                    "trsvcid": port,
                    "trtype": self.transport
                },
                "portid": bdev_num,
                "referrals": [],
                "subsystems": [nqn]
            })

            self.subsystem_info_list.append([port, nqn, ip])
        self.subsys_no = len(self.subsystem_info_list)

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))
1111
1112    def tgt_start(self):
1113        self.log_print("Configuring kernel NVMeOF Target")
1114
1115        if self.null_block:
1116            print("Configuring with null block device.")
1117            nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
1118        else:
1119            print("Configuring with NVMe drives.")
1120            nvme_list = get_nvme_devices()
1121
1122        self.kernel_tgt_gen_subsystem_conf(nvme_list)
1123        self.subsys_no = len(nvme_list)
1124
1125        nvmet_command(self.nvmet_bin, "clear")
1126        nvmet_command(self.nvmet_bin, "restore kernel.conf")
1127
1128        if self.enable_adq:
1129            self.adq_configure_tc()
1130
1131        self.log_print("Done configuring kernel NVMeOF Target")
1132
1133
1134class SPDKTarget(Target):
1135    def __init__(self, name, general_config, target_config):
1136        super().__init__(name, general_config, target_config)
1137
1138        # Required fields
1139        self.core_mask = target_config["core_mask"]
1140        self.num_cores = self.get_num_cores(self.core_mask)
1141
1142        # Defaults
1143        self.dif_insert_strip = False
1144        self.null_block_dif_type = 0
1145        self.num_shared_buffers = 4096
1146        self.max_queue_depth = 128
1147        self.bpf_proc = None
1148        self.bpf_scripts = []
1149        self.enable_dsa = False
1150        self.scheduler_core_limit = None
1151
1152        if "num_shared_buffers" in target_config:
1153            self.num_shared_buffers = target_config["num_shared_buffers"]
1154        if "max_queue_depth" in target_config:
1155            self.max_queue_depth = target_config["max_queue_depth"]
1156        if "null_block_dif_type" in target_config:
1157            self.null_block_dif_type = target_config["null_block_dif_type"]
1158        if "dif_insert_strip" in target_config:
1159            self.dif_insert_strip = target_config["dif_insert_strip"]
1160        if "bpf_scripts" in target_config:
1161            self.bpf_scripts = target_config["bpf_scripts"]
1162        if "dsa_settings" in target_config:
1163            self.enable_dsa = target_config["dsa_settings"]
1164        if "scheduler_core_limit" in target_config:
1165            self.scheduler_core_limit = target_config["scheduler_core_limit"]
1166
1167        self.log_print("====DSA settings:====")
1168        self.log_print("DSA enabled: %s" % (self.enable_dsa))
1169
1170    @staticmethod
1171    def get_num_cores(core_mask):
1172        if "0x" in core_mask:
1173            return bin(int(core_mask, 16)).count("1")
1174        else:
1175            num_cores = 0
1176            core_mask = core_mask.replace("[", "")
1177            core_mask = core_mask.replace("]", "")
1178            for i in core_mask.split(","):
1179                if "-" in i:
1180                    x, y = i.split("-")
1181                    num_cores += len(range(int(x), int(y))) + 1
1182                else:
1183                    num_cores += 1
1184            return num_cores
1185
    def spdk_tgt_configure(self):
        """Configure the running nvmf_tgt via RPC: optional ADQ traffic
        classes, then the transport layer, bdevs and subsystems (in that
        order, since subsystems reference the bdevs)."""
        self.log_print("Configuring SPDK NVMeOF target via RPC")

        if self.enable_adq:
            self.adq_configure_tc()

        # Create transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
                                       num_shared_buffers=self.num_shared_buffers,
                                       max_queue_depth=self.max_queue_depth,
                                       dif_insert_or_strip=self.dif_insert_strip,
                                       sock_priority=self.adq_priority)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            # Null-block mode: synthetic bdevs, one subsystem per bdev.
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

        self.log_print("Done configuring SPDK NVMeOF Target")
1209
1210    def spdk_tgt_add_nullblock(self, null_block_count):
1211        md_size = 0
1212        block_size = 4096
1213        if self.null_block_dif_type != 0:
1214            md_size = 128
1215
1216        self.log_print("Adding null block bdevices to config via RPC")
1217        for i in range(null_block_count):
1218            self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type)
1219            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
1220                                      dif_type=self.null_block_dif_type, md_size=md_size)
1221        self.log_print("SPDK Bdevs configuration:")
1222        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1223
1224    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
1225        self.log_print("Adding NVMe bdevs to config via RPC")
1226
1227        bdfs = get_nvme_devices_bdf()
1228        bdfs = [b.replace(":", ".") for b in bdfs]
1229
1230        if req_num_disks:
1231            if req_num_disks > len(bdfs):
1232                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
1233                sys.exit(1)
1234            else:
1235                bdfs = bdfs[0:req_num_disks]
1236
1237        for i, bdf in enumerate(bdfs):
1238            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)
1239
1240        self.log_print("SPDK Bdevs configuration:")
1241        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1242
    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create one NVMeOF subsystem + listeners per bdev via RPC, spread
        across the given IPs, recording (port, nqn, ip) triples in
        self.subsystem_info_list."""
        self.log_print("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        for ip, bdev_num in self.spread_bdevs(req_num_disks):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = "Nvme%sn1" % bdev_num

            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                           allow_any_host=True, max_namespaces=8)
            rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
            # Listen on both the subsystem NQN and the discovery service.
            for nqn_name in [nqn, "discovery"]:
                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
                                                     nqn=nqn_name,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid=port,
                                                     adrfam="ipv4")
            self.subsystem_info_list.append([port, nqn, ip])
        self.subsys_no = len(self.subsystem_info_list)

        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))
1269
1270    def bpf_start(self):
1271        self.log_print("Starting BPF Trace scripts: %s" % self.bpf_scripts)
1272        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
1273        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
1274        results_path = os.path.join(self.results_dir, "bpf_traces.txt")
1275
1276        with open(self.pid, "r") as fh:
1277            nvmf_pid = str(fh.readline())
1278
1279        cmd = [bpf_script, nvmf_pid, *bpf_traces]
1280        self.log_print(cmd)
1281        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})
1282
    def tgt_start(self):
        """Start nvmf_tgt with --wait-for-rpc, apply pre-init RPC options
        (socket impl, zcopy, ADQ, DSA, scheduler), complete framework init
        and finally configure the target.

        The ordering matters: socket/scheduler options must be set before
        framework_start_init completes.
        """
        if self.null_block:
            self.subsys_no = 1
        else:
            self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        self.log_print("SPDK NVMeOF Target PID=%s" % self.pid)
        self.log_print("Waiting for spdk to initialize...")
        # Poll for the RPC socket; NOTE(review): no timeout, so a target that
        # crashes before creating the socket would hang this loop forever.
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")

        rpc.sock.sock_set_default_impl(self.client, impl_name="posix")

        if self.enable_zcopy:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
                                           enable_zerocopy_send_server=True)
            self.log_print("Target socket options:")
            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))

        if self.enable_adq:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
                                           nvme_adminq_poll_period_us=100000, retry_count=4)

        if self.enable_dsa:
            rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None)
            self.log_print("Target DSA accel module enabled")

        # Scheduler must be chosen before finishing framework initialization.
        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit)
        rpc.framework_start_init(self.client)

        if self.bpf_scripts:
            self.bpf_start()

        self.spdk_tgt_configure()
1328
1329    def stop(self):
1330        if self.bpf_proc:
1331            self.log_print("Stopping BPF Trace script")
1332            self.bpf_proc.terminate()
1333            self.bpf_proc.wait()
1334
1335        if hasattr(self, "nvmf_proc"):
1336            try:
1337                self.nvmf_proc.terminate()
1338                self.nvmf_proc.wait(timeout=30)
1339            except Exception as e:
1340                self.log_print("Failed to terminate SPDK Target process. Sending SIGKILL.")
1341                self.log_print(e)
1342                self.nvmf_proc.kill()
1343                self.nvmf_proc.communicate()
1344                # Try to clean up RPC socket files if they were not removed
1345                # because of using 'kill'
1346                try:
1347                    os.remove("/var/tmp/spdk.sock")
1348                    os.remove("/var/tmp/spdk.sock.lock")
1349                except FileNotFoundError:
1350                    pass
1351
1352
class KernelInitiator(Initiator):
    """Initiator host using the Linux kernel NVMe-oF driver (nvme-cli).

    Connects to the target subsystems with "nvme connect" and runs fio
    directly against the resulting /dev/nvmeXnY block devices.
    """

    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Defaults
        self.extra_params = ""
        self.ioengine = "libaio"

        if "extra_params" in initiator_config:
            self.extra_params = initiator_config["extra_params"]

        if "kernel_engine" in initiator_config:
            self.ioengine = initiator_config["kernel_engine"]
            if "io_uring" in self.ioengine:
                # io_uring polling needs poll queues on the connect call;
                # this overrides any user-provided extra_params.
                self.extra_params = "--nr-poll-queues=8"

    def get_connected_nvme_list(self):
        """Return device basenames (e.g. "nvme0n1") of connected NVMe
        devices whose model string identifies an SPDK or Linux target."""
        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
        return nvme_list

    def kernel_init_connect(self):
        """Connect to all discovered subsystems with nvme-cli and, for
        io_uring, tune the block layer sysfs knobs of the new devices."""
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for subsystem in self.subsystem_info_list:
            self.log_print("Trying to connect %s %s %s" % subsystem)
            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
            time.sleep(2)

        if "io_uring" in self.ioengine:
            self.log_print("Setting block layer settings for io_uring.")

            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
            #       apparently it's not possible for connected subsystems.
            #       Results in "error: Invalid argument"
            block_sysfs_settings = {
                "iostats": "0",
                "rq_affinity": "0",
                "nomerges": "2"
            }

            for disk in self.get_connected_nvme_list():
                sysfs = os.path.join("/sys/block", disk, "queue")
                for k, v in block_sysfs_settings.items():
                    sysfs_opt_path = os.path.join(sysfs, k)
                    try:
                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
                    except subprocess.CalledProcessError as e:
                        self.log_print("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
                    finally:
                        # Always read the setting back so the effective value
                        # ends up in the log, even if the write failed.
                        _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)])
                        self.log_print("%s=%s" % (sysfs_opt_path, _))

    def kernel_init_disconnect(self):
        """Disconnect from every subsystem connected by kernel_init_connect()."""
        for subsystem in self.subsystem_info_list:
            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
            time.sleep(1)

    def get_nvme_subsystem_numa(self, dev_name):
        """Return the NUMA node of the local NIC routing to the target that
        backs the given /dev/nvmeXnY device."""
        # Remove two last characters to get controller name instead of subsystem name
        nvme_ctrl = os.path.basename(dev_name)[:-2]
        remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
                                   self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl]))
        return self.get_route_nic_numa(remote_nvme_ip.group(0))

    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
        """Generate fio [filename] job sections splitting the connected NVMe
        devices evenly across the requested thread count."""
        # Generate connected nvme devices names and sort them by used NIC numa node
        # to allow better grouping when splitting into fio sections.
        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
        nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list]
        nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))]

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            # Hand out at most one leftover device per section so the split
            # stays balanced. (Checking the remainder inside the loop above
            # would let a single section absorb several leftovers; this now
            # matches SPDKInitiator.gen_fio_filename_conf.)
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            # Scale section queue depth with the number of disks in it, but
            # never let it round down to zero.
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = self.gen_fio_numa_section(r)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section
1455
1456
class SPDKInitiator(Initiator):
    """Initiator host using SPDK's fio bdev plugin.

    Builds SPDK on the initiator (unless skipped in config), generates a
    bdev JSON config attaching one NVMe-oF controller per remote subsystem,
    and produces matching fio [filename] sections.
    """

    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Build SPDK unless the config explicitly sets skip_spdk_install.
        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.install_spdk()

        # Required fields
        self.num_cores = initiator_config["num_cores"]

        # Optional fields
        self.enable_data_digest = initiator_config.get("enable_data_digest", False)

    def install_spdk(self):
        """Clean, configure and build SPDK (with RDMA and the fio plugin)
        on the initiator, then run setup.sh to bind devices."""
        self.log_print("Using fio binary %s" % self.fio_bin)
        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)])
        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])

        self.log_print("SPDK built")
        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        """Return a JSON string with a bdev subsystem section containing one
        bdev_nvme_attach_controller entry ("NvmeN") per remote subsystem.

        Each entry in remote_subsystem_list is a (port, nqn, address) tuple.
        """
        bdev_cfg_section = {
            "subsystems": [
                {
                    "subsystem": "bdev",
                    "config": []
                }
            ]
        }

        for i, subsys in enumerate(remote_subsystem_list):
            sub_port, sub_nqn, sub_addr = map(str, subsys)
            nvme_ctrl = {
                "method": "bdev_nvme_attach_controller",
                "params": {
                    "name": "Nvme{}".format(i),
                    "trtype": self.transport,
                    "traddr": sub_addr,
                    "trsvcid": sub_port,
                    "subnqn": sub_nqn,
                    "adrfam": "IPv4"
                }
            }

            if self.enable_adq:
                nvme_ctrl["params"].update({"priority": "1"})

            if self.enable_data_digest:
                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})

            bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)

        return json.dumps(bdev_cfg_section, indent=2)

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
        """Generate fio [filename] job sections splitting the expected NVMe
        bdev names ("NvmeNn1") evenly across the requested thread count."""
        filename_section = ""
        # Never create more sections than there are subsystems.
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        # Generate expected NVMe Bdev names and sort them by used NIC numa node
        # to allow better grouping when splitting into fio sections.
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames]
        filenames = [x for _, x in sorted(zip(filename_numas, filenames))]

        nvme_per_split = int(len(subsystems) / len(threads))
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            # Hand out at most one leftover bdev per section to keep splits balanced.
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            # Scale section queue depth with the number of bdevs in it, but
            # never let it round down to zero.
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = self.gen_fio_numa_section(r)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section

    def get_nvme_subsystem_numa(self, bdev_name):
        """Return the NUMA node of the local NIC routing to the target that
        backs the given bdev (e.g. "Nvme0n1"), using the generated bdev.conf
        to map the bdev back to its controller's traddr."""
        bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir]))
        bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"]

        # Remove two last characters to get controller name instead of subsystem name
        nvme_ctrl = bdev_name[:-2]
        remote_nvme_ip = [c for c in bdev_conf_json_obj if c["params"]["name"] == nvme_ctrl][0]["params"]["traddr"]
        return self.get_route_nic_numa(remote_nvme_ip)
1565
1566
if __name__ == "__main__":
    script_full_dir = os.path.dirname(os.path.realpath(__file__))
    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
                        help='Configuration file.')
    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
                        help='Results directory.')
    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
                        help='CSV results filename.')

    args = parser.parse_args()

    print("Using config file: %s" % args.config)
    with open(args.config, "r") as config:
        data = json.load(config)

    initiators = []

    # Build target/initiator objects and read fio workload parameters from
    # the matching sections of the json config.
    for k, v in data.items():
        if "target" in k:
            v.update({"results_dir": args.results})
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(k, data["general"], v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(k, data["general"], v)
            initiators.append(init_obj)
        elif "fio" in k:
            # Cartesian product: every (block size, queue depth, rw mode) combination.
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k].get("run_num")
            fio_num_jobs = data[k].get("num_jobs")
            fio_rate_iops = data[k].get("rate_iops", 0)
            fio_offset = data[k].get("offset", False)
            fio_offset_inc = data[k].get("offset_inc", 0)
        else:
            continue

    os.makedirs(args.results, exist_ok=True)

    for i in initiators:
        target_obj.initiator_info.append(
            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
        )

    # TODO: This try block is definitely too large. Need to break this up into separate
    # logical blocks to reduce size.
    try:
        target_obj.tgt_start()

        for i in initiators:
            i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no)
            if i.enable_adq:
                i.adq_configure_tc()

        # Poor mans threading
        # Run FIO tests
        for block_size, io_depth, rw in fio_workloads:
            threads = []
            configs = []
            power_daemon = None
            for i in initiators:
                if i.mode == "kernel":
                    i.kernel_init_connect()

                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops,
                                       fio_offset, fio_offset_inc)
                configs.append(cfg)

            for i, cfg in zip(initiators, configs):
                t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
                threads.append(t)
            if target_obj.enable_sar:
                sar_file_prefix = "%s_%s_%s_sar" % (block_size, rw, io_depth)
                t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix))
                threads.append(t)

            if target_obj.enable_pcm:
                pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]

                pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],))
                pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],))
                pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],))

                threads.append(pcm_cpu_t)
                threads.append(pcm_mem_t)
                threads.append(pcm_pow_t)

            if target_obj.enable_bandwidth:
                bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)])
                bandwidth_file_name = ".".join([bandwidth_file_name, "csv"])
                t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,))
                threads.append(t)

            if target_obj.enable_dpdk_memory:
                # args must be a tuple; "(args.results)" without the trailing
                # comma is just a string, which Thread would unpack into one
                # positional argument per character.
                t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results,))
                threads.append(t)

            if target_obj.enable_adq:
                ethtool_thread = threading.Thread(target=target_obj.ethtool_after_fio_ramp, args=(fio_ramp_time,))
                threads.append(ethtool_thread)

            if target_obj.enable_pm:
                power_daemon = target_obj.measure_power(args.results, "%s_%s_%s" % (block_size, rw, io_depth), script_full_dir)

            for t in threads:
                t.start()
            for t in threads:
                t.join()

            for i in initiators:
                if i.mode == "kernel":
                    i.kernel_init_disconnect()
                i.copy_result_files(args.results)

            if power_daemon:
                power_daemon.terminate()

        # Restore system settings changed during the benchmark run.
        target_obj.restore_governor()
        target_obj.restore_tuned()
        target_obj.restore_services()
        target_obj.restore_sysctl()
        if target_obj.enable_adq:
            target_obj.reload_driver("ice")
        for i in initiators:
            i.restore_governor()
            i.restore_tuned()
            i.restore_services()
            i.restore_sysctl()
            if i.enable_adq:
                i.reload_driver("ice")
        target_obj.parse_results(args.results, args.csv_filename)
    finally:
        for i in initiators:
            try:
                i.stop()
            except Exception as err:
                # Best-effort cleanup: report the failure but keep stopping
                # the remaining initiators and the target.
                print("Failed to stop initiator %s: %s" % (i.name, err))
        target_obj.stop()
1734