xref: /spdk/scripts/perf/nvmf/run_nvmf.py (revision 0d0de8e7d934159f826acf59a8a608e331a0b403)
1#!/usr/bin/env python3
2#  SPDX-License-Identifier: BSD-3-Clause
3#  Copyright (C) 2018 Intel Corporation
4#  All rights reserved.
5#
6
7from json.decoder import JSONDecodeError
8import os
9import re
10import sys
11import argparse
12import json
13import zipfile
14import threading
15import subprocess
16import itertools
17import configparser
18import time
19import uuid
20from collections import OrderedDict
21
22import paramiko
23import pandas as pd
24from common import *
25
26sys.path.append(os.path.dirname(__file__) + '/../../../python')
27
28import spdk.rpc as rpc  # noqa
29import spdk.rpc.client as rpc_client  # noqa
30
31
class Server:
    """Base class for a machine participating in an NVMe-oF performance test.

    Holds settings common to targets and initiators (credentials, transport,
    NIC IPs) parsed from the JSON config, and implements system tuning:
    services, sysctl, tuned profile, CPU governor, IRQ affinity and optional
    ADQ - plus the matching restore steps run after the test.
    Subclasses supply the actual command transport via exec_cmd().
    """

    def __init__(self, name, general_config, server_config):
        # "general" config section - shared by all servers in the test.
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        # Per-server config section.
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]

        # Default location of Mellanox OFED IRQ affinity scripts;
        # may be overridden from the config file.
        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        # Caches and "previous state" snapshots used by the restore_* methods.
        self.local_nic_info = []
        self._nics_json_obj = {}
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        self.enable_adq = False
        self.adq_priority = None
        if "adq_enable" in server_config and server_config["adq_enable"]:
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1

        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]

        # The name is embedded in log prefixes and result file names, so it
        # is restricted to alphanumeric characters.
        if not re.match("^[A-Za-z0-9]*$", name):
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        """Print msg prefixed with this server's name, flushing immediately."""
        print("[%s] %s" % (self.name, msg), flush=True)

    @staticmethod
    def get_uncommented_lines(lines):
        """Return only non-empty lines that do not start with '#'."""
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        """Return the interface name that owns the given IP address.

        Output of "ip -j address show" is fetched lazily and cached for
        subsequent lookups. Returns None when no interface matches.
        """
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            # Keep only interfaces that actually have addresses assigned.
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                # NOTE(review): substring match - "10.0.0.1" would also match
                # "10.0.0.11". Consider exact comparison; TODO confirm intent.
                if ip in addr["local"]:
                    return nic["ifname"]

    def set_local_nic_info_helper(self):
        """Hook for subclasses: return lshw-style JSON describing local HW."""
        pass

    def set_local_nic_info(self, pci_info):
        """Store all network-class devices found in an lshw-style JSON tree.

        Recursively walks nested "children" entries and keeps every node
        whose "class" field contains "network" in self.local_nic_info.
        """
        def extract_network_elements(json_obj):
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def get_nic_numa_node(self, nic_name):
        """Return the NUMA node the NIC's PCI device is attached to."""
        return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name]))

    def get_numa_cpu_map(self):
        """Return {numa_node: [cpu_num, ...]} from `lscpu -b -e=NODE,CPU -J`."""
        numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"]))
        numa_cpu_json_map = {}

        for cpu in numa_cpu_json_obj["cpus"]:
            cpu_num = int(cpu["cpu"])
            numa_node = int(cpu["node"])
            numa_cpu_json_map.setdefault(numa_node, [])
            numa_cpu_json_map[numa_node].append(cpu_num)

        return numa_cpu_json_map

    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Execute cmd on this server and return its stdout as a string.

        Base implementation is a no-op stub; subclasses override it with a
        local-subprocess or SSH transport.
        """
        return ""

    def configure_system(self):
        """Run the full system tuning sequence in its required order."""
        self.load_drivers()
        self.configure_services()
        self.configure_sysctl()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity()

    def load_drivers(self):
        """Load NVMe-oF host/target modules for the configured transport."""
        self.log_print("Loading drivers")
        self.exec_cmd(["sudo", "modprobe", "-a",
                       "nvme-%s" % self.transport,
                       "nvmet-%s" % self.transport])
        # null_block is only defined on Target; guard with hasattr so this
        # method stays usable for initiators too.
        if self.mode == "kernel" and hasattr(self, "null_block") and self.null_block:
            self.exec_cmd(["sudo", "modprobe", "null_blk",
                           "nr_devices=%s" % self.null_block])

    def configure_adq(self):
        """Top-level ADQ setup: load required modules and configure the NIC."""
        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        """Load kernel modules required for ADQ traffic classification."""
        self.log_print("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log_print("%s loaded!" % module)
            except CalledProcessError as e:
                # NOTE(review): CalledProcessError is not imported by this file
                # directly; presumably it comes in via "from common import *"
                # - verify against common.py.
                self.log_print("ERROR: failed to load module %s" % module)
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        """Create ADQ traffic classes and flower filters on each test NIC.

        Sets up an mqprio qdisc with two TCs (TC0: housekeeping, TC1: NVMe-oF
        traffic on port 4420), adds a hardware-offloaded flower filter, tunes
        coalescing and runs the set_xps_rxqs helper script.
        """
        self.log_print("Configuring ADQ Traffic classes and filters...")

        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return

        # NOTE(review): num_cores and spdk_dir are attributes provided by
        # subclasses, not by Server itself - confirm all callers define them.
        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        port = "4420"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log_print(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            # Give the driver time to apply the qdisc before adding ingress.
            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log_print(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                             "protocol", "ip", "ingress", "prio", "1", "flower",
                             "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                             "skip_sw", "hw_tc", "1"]
            self.log_print(" ".join(tc_filter_cmd))
            self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log_print("Show tc configuration for %s NIC..." % nic_name)
            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log_print("%s" % tc_disk_out)
            self.log_print("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log_print(xps_cmd)
            self.exec_cmd(xps_cmd)

    def reload_driver(self, driver):
        """Unload and re-load a kernel driver; log (not raise) on failure."""
        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver])
        except CalledProcessError as e:
            self.log_print("ERROR: failed to reload %s module!" % driver)
            self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_nic(self):
        """Set ethtool features/priv-flags needed for ADQ on each test NIC."""
        self.log_print("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log_print(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                # As temporary workaround for ADQ, channel packet inspection optimization is turned on during connection establishment.
                # Then turned off before fio ramp_up expires in ethtool_after_fio_ramp().
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-pkt-inspect-optimize", "on"])
            except CalledProcessError as e:
                self.log_print("ERROR: failed to configure NIC port using ethtool!")
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
                self.log_print("Please update your NIC driver and firmware versions and try again.")
            self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        """Stop services that disturb measurements; remember prior state."""
        self.log_print("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            # Wrap systemctl key=value output in a section header so
            # configparser can parse it.
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log_print("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log_print("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        """Apply network/VM sysctl tuning; remember prior values for restore."""
        self.log_print("Tuning sysctl settings...")

        # busy_read is only useful for ADQ in SPDK mode.
        busy_read = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1

        sysctl_opts = {
            "net.core.busy_poll": 0,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
        }

        for opt, value in sysctl_opts.items():
            # Snapshot the current value before overwriting it.
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        """Activate the configured tuned-adm profile; remember prior profile."""
        if not self.tuned_profile:
            self.log_print("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        """Switch the CPU scaling governor to "performance"."""
        self.log_print("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def configure_irq_affinity(self):
        """Run set_irq_affinity.sh for every test NIC."""
        self.log_print("Setting NIC irq affinity for NICs...")

        irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh")
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            irq_cmd = ["sudo", irq_script_path, nic]
            self.log_print(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        """Return stopped/started services to their pre-test state."""
        self.log_print("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        """Re-apply sysctl values captured before tuning."""
        self.log_print("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        """Revert tuned-adm to the profile active before the test."""
        self.log_print("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log_print("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        """Revert the CPU scaling governor captured before tuning."""
        self.log_print("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log_print("Reverted CPU governor to %s." % self.governor_restore)
367
368
369class Target(Server):
    def __init__(self, name, general_config, target_config):
        """Create the test target and immediately tune the system.

        Parses target-specific settings from target_config, discovers local
        NICs, optionally zips the SPDK sources for distribution to
        initiators, then runs the inherited configure_system() sequence.
        Note the constructor already executes commands on the host
        (ip, lshw, sysctl, systemctl, ...).
        """
        super().__init__(name, general_config, target_config)

        # Defaults
        self.enable_sar = False
        self.sar_delay = 0
        self.sar_interval = 0
        self.sar_count = 0
        self.enable_pcm = False
        self.pcm_dir = ""
        self.pcm_delay = 0
        self.pcm_interval = 0
        self.pcm_count = 0
        self.enable_bandwidth = 0
        self.bandwidth_count = 0
        self.enable_dpdk_memory = False
        self.dpdk_wait_time = 0
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []
        self.initiator_info = []
        self.enable_pm = True

        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "sar_settings" in target_config:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"]
        if "pcm_settings" in target_config:
            self.enable_pcm = True
            self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"]
        if "enable_bandwidth" in target_config:
            self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"]
        if "enable_dpdk_memory" in target_config:
            self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"]
        if "scheduler_settings" in target_config:
            self.scheduler_name = target_config["scheduler_settings"]
        if "zcopy_settings" in target_config:
            self.enable_zcopy = target_config["zcopy_settings"]
        if "results_dir" in target_config:
            # NOTE(review): results_dir attribute exists only when configured;
            # confirm downstream code handles the missing-attribute case.
            self.results_dir = target_config["results_dir"]
        if "enable_pm" in target_config:
            self.enable_pm = target_config["enable_pm"]

        # SPDK root is three levels above this script (scripts/perf/nvmf).
        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        # Zip sources unless the config explicitly skips SPDK installation.
        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        # sys_config() is defined elsewhere in this file (outside this view).
        self.sys_config()
426
427    def set_local_nic_info_helper(self):
428        return json.loads(self.exec_cmd(["lshw", "-json"]))
429
430    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
431        stderr_opt = None
432        if stderr_redirect:
433            stderr_opt = subprocess.STDOUT
434        if change_dir:
435            old_cwd = os.getcwd()
436            os.chdir(change_dir)
437            self.log_print("Changing directory to %s" % change_dir)
438
439        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")
440
441        if change_dir:
442            os.chdir(old_cwd)
443            self.log_print("Changing directory to %s" % old_cwd)
444        return out
445
446    def zip_spdk_sources(self, spdk_dir, dest_file):
447        self.log_print("Zipping SPDK source directory")
448        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
449        for root, _directories, files in os.walk(spdk_dir, followlinks=True):
450            for file in files:
451                fh.write(os.path.relpath(os.path.join(root, file)))
452        fh.close()
453        self.log_print("Done zipping")
454
455    @staticmethod
456    def _chunks(input_list, chunks_no):
457        div, rem = divmod(len(input_list), chunks_no)
458        for i in range(chunks_no):
459            si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem)
460            yield input_list[si:si + (div + 1 if i < rem else div)]
461
462    def spread_bdevs(self, req_disks):
463        # Spread available block devices indexes:
464        # - evenly across available initiator systems
465        # - evenly across available NIC interfaces for
466        #   each initiator
467        # Not NUMA aware.
468        ip_bdev_map = []
469        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))
470
471        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
472            self.initiator_info[i]["bdev_range"] = init_chunk
473            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
474            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
475                for c in nic_chunk:
476                    ip_bdev_map.append((ip, c))
477        return ip_bdev_map
478
479    @staticmethod
480    def read_json_stats(file):
481        with open(file, "r") as json_data:
482            data = json.load(json_data)
483            job_pos = 0  # job_post = 0 because using aggregated results
484
485            # Check if latency is in nano or microseconds to choose correct dict key
486            def get_lat_unit(key_prefix, dict_section):
487                # key prefix - lat, clat or slat.
488                # dict section - portion of json containing latency bucket in question
489                # Return dict key to access the bucket and unit as string
490                for k, _ in dict_section.items():
491                    if k.startswith(key_prefix):
492                        return k, k.split("_")[1]
493
494            def get_clat_percentiles(clat_dict_leaf):
495                if "percentile" in clat_dict_leaf:
496                    p99_lat = float(clat_dict_leaf["percentile"]["99.000000"])
497                    p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"])
498                    p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"])
499                    p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"])
500
501                    return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat]
502                else:
503                    # Latest fio versions do not provide "percentile" results if no
504                    # measurements were done, so just return zeroes
505                    return [0, 0, 0, 0]
506
507            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
508            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
509            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
510            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
511            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
512            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
513            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
514            read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles(
515                data["jobs"][job_pos]["read"][clat_key])
516
517            if "ns" in lat_unit:
518                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
519            if "ns" in clat_unit:
520                read_p99_lat = read_p99_lat / 1000
521                read_p99_9_lat = read_p99_9_lat / 1000
522                read_p99_99_lat = read_p99_99_lat / 1000
523                read_p99_999_lat = read_p99_999_lat / 1000
524
525            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
526            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
527            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
528            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
529            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
530            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
531            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
532            write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles(
533                data["jobs"][job_pos]["write"][clat_key])
534
535            if "ns" in lat_unit:
536                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
537            if "ns" in clat_unit:
538                write_p99_lat = write_p99_lat / 1000
539                write_p99_9_lat = write_p99_9_lat / 1000
540                write_p99_99_lat = write_p99_99_lat / 1000
541                write_p99_999_lat = write_p99_999_lat / 1000
542
543        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
544                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
545                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
546                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]
547
548    def parse_results(self, results_dir, csv_file):
549        files = os.listdir(results_dir)
550        fio_files = filter(lambda x: ".fio" in x, files)
551        json_files = [x for x in files if ".json" in x]
552
553        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
554                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
555                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
556                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]
557
558        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
559                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]
560
561        header_line = ",".join(["Name", *headers])
562        aggr_header_line = ",".join(["Name", *aggr_headers])
563
564        # Create empty results file
565        with open(os.path.join(results_dir, csv_file), "w") as fh:
566            fh.write(aggr_header_line + "\n")
567        rows = set()
568
569        for fio_config in fio_files:
570            self.log_print("Getting FIO stats for %s" % fio_config)
571            job_name, _ = os.path.splitext(fio_config)
572
573            # Look in the filename for rwmixread value. Function arguments do
574            # not have that information.
575            # TODO: Improve this function by directly using workload params instead
576            # of regexing through filenames.
577            if "read" in job_name:
578                rw_mixread = 1
579            elif "write" in job_name:
580                rw_mixread = 0
581            else:
582                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100
583
584            # If "_CPU" exists in name - ignore it
585            # Initiators for the same job could have different num_cores parameter
586            job_name = re.sub(r"_\d+CPU", "", job_name)
587            job_result_files = [x for x in json_files if x.startswith(job_name)]
588            self.log_print("Matching result files for current fio config:")
589            for j in job_result_files:
590                self.log_print("\t %s" % j)
591
592            # There may have been more than 1 initiator used in test, need to check that
593            # Result files are created so that string after last "_" separator is server name
594            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
595            inits_avg_results = []
596            for i in inits_names:
597                self.log_print("\tGetting stats for initiator %s" % i)
598                # There may have been more than 1 test run for this job, calculate average results for initiator
599                i_results = [x for x in job_result_files if i in x]
600                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))
601
602                separate_stats = []
603                for r in i_results:
604                    try:
605                        stats = self.read_json_stats(os.path.join(results_dir, r))
606                        separate_stats.append(stats)
607                        self.log_print(stats)
608                    except JSONDecodeError:
609                        self.log_print("ERROR: Failed to parse %s results! Results might be incomplete!" % r)
610
611                init_results = [sum(x) for x in zip(*separate_stats)]
612                init_results = [x / len(separate_stats) for x in init_results]
613                inits_avg_results.append(init_results)
614
615                self.log_print("\tAverage results for initiator %s" % i)
616                self.log_print(init_results)
617                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
618                    fh.write(header_line + "\n")
619                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")
620
621            # Sum results of all initiators running this FIO job.
622            # Latency results are an average of latencies from accros all initiators.
623            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
624            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
625            for key in inits_avg_results:
626                if "lat" in key:
627                    inits_avg_results[key] /= len(inits_names)
628
629            # Aggregate separate read/write values into common labels
630            # Take rw_mixread into consideration for mixed read/write workloads.
631            aggregate_results = OrderedDict()
632            for h in aggr_headers:
633                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
634                if "lat" in h:
635                    _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
636                else:
637                    _ = read_stat + write_stat
638                aggregate_results[h] = "{0:.3f}".format(_)
639
640            rows.add(",".join([job_name, *aggregate_results.values()]))
641
642        # Save results to file
643        for row in rows:
644            with open(os.path.join(results_dir, csv_file), "a") as fh:
645                fh.write(row + "\n")
646        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))
647
648    def measure_sar(self, results_dir, sar_file_prefix):
649        cpu_number = os.cpu_count()
650        sar_idle_sum = 0
651        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
652        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))
653
654        self.log_print("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % self.sar_delay)
655        time.sleep(self.sar_delay)
656        self.log_print("Starting SAR measurements")
657
658        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count])
659        with open(os.path.join(results_dir, sar_output_file), "w") as fh:
660            for line in out.split("\n"):
661                if "Average" in line:
662                    if "CPU" in line:
663                        self.log_print("Summary CPU utilization from SAR:")
664                        self.log_print(line)
665                    elif "all" in line:
666                        self.log_print(line)
667                    else:
668                        sar_idle_sum += float(line.split()[7])
669            fh.write(out)
670        sar_cpu_usage = cpu_number * 100 - sar_idle_sum
671
672        with open(os.path.join(results_dir, sar_cpu_util_file), "w") as f:
673            f.write("%0.2f" % sar_cpu_usage)
674
675    def measure_power(self, results_dir, prefix, script_full_dir):
676        return subprocess.Popen(["%s/../pm/collect-bmc-pm" % script_full_dir, "-d", results_dir, "-l", "-p", prefix, "-x"])
677
678    def ethtool_after_fio_ramp(self, fio_ramp_time):
679        time.sleep(fio_ramp_time//2)
680        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
681        for nic in nic_names:
682            self.log_print(nic)
683            self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
684                           "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization
685
686    def measure_pcm_memory(self, results_dir, pcm_file_name):
687        time.sleep(self.pcm_delay)
688        cmd = ["%s/build/bin/pcm-memory" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)]
689        pcm_memory = subprocess.Popen(cmd)
690        time.sleep(self.pcm_count)
691        pcm_memory.terminate()
692
693    def measure_pcm(self, results_dir, pcm_file_name):
694        time.sleep(self.pcm_delay)
695        cmd = ["%s/build/bin/pcm" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count,
696               "-csv=%s/%s" % (results_dir, pcm_file_name)]
697        subprocess.run(cmd)
698        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
699        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
700        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
701        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
702        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)
703
704    def measure_pcm_power(self, results_dir, pcm_power_file_name):
705        time.sleep(self.pcm_delay)
706        out = self.exec_cmd(["%s/build/bin/pcm-power" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count])
707        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
708            fh.write(out)
709
710    def measure_network_bandwidth(self, results_dir, bandwidth_file_name):
711        self.log_print("INFO: starting network bandwidth measure")
712        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
713                       "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)])
714
715    def measure_dpdk_memory(self, results_dir):
716        self.log_print("INFO: waiting to generate DPDK memory usage")
717        time.sleep(self.dpdk_wait_time)
718        self.log_print("INFO: generating DPDK memory usage")
719        rpc.env.env_dpdk_get_mem_stats
720        os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir))
721
722    def sys_config(self):
723        self.log_print("====Kernel release:====")
724        self.log_print(os.uname().release)
725        self.log_print("====Kernel command line:====")
726        with open('/proc/cmdline') as f:
727            cmdline = f.readlines()
728            self.log_print('\n'.join(self.get_uncommented_lines(cmdline)))
729        self.log_print("====sysctl conf:====")
730        with open('/etc/sysctl.conf') as f:
731            sysctl = f.readlines()
732            self.log_print('\n'.join(self.get_uncommented_lines(sysctl)))
733        self.log_print("====Cpu power info:====")
734        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
735        self.log_print("====zcopy settings:====")
736        self.log_print("zcopy enabled: %s" % (self.enable_zcopy))
737        self.log_print("====Scheduler settings:====")
738        self.log_print("SPDK scheduler: %s" % (self.scheduler_name))
739
740
class Initiator(Server):
    """A test initiator (FIO client) host driven over SSH.

    Copies SPDK sources to the remote host, discovers target subsystems,
    generates FIO job files, runs FIO, and retrieves result files.
    """

    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Required fields
        self.ip = initiator_config["ip"]
        self.target_nic_ips = initiator_config["target_nic_ips"]

        # Defaults
        self.cpus_allowed = None
        self.cpus_allowed_policy = "shared"
        self.spdk_dir = "/tmp/spdk"
        self.fio_bin = "/usr/src/fio/fio"
        self.nvmecli_bin = "nvme"
        self.cpu_frequency = None
        self.subsystem_info_list = []

        if "spdk_dir" in initiator_config:
            self.spdk_dir = initiator_config["spdk_dir"]
        if "fio_bin" in initiator_config:
            self.fio_bin = initiator_config["fio_bin"]
        if "nvmecli_bin" in initiator_config:
            self.nvmecli_bin = initiator_config["nvmecli_bin"]
        if "cpus_allowed" in initiator_config:
            self.cpus_allowed = initiator_config["cpus_allowed"]
        if "cpus_allowed_policy" in initiator_config:
            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
        if "cpu_frequency" in initiator_config:
            self.cpu_frequency = initiator_config["cpu_frequency"]
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.copy_spdk("/tmp/spdk.zip")
        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        """Return NIC hardware information of the remote host (lshw JSON)."""
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        """Close the SSH connection to the initiator."""
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Execute a command on the remote host over SSH and return its stdout.

        When stderr_redirect is True a pty is requested so that stderr is
        merged into stdout. When change_dir is set, the command is prefixed
        with "cd <dir>;". Raises CalledProcessError on non-zero exit code.
        """
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it again to prevent
        # expansion when sending to the remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout using the get_pty option if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        """Upload a local file to the remote host via SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        """Download a remote file to the local host via SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        """Copy a zipped SPDK source tree to the initiator and unpack it."""
        self.log_print("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log_print("Sources unpacked")

    def copy_result_files(self, dest_dir):
        """Copy FIO result files from the initiator back to dest_dir."""
        self.log_print("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log_print("Done copying results")

    def discover_subsystems(self, address_list, subsys_no):
        """Probe target IP:port combinations with "nvme discover" and store
        matching subsystems as [trsvcid, nqn, traddr] triples in
        self.subsystem_info_list. Ports 4420..4420+subsys_no-1 are scanned.
        """
        num_nvmes = range(0, subsys_no)
        nvme_discover_output = ""
        for ip, subsys_no in itertools.product(address_list, num_nvmes):
            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no))
            nvme_discover_cmd = ["sudo",
                                 "%s" % self.nvmecli_bin,
                                 "discover", "-t", "%s" % self.transport,
                                 "-s", "%s" % (4420 + subsys_no),
                                 "-a", "%s" % ip]

            try:
                stdout = self.exec_cmd(nvme_discover_cmd)
                if stdout:
                    nvme_discover_output = nvme_discover_output + stdout
            except CalledProcessError:
                # Do nothing. In case of discovering remote subsystems of kernel target
                # we expect "nvme discover" to fail a bunch of times because we basically
                # scan ports.
                pass

        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                                nvme_discover_output)  # from nvme discovery output
        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
        subsystems = filter(lambda x: "discovery" not in x[1], subsystems)
        subsystems = list(set(subsystems))
        subsystems.sort(key=lambda x: x[1])
        self.log_print("Found matching subsystems on target side:")
        for s in subsystems:
            self.log_print(s)
        self.subsystem_info_list = subsystems

    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def get_route_nic_numa(self, remote_nvme_ip):
        """Return the NUMA node of the local NIC routing to remote_nvme_ip."""
        local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip]))
        local_nvme_nic = local_nvme_nic[0]["dev"]
        return self.get_nic_numa_node(local_nvme_nic)

    @staticmethod
    def gen_fio_offset_section(offset_inc, num_jobs):
        """Return FIO size/offset/offset_increment lines so that num_jobs
        jobs each work on a separate offset_inc% slice of the device."""
        offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc
        return "\n".join(["size=%s%%" % offset_inc,
                          "offset=0%",
                          "offset_increment=%s%%" % offset_inc])

    def gen_fio_numa_section(self, fio_filenames_list):
        """Return FIO numa_cpu_nodes/numa_mem_policy lines pinned to the NUMA
        node most common among the given NVMe filenames."""
        numa_stats = {}
        for nvme in fio_filenames_list:
            nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme))
            numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1

        # Use the most common NUMA node for this chunk to allocate memory and CPUs
        section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0]

        return "\n".join(["numa_cpu_nodes=%s" % section_local_numa,
                          "numa_mem_policy=prefer:%s" % section_local_numa])

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no,
                       num_jobs=None, ramp_time=0, run_time=10, rate_iops=0,
                       offset=False, offset_inc=0):
        """Generate a FIO job file on the initiator and return its remote path.

        Builds the [global] section from the given workload parameters, adds
        an SPDK bdev or kernel-device filename section, and optional
        cpus_allowed / io_uring tweaks.
        """
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""
        if "spdk" in self.mode:
            bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
            self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
            spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
        else:
            ioengine = self.ioengine
            spdk_conf = ""
            out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'",
                                 "|", "awk", "'{print $1}'"])
            subsystems = [x for x in out.split("\n") if "nvme" in x]

        if self.cpus_allowed is not None:
            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    # "a-b" is an inclusive CPU range, so it contains
                    # b-a+1 CPUs (consistent with SPDKTarget.get_num_cores).
                    cpus_num += len(range(a, b)) + 1
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            self.num_cores = len(subsystems)
            threads = range(0, len(subsystems))

        if "spdk" in self.mode:
            filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs,
                                                          offset, offset_inc)
        else:
            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs,
                                                          offset, offset_inc)

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, kernel_init_connect() function
        if hasattr(self, "ioengine") and "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log_print("Created FIO Config:")
        self.log_print(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        """Pin the CPU frequency via cpupower when cpu_frequency is configured.

        Exits the script if setting the frequency fails (e.g. when the
        intel_pstate driver is active).
        """
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit()
        else:
            self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.")

    def run_fio(self, fio_config_file, run_num=None):
        """Run FIO on the initiator with the given job file.

        When run_num is given, FIO is run run_num times, each run writing
        JSON results to a separate numbered output file; otherwise a single
        run writes to one file.
        """
        job_name, _ = os.path.splitext(fio_config_file)
        self.log_print("Starting FIO run for job: %s" % job_name)
        self.log_print("Using FIO: %s" % self.fio_bin)

        if run_num:
            for i in range(1, run_num + 1):
                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
                try:
                    output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
                                            "--output=%s" % output_filename, "--eta=never"], True)
                    self.log_print(output)
                except subprocess.CalledProcessError as e:
                    self.log_print("ERROR: Fio process failed!")
                    self.log_print(e.stdout)
        else:
            output_filename = job_name + "_" + self.name + ".json"
            # Bug fix: was '"--output" % output_filename', which raises
            # TypeError ("--output" has no format placeholder) instead of
            # passing the output path to fio.
            output = self.exec_cmd(["sudo", self.fio_bin,
                                    fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename], True)
            self.log_print(output)
        self.log_print("FIO run finished. Results in: %s" % output_filename)

    def sys_config(self):
        """Log remote system configuration: kernel, cmdline, sysctl, cpupower."""
        self.log_print("====Kernel release:====")
        self.log_print(self.exec_cmd(["uname", "-r"]))
        self.log_print("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log_print("====sysctl conf:====")
        sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"])
        self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log_print("====Cpu power info:====")
        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
1050
1051
class KernelTarget(Target):
    """NVMe-oF target backed by the Linux kernel nvmet subsystem (nvmetcli)."""

    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = "nvmetcli"

        if "nvmet_bin" in target_config:
            self.nvmet_bin = target_config["nvmet_bin"]

    def stop(self):
        """Tear down the kernel target configuration."""
        nvmet_command(self.nvmet_bin, "clear")

    def kernel_tgt_gen_subsystem_conf(self, nvme_list):
        """Write an nvmetcli JSON configuration ("kernel.conf") that exposes
        each device in nvme_list through its own subsystem and port.

        Ports are numbered upwards from 4420. Also populates
        self.subsystem_info_list ([port, nqn, ip] triples) and self.subsys_no.
        """
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        for ip, bdev_num in self.spread_bdevs(len(nvme_list)):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = nvme_list[bdev_num]

            nvmet_cfg["subsystems"].append({
                "allowed_hosts": [],
                "attr": {
                    "allow_any_host": "1",
                    "serial": serial,
                    "version": "1.3"
                },
                "namespaces": [
                    {
                        "device": {
                            "path": bdev_name,
                            "uuid": "%s" % uuid.uuid4()
                        },
                        "enable": 1,
                        "nsid": port
                    }
                ],
                "nqn": nqn
            })

            nvmet_cfg["ports"].append({
                "addr": {
                    "adrfam": "ipv4",
                    "traddr": ip,
                    "trsvcid": port,
                    "trtype": self.transport
                },
                "portid": bdev_num,
                "referrals": [],
                "subsystems": [nqn]
            })

            self.subsystem_info_list.append([port, nqn, ip])
        self.subsys_no = len(self.subsystem_info_list)

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        """Generate the nvmet configuration and (re)load it with nvmetcli."""
        self.log_print("Configuring kernel NVMeOF Target")

        if self.null_block:
            # Use self.log_print instead of bare print() for consistency
            # with the rest of the file's logging.
            self.log_print("Configuring with null block device.")
            nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
        else:
            self.log_print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()

        self.kernel_tgt_gen_subsystem_conf(nvme_list)
        self.subsys_no = len(nvme_list)

        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log_print("Done configuring kernel NVMeOF Target")
1136
1137
class SPDKTarget(Target):
    """NVMe-oF target backed by the SPDK nvmf_tgt application, driven via JSON-RPC."""

    def __init__(self, name, general_config, target_config):
        """Read SPDK-specific options from target_config.

        Required fields: core_mask.
        Optional fields: num_shared_buffers, max_queue_depth,
        null_block_dif_type, dif_insert_strip, bpf_scripts, dsa_settings,
        scheduler_core_limit.
        """
        super().__init__(name, general_config, target_config)

        # Required fields
        self.core_mask = target_config["core_mask"]
        self.num_cores = self.get_num_cores(self.core_mask)

        # Defaults
        self.dif_insert_strip = False
        self.null_block_dif_type = 0
        self.num_shared_buffers = 4096
        self.max_queue_depth = 128
        self.bpf_proc = None
        self.bpf_scripts = []
        self.enable_dsa = False
        self.scheduler_core_limit = None

        if "num_shared_buffers" in target_config:
            self.num_shared_buffers = target_config["num_shared_buffers"]
        if "max_queue_depth" in target_config:
            self.max_queue_depth = target_config["max_queue_depth"]
        if "null_block_dif_type" in target_config:
            self.null_block_dif_type = target_config["null_block_dif_type"]
        if "dif_insert_strip" in target_config:
            self.dif_insert_strip = target_config["dif_insert_strip"]
        if "bpf_scripts" in target_config:
            self.bpf_scripts = target_config["bpf_scripts"]
        if "dsa_settings" in target_config:
            self.enable_dsa = target_config["dsa_settings"]
        if "scheduler_core_limit" in target_config:
            self.scheduler_core_limit = target_config["scheduler_core_limit"]

        self.log_print("====DSA settings:====")
        self.log_print("DSA enabled: %s" % (self.enable_dsa))

    @staticmethod
    def get_num_cores(core_mask):
        """Return the number of CPU cores described by an SPDK core mask.

        Accepts either a hex mask (e.g. "0xFF") or a bracketed core list
        (e.g. "[0,2,5-8]"); "x-y" ranges are inclusive.
        """
        if "0x" in core_mask:
            # Hex mask: each set bit is one core.
            return bin(int(core_mask, 16)).count("1")
        else:
            num_cores = 0
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")
            for i in core_mask.split(","):
                if "-" in i:
                    x, y = i.split("-")
                    # Inclusive range, hence the +1.
                    num_cores += len(range(int(x), int(y))) + 1
                else:
                    num_cores += 1
            return num_cores

    def spdk_tgt_configure(self):
        """Configure the running nvmf_tgt via RPC: transport layer, bdevs
        (null block or NVMe) and subsystems. Expects self.client to be set."""
        self.log_print("Configuring SPDK NVMeOF target via RPC")

        if self.enable_adq:
            self.adq_configure_tc()

        # Create transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
                                       num_shared_buffers=self.num_shared_buffers,
                                       max_queue_depth=self.max_queue_depth,
                                       dif_insert_or_strip=self.dif_insert_strip,
                                       sock_priority=self.adq_priority)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

        self.log_print("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self, null_block_count):
        """Create null_block_count null bdevs named "Nvme<i>n1" via RPC,
        with optional DIF/DIX metadata when null_block_dif_type != 0."""
        md_size = 0
        block_size = 4096
        if self.null_block_dif_type != 0:
            # Metadata size needed to hold protection information.
            md_size = 128

        self.log_print("Adding null block bdevices to config via RPC")
        for i in range(null_block_count):
            self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type)
            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
                                      dif_type=self.null_block_dif_type, md_size=md_size)
        self.log_print("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        """Attach local NVMe PCIe devices as bdevs via RPC.

        When req_num_disks is given, attach only that many devices; exits
        if more disks are requested than available.
        """
        self.log_print("Adding NVMe bdevs to config via RPC")

        bdfs = get_nvme_devices_bdf()
        # RPC expects "0000:xx.yy.z"-style addresses (dots, not colons).
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create one subsystem + listener per bdev via RPC, spreading them
        over the given IPs; ports start at 4420. Populates
        self.subsystem_info_list and self.subsys_no."""
        self.log_print("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        for ip, bdev_num in self.spread_bdevs(req_num_disks):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = "Nvme%sn1" % bdev_num

            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                           allow_any_host=True, max_namespaces=8)
            rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
            # Add a listener both for the subsystem itself and for discovery.
            for nqn_name in [nqn, "discovery"]:
                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
                                                     nqn=nqn_name,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid=port,
                                                     adrfam="ipv4")
            self.subsystem_info_list.append([port, nqn, ip])
        self.subsys_no = len(self.subsystem_info_list)

        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def bpf_start(self):
        """Start the bpftrace helper script attached to the nvmf_tgt PID,
        running the configured trace scripts; output goes to bpf_traces.txt."""
        self.log_print("Starting BPF Trace scripts: %s" % self.bpf_scripts)
        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
        results_path = os.path.join(self.results_dir, "bpf_traces.txt")

        with open(self.pid, "r") as fh:
            nvmf_pid = str(fh.readline())

        cmd = [bpf_script, nvmf_pid, *bpf_traces]
        self.log_print(cmd)
        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})

    def tgt_start(self):
        """Start nvmf_tgt, wait for its RPC socket, apply pre-init RPC options
        (sock impl, zcopy, ADQ, DSA, scheduler), initialize the framework and
        configure the target.

        NOTE: the RPC call order matters - socket/accel/scheduler options
        must be set before framework_start_init.
        """
        if self.null_block:
            self.subsys_no = 1
        else:
            self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        self.log_print("SPDK NVMeOF Target PID=%s" % self.pid)
        self.log_print("Waiting for spdk to initialize...")
        # Busy-wait until the app creates its RPC socket.
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")

        rpc.sock.sock_set_default_impl(self.client, impl_name="posix")

        if self.enable_zcopy:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
                                           enable_zerocopy_send_server=True)
            self.log_print("Target socket options:")
            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))

        if self.enable_adq:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
                                           nvme_adminq_poll_period_us=100000, retry_count=4)

        if self.enable_dsa:
            rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None)
            self.log_print("Target DSA accel module enabled")

        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit)
        rpc.framework_start_init(self.client)

        if self.bpf_scripts:
            self.bpf_start()

        self.spdk_tgt_configure()

    def stop(self):
        """Stop BPF tracing (if running) and terminate the nvmf_tgt process,
        escalating to SIGKILL and cleaning up stale RPC socket files if a
        graceful shutdown fails."""
        if self.bpf_proc:
            self.log_print("Stopping BPF Trace script")
            self.bpf_proc.terminate()
            self.bpf_proc.wait()

        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait(timeout=30)
            except Exception as e:
                self.log_print("Failed to terminate SPDK Target process. Sending SIGKILL.")
                self.log_print(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
                # Try to clean up RPC socket files if they were not removed
                # because of using 'kill'
                try:
                    os.remove("/var/tmp/spdk.sock")
                    os.remove("/var/tmp/spdk.sock.lock")
                except FileNotFoundError:
                    pass
1354                    pass
1355
1356
class KernelInitiator(Initiator):
    """Initiator host that uses the Linux kernel NVMe-oF host driver.

    Connects to the target subsystems with nvme-cli and runs fio against
    the resulting /dev/nvme* block devices.
    """

    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Defaults
        self.extra_params = ""
        self.ioengine = "libaio"

        if "extra_params" in initiator_config:
            self.extra_params = initiator_config["extra_params"]

        if "kernel_engine" in initiator_config:
            self.ioengine = initiator_config["kernel_engine"]
            if "io_uring" in self.ioengine:
                # io_uring requires controllers connected with poll queues
                self.extra_params = "--nr-poll-queues=8"

    def get_connected_nvme_list(self):
        """Return device names (e.g. "nvme0n1") of connected SPDK/Linux NVMe devices."""
        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
        return nvme_list

    def kernel_init_connect(self):
        """Connect to all discovered subsystems using nvme-cli."""
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for subsystem in self.subsystem_info_list:
            self.log_print("Trying to connect %s %s %s" % subsystem)
            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
            time.sleep(2)

        if "io_uring" in self.ioengine:
            self.log_print("Setting block layer settings for io_uring.")

            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
            #       apparently it's not possible for connected subsystems.
            #       Results in "error: Invalid argument"
            block_sysfs_settings = {
                "iostats": "0",
                "rq_affinity": "0",
                "nomerges": "2"
            }

            for disk in self.get_connected_nvme_list():
                sysfs = os.path.join("/sys/block", disk, "queue")
                for k, v in block_sysfs_settings.items():
                    sysfs_opt_path = os.path.join(sysfs, k)
                    try:
                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
                    except subprocess.CalledProcessError as e:
                        self.log_print("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
                    finally:
                        # Read the setting back so its effective value is logged.
                        _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)])
                        self.log_print("%s=%s" % (sysfs_opt_path, _))

    def kernel_init_disconnect(self):
        """Disconnect all subsystems connected by kernel_init_connect()."""
        for subsystem in self.subsystem_info_list:
            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
            time.sleep(1)

    def get_nvme_subsystem_numa(self, dev_name):
        """Return the NUMA node of the NIC that routes to the given NVMe device."""
        # Remove two last characters to get controller name instead of subsystem name
        nvme_ctrl = os.path.basename(dev_name)[:-2]
        remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
                                   self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl]))
        return self.get_route_nic_numa(remote_nvme_ip.group(0))

    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
        """Generate fio [filename] job sections for the connected kernel NVMe devices."""
        # Generate connected nvme devices names and sort them by used NIC numa node
        # to allow better grouping when splitting into fio sections.
        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
        nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list]
        nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))]

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            # Hand out at most one leftover device per section. This check was
            # previously nested inside the loop above, which let the first
            # sections absorb several leftovers (unbalanced split) and never
            # distributed leftovers at all when nvme_per_split was 0. Now it
            # matches SPDKInitiator.gen_fio_filename_conf.
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            # Scale the per-section queue depth by device count and job count,
            # with a floor of 1.
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = self.gen_fio_numa_section(r)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section
1459
1460
class SPDKInitiator(Initiator):
    """Initiator host that runs workloads through the SPDK fio bdev plugin."""

    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Build SPDK unless the config explicitly sets skip_spdk_install.
        if general_config.get("skip_spdk_install", False) is False:
            self.install_spdk()

        # Required fields
        self.num_cores = initiator_config["num_cores"]

        # Optional fields
        self.enable_data_digest = initiator_config.get("enable_data_digest", False)

    def install_spdk(self):
        """Clean, configure and build SPDK with the fio plugin, then run setup.sh."""
        self.log_print("Using fio binary %s" % self.fio_bin)
        src = self.spdk_dir
        self.exec_cmd(["git", "-C", src, "submodule", "update", "--init"])
        self.exec_cmd(["git", "-C", src, "clean", "-ffdx"])
        self.exec_cmd(["cd", src, "&&", "./configure", "--with-rdma",
                       "--with-fio=%s" % os.path.dirname(self.fio_bin)])
        self.exec_cmd(["make", "-C", src, "clean"])
        self.exec_cmd(["make", "-C", src, "-j$(($(nproc)*2))"])

        self.log_print("SPDK built")
        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % src])

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        """Return a JSON bdev subsystem config attaching one NVMe controller per remote subsystem."""
        controllers = []
        for idx, subsys in enumerate(remote_subsystem_list):
            sub_port, sub_nqn, sub_addr = (str(field) for field in subsys)
            params = {
                "name": "Nvme{}".format(idx),
                "trtype": self.transport,
                "traddr": sub_addr,
                "trsvcid": sub_port,
                "subnqn": sub_nqn,
                "adrfam": "IPv4"
            }

            if self.enable_adq:
                params["priority"] = "1"

            if self.enable_data_digest:
                params["ddgst"] = self.enable_data_digest

            controllers.append({"method": "bdev_nvme_attach_controller", "params": params})

        bdev_cfg_section = {
            "subsystems": [
                {
                    "subsystem": "bdev",
                    "config": controllers
                }
            ]
        }
        return json.dumps(bdev_cfg_section, indent=2)

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
        """Generate fio [filename] job sections for the expected SPDK NVMe bdevs."""
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        # Generate expected NVMe Bdev names and sort them by used NIC numa node
        # to allow better grouping when splitting into fio sections.
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        numa_nodes = [self.get_nvme_subsystem_numa(f) for f in filenames]
        filenames = [f for _, f in sorted(zip(numa_nodes, filenames))]

        # Split bdevs across the sections as evenly as possible, giving at
        # most one leftover bdev to each of the first sections.
        per_section, leftover = divmod(len(subsystems), len(threads))
        bdev_iter = iter(filenames)
        sections = []
        for _ in range(len(threads)):
            chunk = [next(bdev_iter) for _ in range(per_section)]
            if leftover:
                chunk.append(next(bdev_iter))
                leftover -= 1
            sections.append(chunk)

        filename_section = ""
        for idx, chunk in enumerate(sections):
            header = "[filename%s]" % idx
            disks = "\n".join(["filename=%s" % f for f in chunk])
            # Per-section queue depth scales with bdev count, floored at 1.
            job_section_qd = round((io_depth * len(chunk)) / num_jobs) or 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = self.gen_fio_numa_section(chunk)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section

    def get_nvme_subsystem_numa(self, bdev_name):
        """Return the NUMA node of the NIC that routes to the given NVMe bdev."""
        bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir]))
        controllers = bdev_conf_json_obj["subsystems"][0]["config"]

        # Remove two last characters to get controller name instead of subsystem name
        ctrl_name = bdev_name[:-2]
        matching = [c for c in controllers if c["params"]["name"] == ctrl_name]
        return self.get_route_nic_numa(matching[0]["params"]["traddr"])
1569
1570
if __name__ == "__main__":
    script_full_dir = os.path.dirname(os.path.realpath(__file__))
    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
                        help='Configuration file.')
    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
                        help='Results directory.')
    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
                        help='CSV results filename.')

    args = parser.parse_args()

    print("Using config file: %s" % args.config)
    with open(args.config, "r") as config:
        data = json.load(config)

    initiators = []

    # Accessing these keys early also validates that the mandatory "general"
    # and "target" sections are present in the config file.
    general_config = data["general"]
    target_config = data["target"]

    # Build target/initiator objects and read the fio workload matrix from
    # the config. Section names decide the role: "target", "initiator*" and
    # "fio" sections are recognized; any other section is ignored.
    for k, v in data.items():
        if "target" in k:
            v.update({"results_dir": args.results})
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(k, data["general"], v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(k, data["general"], v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k].get("run_num", None)
            fio_num_jobs = data[k].get("num_jobs", None)

            fio_rate_iops = data[k].get("rate_iops", 0)
            fio_offset = data[k].get("offset", False)
            fio_offset_inc = data[k].get("offset_inc", 0)
        else:
            continue

    try:
        os.mkdir(args.results)
    except FileExistsError:
        pass

    for i in initiators:
        target_obj.initiator_info.append(
            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
        )

    # TODO: This try block is definitely too large. Need to break this up into separate
    # logical blocks to reduce size.
    try:
        target_obj.tgt_start()

        for i in initiators:
            i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no)
            if i.enable_adq:
                i.adq_configure_tc()

        # Poor mans threading
        # Run FIO tests
        for block_size, io_depth, rw in fio_workloads:
            threads = []
            configs = []
            power_daemon = None
            for i in initiators:
                if i.mode == "kernel":
                    i.kernel_init_connect()

                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops,
                                       fio_offset, fio_offset_inc)
                configs.append(cfg)

            for i, cfg in zip(initiators, configs):
                t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
                threads.append(t)
            if target_obj.enable_sar:
                sar_file_prefix = "%s_%s_%s_sar" % (block_size, rw, io_depth)
                t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix))
                threads.append(t)

            if target_obj.enable_pcm:
                pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]

                pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],))
                pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],))
                pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],))

                threads.append(pcm_cpu_t)
                threads.append(pcm_mem_t)
                threads.append(pcm_pow_t)

            if target_obj.enable_bandwidth:
                bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)])
                bandwidth_file_name = ".".join([bandwidth_file_name, "csv"])
                t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,))
                threads.append(t)

            if target_obj.enable_dpdk_memory:
                # NOTE: "args" must be a tuple. Previously a bare string was
                # passed here, which threading.Thread splats into one argument
                # per character of the results path.
                t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results,))
                threads.append(t)

            if target_obj.enable_adq:
                ethtool_thread = threading.Thread(target=target_obj.ethtool_after_fio_ramp, args=(fio_ramp_time,))
                threads.append(ethtool_thread)

            if target_obj.enable_pm:
                power_daemon = target_obj.measure_power(args.results, "%s_%s_%s" % (block_size, rw, io_depth), script_full_dir)

            for t in threads:
                t.start()
            for t in threads:
                t.join()

            for i in initiators:
                if i.mode == "kernel":
                    i.kernel_init_disconnect()
                i.copy_result_files(args.results)

            if power_daemon:
                power_daemon.terminate()

        # Undo target- and initiator-side system tuning applied at startup.
        target_obj.restore_governor()
        target_obj.restore_tuned()
        target_obj.restore_services()
        target_obj.restore_sysctl()
        if target_obj.enable_adq:
            target_obj.reload_driver("ice")
        for i in initiators:
            i.restore_governor()
            i.restore_tuned()
            i.restore_services()
            i.restore_sysctl()
            if i.enable_adq:
                i.reload_driver("ice")
        target_obj.parse_results(args.results, args.csv_filename)
    finally:
        for i in initiators:
            try:
                i.stop()
            except Exception as err:
                # Best effort: report but keep stopping the remaining hosts
                # and do not mask any original exception.
                print("Error while stopping initiator %s: %s" % (i.name, err))
        target_obj.stop()
1738