xref: /spdk/scripts/perf/nvmf/run_nvmf.py (revision e9030f36bc2672216cc333e940696627451aba19)
1#!/usr/bin/env python3
2
3from json.decoder import JSONDecodeError
4import os
5import re
6import sys
7import argparse
8import json
9import zipfile
10import threading
11import subprocess
12import itertools
13import configparser
14import time
15import uuid
16from collections import OrderedDict
17
18import paramiko
19import pandas as pd
20
21import rpc
22import rpc.client
23from common import *
24
25
class Server:
    """Base class for a machine participating in an NVMe-oF perf test.

    Stores configuration shared by Target and Initiator (credentials,
    transport type, NIC IPs) and provides the system-tuning steps used
    before a test run (services, sysctl, tuned, CPU governor, IRQ
    affinity, optional ADQ) together with their restore counterparts.
    """

    def __init__(self, name, general_config, server_config):
        """Initialize common server settings.

        :param name: short alphanumeric identifier used as a log prefix
        :param general_config: dict with "username", "password", "transport"
        :param server_config: dict with "nic_ips", "mode" and optional keys
            "irq_scripts_dir", "adq_enable", "tuned_profile"
        """
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]

        # Default location of Mellanox OFED IRQ affinity scripts; may be
        # overridden per-server in the config file.
        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        self.local_nic_info = []
        self._nics_json_obj = {}
        # State captured before tuning so it can be restored after the test.
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        self.enable_adq = False
        self.adq_priority = None
        if "adq_enable" in server_config and server_config["adq_enable"]:
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1

        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]

        # Name is embedded in result file names, so restrict it to a safe
        # character set.
        if not re.match("^[A-Za-z0-9]*$", name):
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        """Print msg prefixed with this server's name, flushing immediately."""
        print("[%s] %s" % (self.name, msg), flush=True)

    def get_uncommented_lines(self, lines):
        """Return lines that are non-empty and do not start with '#'."""
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        """Return the interface name that carries the given IP address.

        The "ip -j address show" output is fetched once and cached in
        self._nics_json_obj. Returns None if no interface matches.
        """
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            # Keep only interfaces that actually have addresses assigned.
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                # NOTE(review): substring match, so "10.0.0.1" also matches
                # "10.0.0.10" - assumes configured IPs are unambiguous.
                if ip in addr["local"]:
                    return nic["ifname"]

    def set_local_nic_info_helper(self):
        # Overridden by subclasses to supply "lshw -json" style PCI info.
        pass

    def set_local_nic_info(self, pci_info):
        """Populate self.local_nic_info with network devices found in pci_info.

        :param pci_info: parsed "lshw -json" output (nested dicts/lists)
        """
        def extract_network_elements(json_obj):
            # Recursively collect every node whose "class" contains "network".
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        # Overridden by subclasses: Target runs locally, Initiator over SSH.
        return ""

    def configure_system(self):
        """Run all system-tuning steps in the required order."""
        self.configure_services()
        self.configure_sysctl()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity()

    def configure_adq(self):
        """Load ADQ kernel modules and configure NIC ports (SPDK mode only)."""
        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        """Modprobe the traffic-control modules that ADQ depends on."""
        self.log_print("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log_print("%s loaded!" % module)
            except subprocess.CalledProcessError as e:
                self.log_print("ERROR: failed to load module %s" % module)
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        """Create ADQ traffic classes, filters and XPS mapping on each NIC."""
        self.log_print("Configuring ADQ Traffic classess and filters...")

        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        # Target filters on destination port, initiator on source port.
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        port = "4420"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            # Two traffic classes: TC0 (default) and TC1 (NVMe-oF traffic).
            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log_print(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log_print(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            # Steer NVMe-oF TCP traffic for this IP/port into TC1 in hardware.
            tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                             "protocol", "ip", "ingress", "prio", "1", "flower",
                             "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                             "skip_sw", "hw_tc", "1"]
            self.log_print(" ".join(tc_filter_cmd))
            self.exec_cmd(tc_filter_cmd)

            # Ethtool coalese settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log_print(xps_cmd)
            self.exec_cmd(xps_cmd)

    def adq_configure_nic(self):
        """Reload the ice driver and set the ethtool flags required by ADQ."""
        self.log_print("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        try:
            self.exec_cmd(["sudo", "rmmod", "ice"])
            self.exec_cmd(["sudo", "modprobe", "ice"])
        except subprocess.CalledProcessError as e:
            self.log_print("ERROR: failed to reload ice module!")
            self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log_print(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization
            except subprocess.CalledProcessError as e:
                self.log_print("ERROR: failed to configure NIC port using ethtool!")
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
                self.log_print("Please update your NIC driver and firmware versions and try again.")
            self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        """Stop services that disturb perf results; remember prior states."""
        self.log_print("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            # systemctl "show" output is key=value lines; prepend a section
            # header so configparser can parse it.
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log_print("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log_print("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        """Apply network/VM sysctl tuning; remember old values for restore."""
        self.log_print("Tuning sysctl settings...")

        # busy_read polling helps SPDK-mode ADQ; leave at 0 otherwise.
        busy_read = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1

        sysctl_opts = {
            "net.core.busy_poll": 0,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
        }

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        """Activate the configured tuned-adm profile, saving the current one."""
        if not self.tuned_profile:
            self.log_print("WARNING: Tuned profile not set in configration file. Skipping configuration.")
            return

        self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            # Save the currently active profile/mode so it can be restored.
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        """Set the CPU scaling governor to "performance", saving the old one."""
        self.log_print("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def configure_irq_affinity(self):
        """Run set_irq_affinity.sh for each test NIC."""
        self.log_print("Setting NIC irq affinity for NICs...")

        irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh")
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            irq_cmd = ["sudo", irq_script_path, nic]
            self.log_print(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        """Bring services back to the states recorded before the test."""
        self.log_print("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        """Write back the sysctl values recorded before the test."""
        self.log_print("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        """Revert tuned-adm to the profile recorded before the test."""
        self.log_print("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log_print("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        """Revert the CPU scaling governor recorded before the test."""
        self.log_print("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log_print("Reverted CPU governor to %s." % self.governor_restore)
321
322
class Target(Server):
    """The NVMe-oF target machine.

    Runs commands locally via subprocess (unlike Initiator, which uses SSH)
    and is responsible for collecting host-side statistics during a test:
    SAR CPU utilization, Intel PCM counters, network bandwidth and DPDK
    memory usage. Also parses and aggregates fio JSON results.
    """

    def __init__(self, name, general_config, target_config):
        super(Target, self).__init__(name, general_config, target_config)

        # Defaults
        self.enable_sar = False
        self.sar_delay = 0
        self.sar_interval = 0
        self.sar_count = 0
        self.enable_pcm = False
        self.pcm_dir = ""
        self.pcm_delay = 0
        self.pcm_interval = 0
        self.pcm_count = 0
        self.enable_bandwidth = 0
        self.bandwidth_count = 0
        self.enable_dpdk_memory = False
        self.dpdk_wait_time = 0
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []

        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "sar_settings" in target_config:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"]
        if "pcm_settings" in target_config:
            self.enable_pcm = True
            self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"]
        if "enable_bandwidth" in target_config:
            self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"]
        if "enable_dpdk_memory" in target_config:
            self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"]
        if "scheduler_settings" in target_config:
            self.scheduler_name = target_config["scheduler_settings"]
        if "zcopy_settings" in target_config:
            self.enable_zcopy = target_config["zcopy_settings"]
        if "results_dir" in target_config:
            self.results_dir = target_config["results_dir"]

        # This script lives in spdk/scripts/perf/nvmf, so the SPDK root is
        # three levels up from the script directory.
        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        """Return parsed "lshw -json" output describing local hardware."""
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Run cmd locally and return its decoded stdout.

        :param cmd: command as an argument list
        :param stderr_redirect: merge stderr into the returned output
        :param change_dir: run the command from this directory
        """
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log_print("Changing directory to %s" % change_dir)

        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")

        if change_dir:
            os.chdir(old_cwd)
            self.log_print("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        """Zip the SPDK source tree into dest_file (for copying to initiators).

        NOTE(review): archive entries use paths relative to the current
        working directory - assumes the script is run from inside spdk_dir.
        """
        self.log_print("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log_print("Done zipping")

    def read_json_stats(self, file):
        """Parse one fio JSON result file.

        Returns a flat list of 18 floats: read iops/bw/avg/min/max latency
        and p99/p99.9/p99.99/p99.999 percentiles, followed by the same nine
        values for writes. All latencies are converted to microseconds.
        """
        with open(file, "r") as json_data:
            data = json.load(json_data)
            job_pos = 0  # job_post = 0 because using aggregated results

            # Check if latency is in nano or microseconds to choose correct dict key
            def get_lat_unit(key_prefix, dict_section):
                # key prefix - lat, clat or slat.
                # dict section - portion of json containing latency bucket in question
                # Return dict key to access the bucket and unit as string
                for k, _ in dict_section.items():
                    if k.startswith(key_prefix):
                        return k, k.split("_")[1]

            def get_clat_percentiles(clat_dict_leaf):
                if "percentile" in clat_dict_leaf:
                    p99_lat = float(clat_dict_leaf["percentile"]["99.000000"])
                    p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"])
                    p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"])
                    p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"])

                    return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat]
                else:
                    # Latest fio versions do not provide "percentile" results if no
                    # measurements were done, so just return zeroes
                    return [0, 0, 0, 0]

            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
            read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["read"][clat_key])

            # Normalize nanoseconds to microseconds.
            if "ns" in lat_unit:
                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
            if "ns" in clat_unit:
                read_p99_lat = read_p99_lat / 1000
                read_p99_9_lat = read_p99_9_lat / 1000
                read_p99_99_lat = read_p99_99_lat / 1000
                read_p99_999_lat = read_p99_999_lat / 1000

            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
            write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["write"][clat_key])

            if "ns" in lat_unit:
                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
            if "ns" in clat_unit:
                write_p99_lat = write_p99_lat / 1000
                write_p99_9_lat = write_p99_9_lat / 1000
                write_p99_99_lat = write_p99_99_lat / 1000
                write_p99_999_lat = write_p99_999_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]

    def parse_results(self, results_dir, csv_file):
        """Aggregate per-initiator fio JSON results into CSV summaries.

        For every .fio config in results_dir, averages repeated runs per
        initiator, writes a per-initiator CSV, then sums/averages across
        initiators and appends one aggregated row per job to csv_file.
        """
        files = os.listdir(results_dir)
        fio_files = filter(lambda x: ".fio" in x, files)
        json_files = [x for x in files if ".json" in x]

        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]

        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]

        header_line = ",".join(["Name", *headers])
        aggr_header_line = ",".join(["Name", *aggr_headers])

        # Create empty results file
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            fh.write(aggr_header_line + "\n")
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # Look in the filename for rwmixread value. Function arguments do
            # not have that information.
            # TODO: Improve this function by directly using workload params instead
            # of regexing through filenames.
            if "read" in job_name:
                rw_mixread = 1
            elif "write" in job_name:
                rw_mixread = 0
            else:
                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100

            # If "_CPU" exists in name - ignore it
            # Initiators for the same job could have diffrent num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if job_name in x]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # There may have been more than 1 initiator used in test, need to check that
            # Result files are created so that string after last "_" separator is server name
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than 1 test run for this job, calculate average results for initiator
                i_results = [x for x in job_result_files if i in x]
                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))

                separate_stats = []
                for r in i_results:
                    try:
                        stats = self.read_json_stats(os.path.join(results_dir, r))
                        separate_stats.append(stats)
                        self.log_print(stats)
                    except JSONDecodeError:
                        # Bug fix: the original logged the literal "%s" because
                        # the format argument was never applied.
                        self.log_print("ERROR: Failed to parse %s results! Results might be incomplete!" % r)

                init_results = [sum(x) for x in zip(*separate_stats)]
                init_results = [x / len(separate_stats) for x in init_results]
                inits_avg_results.append(init_results)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(init_results)
                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
                    fh.write(header_line + "\n")
                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")

            # Sum results of all initiators running this FIO job.
            # Latency results are an average of latencies from accros all initiators.
            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
            for key in inits_avg_results:
                if "lat" in key:
                    inits_avg_results[key] /= len(inits_names)

            # Aggregate separate read/write values into common labels
            # Take rw_mixread into consideration for mixed read/write workloads.
            aggregate_results = OrderedDict()
            for h in aggr_headers:
                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
                if "lat" in h:
                    value = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
                else:
                    value = read_stat + write_stat
                aggregate_results[h] = "{0:.3f}".format(value)

            rows.add(",".join([job_name, *aggregate_results.values()]))

        # Save results to file
        for row in rows:
            with open(os.path.join(results_dir, csv_file), "a") as fh:
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))

    def measure_sar(self, results_dir, sar_file_name):
        """Collect per-CPU utilization with sar and save it to results_dir."""
        self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay)
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        time.sleep(self.sar_delay)
        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count])
        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log_print("Summary CPU utilization from SAR:")
                        self.log_print(line)
                    elif "all" in line:
                        self.log_print(line)
                    else:
                        # Bug fix: column 8 (%idle) is a string; the original
                        # added it to an int accumulator, raising TypeError.
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum
        with open(os.path.join(results_dir, sar_file_name), "a") as f:
            # Bug fix: the original called str.join() on a number.
            f.write("Total CPU used: %s" % sar_cpu_usage)

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        """Run pcm-memory in the background for pcm_count seconds."""
        time.sleep(self.pcm_delay)
        cmd = ["%s/pcm-memory.x" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)]
        pcm_memory = subprocess.Popen(cmd)
        time.sleep(self.pcm_count)
        pcm_memory.terminate()

    def measure_pcm(self, results_dir, pcm_file_name):
        """Run pcm and extract per-socket UPI columns into a separate CSV."""
        time.sleep(self.pcm_delay)
        cmd = ["%s/pcm.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        # Drop the auto-generated "Unnamed: N" placeholders from the header.
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_pcm_power(self, results_dir, pcm_power_file_name):
        """Run pcm-power and save its output to results_dir."""
        time.sleep(self.pcm_delay)
        out = self.exec_cmd(["%s/pcm-power.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count])
        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
            fh.write(out)

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name):
        """Record network bandwidth with bwm-ng into a CSV file."""
        self.log_print("INFO: starting network bandwidth measure")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)])

    def measure_dpdk_memory(self, results_dir):
        """Trigger a DPDK memory dump and move it into results_dir."""
        self.log_print("INFO: waiting to generate DPDK memory usage")
        time.sleep(self.dpdk_wait_time)
        self.log_print("INFO: generating DPDK memory usage")
        # FIXME(review): this is a bare attribute access, not a call - it
        # does not issue the RPC. It presumably should be invoked with an
        # RPC client (e.g. rpc.env.env_dpdk_get_mem_stats(client)), but no
        # client is constructed here; left unchanged pending verification.
        rpc.env.env_dpdk_get_mem_stats
        os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir))

    def sys_config(self):
        """Log kernel, sysctl, cpupower and SPDK settings for the report."""
        self.log_print("====Kernel release:====")
        self.log_print(os.uname().release)
        self.log_print("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log_print('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log_print("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log_print('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log_print("====Cpu power info:====")
        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log_print("====zcopy settings:====")
        self.log_print("zcopy enabled: %s" % (self.enable_zcopy))
        self.log_print("====Scheduler settings:====")
        self.log_print("SPDK scheduler: %s" % (self.scheduler_name))
646
647
class Initiator(Server):
    def __init__(self, name, general_config, initiator_config):
        """Set up an initiator host.

        Side effects on the remote host: opens an SSH session, wipes the
        nvmf_perf results directory, optionally copies SPDK sources, sets
        CPU frequency, applies system/ADQ tuning and logs system config.
        """
        super(Initiator, self).__init__(name, general_config, initiator_config)

        # Required fields
        self.ip = initiator_config["ip"]
        self.target_nic_ips = initiator_config["target_nic_ips"]

        # Defaults, optionally overridden by initiator_config below
        self.cpus_allowed = None
        self.cpus_allowed_policy = "shared"
        self.spdk_dir = "/tmp/spdk"
        self.fio_bin = "/usr/src/fio/fio"
        self.nvmecli_bin = "nvme"
        self.cpu_frequency = None
        self.subsystem_info_list = []

        if "spdk_dir" in initiator_config:
            self.spdk_dir = initiator_config["spdk_dir"]
        if "fio_bin" in initiator_config:
            self.fio_bin = initiator_config["fio_bin"]
        if "nvmecli_bin" in initiator_config:
            self.nvmecli_bin = initiator_config["nvmecli_bin"]
        if "cpus_allowed" in initiator_config:
            self.cpus_allowed = initiator_config["cpus_allowed"]
        if "cpus_allowed_policy" in initiator_config:
            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
        if "cpu_frequency" in initiator_config:
            self.cpu_frequency = initiator_config["cpu_frequency"]
        # Environment variable overrides any configured spdk_dir
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        # Cache the remote host's NIC info (JSON output of "ip address show")
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.copy_spdk("/tmp/spdk.zip")
        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()
695
696    def set_local_nic_info_helper(self):
697        return json.loads(self.exec_cmd(["lshw", "-json"]))
698
    def __del__(self):
        # Drop the SSH session when the initiator object is destroyed.
        self.ssh_connection.close()
701
702    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
703        if change_dir:
704            cmd = ["cd", change_dir, ";", *cmd]
705
706        # In case one of the command elements contains whitespace and is not
707        # already quoted, # (e.g. when calling sysctl) quote it again to prevent expansion
708        # when sending to remote system.
709        for i, c in enumerate(cmd):
710            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
711                cmd[i] = '"%s"' % c
712        cmd = " ".join(cmd)
713
714        # Redirect stderr to stdout thanks using get_pty option if needed
715        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
716        out = stdout.read().decode(encoding="utf-8")
717
718        # Check the return code
719        rc = stdout.channel.recv_exit_status()
720        if rc:
721            raise CalledProcessError(int(rc), cmd, out)
722
723        return out
724
725    def put_file(self, local, remote_dest):
726        ftp = self.ssh_connection.open_sftp()
727        ftp.put(local, remote_dest)
728        ftp.close()
729
730    def get_file(self, remote, local_dest):
731        ftp = self.ssh_connection.open_sftp()
732        ftp.get(remote, local_dest)
733        ftp.close()
734
735    def copy_spdk(self, local_spdk_zip):
736        self.log_print("Copying SPDK sources to initiator %s" % self.name)
737        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
738        self.log_print("Copied sources zip from target")
739        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
740        self.log_print("Sources unpacked")
741
742    def copy_result_files(self, dest_dir):
743        self.log_print("Copying results")
744
745        if not os.path.exists(dest_dir):
746            os.mkdir(dest_dir)
747
748        # Get list of result files from initiator and copy them back to target
749        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")
750
751        for file in file_list:
752            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
753                          os.path.join(dest_dir, file))
754        self.log_print("Done copying results")
755
756    def discover_subsystems(self, address_list, subsys_no):
757        num_nvmes = range(0, subsys_no)
758        nvme_discover_output = ""
759        for ip, subsys_no in itertools.product(address_list, num_nvmes):
760            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no))
761            nvme_discover_cmd = ["sudo",
762                                 "%s" % self.nvmecli_bin,
763                                 "discover", "-t", "%s" % self.transport,
764                                 "-s", "%s" % (4420 + subsys_no),
765                                 "-a", "%s" % ip]
766
767            try:
768                stdout = self.exec_cmd(nvme_discover_cmd)
769                if stdout:
770                    nvme_discover_output = nvme_discover_output + stdout
771            except CalledProcessError:
772                # Do nothing. In case of discovering remote subsystems of kernel target
773                # we expect "nvme discover" to fail a bunch of times because we basically
774                # scan ports.
775                pass
776
777        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
778                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
779                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
780                                nvme_discover_output)  # from nvme discovery output
781        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
782        subsystems = list(set(subsystems))
783        subsystems.sort(key=lambda x: x[1])
784        self.log_print("Found matching subsystems on target side:")
785        for s in subsystems:
786            self.log_print(s)
787        self.subsystem_info_list = subsystems
788
    def gen_fio_filename_conf(self, *args, **kwargs):
        """Generate the fio [filenameN] sections for this initiator.

        Placeholder: overridden by SPDKInitiator and KernelInitiator;
        returns None here.
        """
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass
792
793    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10, rate_iops=0):
794        fio_conf_template = """
795[global]
796ioengine={ioengine}
797{spdk_conf}
798thread=1
799group_reporting=1
800direct=1
801percentile_list=50:90:99:99.5:99.9:99.99:99.999
802
803norandommap=1
804rw={rw}
805rwmixread={rwmixread}
806bs={block_size}
807time_based=1
808ramp_time={ramp_time}
809runtime={run_time}
810rate_iops={rate_iops}
811"""
812        if "spdk" in self.mode:
813            bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
814            self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
815            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
816            spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
817        else:
818            ioengine = self.ioengine
819            spdk_conf = ""
820            out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'",
821                                 "|", "awk", "'{print $1}'"])
822            subsystems = [x for x in out.split("\n") if "nvme" in x]
823
824        if self.cpus_allowed is not None:
825            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
826            cpus_num = 0
827            cpus = self.cpus_allowed.split(",")
828            for cpu in cpus:
829                if "-" in cpu:
830                    a, b = cpu.split("-")
831                    a = int(a)
832                    b = int(b)
833                    cpus_num += len(range(a, b))
834                else:
835                    cpus_num += 1
836            self.num_cores = cpus_num
837            threads = range(0, self.num_cores)
838        elif hasattr(self, 'num_cores'):
839            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
840            threads = range(0, int(self.num_cores))
841        else:
842            self.num_cores = len(subsystems)
843            threads = range(0, len(subsystems))
844
845        if "spdk" in self.mode:
846            filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs)
847        else:
848            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs)
849
850        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
851                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
852                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)
853
854        # TODO: hipri disabled for now, as it causes fio errors:
855        # io_u error on file /dev/nvme2n1: Operation not supported
856        # See comment in KernelInitiator class, kernel_init_connect() function
857        if hasattr(self, "ioengine") and "io_uring" in self.ioengine:
858            fio_config = fio_config + """
859fixedbufs=1
860registerfiles=1
861#hipri=1
862"""
863        if num_jobs:
864            fio_config = fio_config + "numjobs=%s \n" % num_jobs
865        if self.cpus_allowed is not None:
866            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
867            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
868        fio_config = fio_config + filename_section
869
870        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
871        if hasattr(self, "num_cores"):
872            fio_config_filename += "_%sCPU" % self.num_cores
873        fio_config_filename += ".fio"
874
875        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
876        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
877        self.log_print("Created FIO Config:")
878        self.log_print(fio_config)
879
880        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)
881
882    def set_cpu_frequency(self):
883        if self.cpu_frequency is not None:
884            try:
885                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
886                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
887                self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
888            except Exception:
889                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
890                sys.exit()
891        else:
892            self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.")
893
894    def run_fio(self, fio_config_file, run_num=None):
895        job_name, _ = os.path.splitext(fio_config_file)
896        self.log_print("Starting FIO run for job: %s" % job_name)
897        self.log_print("Using FIO: %s" % self.fio_bin)
898
899        if run_num:
900            for i in range(1, run_num + 1):
901                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
902                try:
903                    output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
904                                            "--output=%s" % output_filename, "--eta=never"], True)
905                    self.log_print(output)
906                except subprocess.CalledProcessError as e:
907                    self.log_print("ERROR: Fio process failed!")
908                    self.log_print(e.stdout)
909        else:
910            output_filename = job_name + "_" + self.name + ".json"
911            output = self.exec_cmd(["sudo", self.fio_bin,
912                                    fio_config_file, "--output-format=json",
913                                    "--output" % output_filename], True)
914            self.log_print(output)
915        self.log_print("FIO run finished. Results in: %s" % output_filename)
916
917    def sys_config(self):
918        self.log_print("====Kernel release:====")
919        self.log_print(self.exec_cmd(["uname", "-r"]))
920        self.log_print("====Kernel command line:====")
921        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
922        self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
923        self.log_print("====sysctl conf:====")
924        sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"])
925        self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
926        self.log_print("====Cpu power info:====")
927        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
928
929
class KernelTarget(Target):
    def __init__(self, name, general_config, target_config):
        """Kernel-mode NVMe-oF target driven through nvmetcli."""
        super(KernelTarget, self).__init__(name, general_config, target_config)
        # Path to the nvmetcli binary; defaults to the one found in PATH.
        self.nvmet_bin = target_config.get("nvmet_bin", "nvmetcli")

    def __del__(self):
        # Tear down the kernel target configuration on object destruction.
        nvmet_command(self.nvmet_bin, "clear")
941
942    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):
943
944        nvmet_cfg = {
945            "ports": [],
946            "hosts": [],
947            "subsystems": [],
948        }
949
950        # Split disks between NIC IP's
951        disks_per_ip = int(len(nvme_list) / len(address_list))
952        disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))]
953
954        subsys_no = 1
955        port_no = 0
956        for ip, chunk in zip(address_list, disk_chunks):
957            for disk in chunk:
958                nqn = "nqn.2018-09.io.spdk:cnode%s" % subsys_no
959                nvmet_cfg["subsystems"].append({
960                    "allowed_hosts": [],
961                    "attr": {
962                        "allow_any_host": "1",
963                        "serial": "SPDK00%s" % subsys_no,
964                        "version": "1.3"
965                    },
966                    "namespaces": [
967                        {
968                            "device": {
969                                "path": disk,
970                                "uuid": "%s" % uuid.uuid4()
971                            },
972                            "enable": 1,
973                            "nsid": subsys_no
974                        }
975                    ],
976                    "nqn": nqn
977                })
978
979                nvmet_cfg["ports"].append({
980                    "addr": {
981                        "adrfam": "ipv4",
982                        "traddr": ip,
983                        "trsvcid": "%s" % (4420 + port_no),
984                        "trtype": "%s" % self.transport
985                    },
986                    "portid": subsys_no,
987                    "referrals": [],
988                    "subsystems": [nqn]
989                })
990                subsys_no += 1
991                port_no += 1
992                self.subsystem_info_list.append([port_no, nqn, ip])
993
994        with open("kernel.conf", "w") as fh:
995            fh.write(json.dumps(nvmet_cfg, indent=2))
996        pass
997
998    def tgt_start(self):
999        self.log_print("Configuring kernel NVMeOF Target")
1000
1001        if self.null_block:
1002            print("Configuring with null block device.")
1003            null_blk_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
1004            self.kernel_tgt_gen_subsystem_conf(null_blk_list, self.nic_ips)
1005            self.subsys_no = len(null_blk_list)
1006        else:
1007            print("Configuring with NVMe drives.")
1008            nvme_list = get_nvme_devices()
1009            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
1010            self.subsys_no = len(nvme_list)
1011
1012        nvmet_command(self.nvmet_bin, "clear")
1013        nvmet_command(self.nvmet_bin, "restore kernel.conf")
1014
1015        if self.enable_adq:
1016            self.adq_configure_tc()
1017
1018        self.log_print("Done configuring kernel NVMeOF Target")
1019
1020
class SPDKTarget(Target):
    def __init__(self, name, general_config, target_config):
        """SPDK userspace NVMe-oF target."""
        super(SPDKTarget, self).__init__(name, general_config, target_config)

        # Required fields
        self.core_mask = target_config["core_mask"]
        self.num_cores = self.get_num_cores(self.core_mask)

        # Defaults, overridden by target_config entries when present
        self.dif_insert_strip = target_config.get("dif_insert_strip", False)
        self.null_block_dif_type = target_config.get("null_block_dif_type", 0)
        self.num_shared_buffers = target_config.get("num_shared_buffers", 4096)
        self.bpf_scripts = target_config.get("bpf_scripts", [])
        self.bpf_proc = None
1044
1045    def get_num_cores(self, core_mask):
1046        if "0x" in core_mask:
1047            return bin(int(core_mask, 16)).count("1")
1048        else:
1049            num_cores = 0
1050            core_mask = core_mask.replace("[", "")
1051            core_mask = core_mask.replace("]", "")
1052            for i in core_mask.split(","):
1053                if "-" in i:
1054                    x, y = i.split("-")
1055                    num_cores += len(range(int(x), int(y))) + 1
1056                else:
1057                    num_cores += 1
1058            return num_cores
1059
    def spdk_tgt_configure(self):
        """Configure the running SPDK target over RPC.

        Creates the transport layer, optionally sets up ADQ traffic classes,
        then adds bdevs (null block or real NVMe) and subsystems/listeners.
        """
        self.log_print("Configuring SPDK NVMeOF target via RPC")

        # Create RDMA transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
                                       num_shared_buffers=self.num_shared_buffers,
                                       dif_insert_or_strip=self.dif_insert_strip,
                                       sock_priority=self.adq_priority)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.enable_adq:
            self.adq_configure_tc()
            # NOTE(review): this "Done" message is only logged on the ADQ path
            # and before bdev/subsystem setup below - looks misplaced; confirm.
            self.log_print("Done configuring SPDK NVMeOF Target")

        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)
1081
1082    def spdk_tgt_add_nullblock(self, null_block_count):
1083        md_size = 0
1084        block_size = 4096
1085        if self.null_block_dif_type != 0:
1086            md_size = 128
1087
1088        self.log_print("Adding null block bdevices to config via RPC")
1089        for i in range(null_block_count):
1090            self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type)
1091            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
1092                                      dif_type=self.null_block_dif_type, md_size=md_size)
1093        self.log_print("SPDK Bdevs configuration:")
1094        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1095
1096    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
1097        self.log_print("Adding NVMe bdevs to config via RPC")
1098
1099        bdfs = get_nvme_devices_bdf()
1100        bdfs = [b.replace(":", ".") for b in bdfs]
1101
1102        if req_num_disks:
1103            if req_num_disks > len(bdfs):
1104                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
1105                sys.exit(1)
1106            else:
1107                bdfs = bdfs[0:req_num_disks]
1108
1109        for i, bdf in enumerate(bdfs):
1110            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)
1111
1112        self.log_print("SPDK Bdevs configuration:")
1113        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1114
    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create NVMe-oF subsystems for the attached bdevs and add listeners.

        Distributes bdev indices between the given NIC IPs; every listener
        uses service id 4420. Appends [port, nqn, ip] entries to
        self.subsystem_info_list.
        """
        self.log_print("Adding subsystems to config")
        port = "4420"
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        # Distribute bdevs between provided NICs
        # NOTE(review): integer division drops the remainder when the disk
        # count is not divisible by len(ips) - leftover bdevs get no
        # subsystem. Confirm this is intended.
        num_disks = range(0, req_num_disks)
        if len(num_disks) == 1:
            disks_per_ip = 1
        else:
            disks_per_ip = int(len(num_disks) / len(ips))
        disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))]

        # Create subsystems, add bdevs to namespaces, add listeners
        for ip, chunk in zip(ips, disk_chunks):
            for c in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
                serial = "SPDK00%s" % c
                bdev_name = "Nvme%sn1" % c
                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                               allow_any_host=True, max_namespaces=8)
                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)

                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
                                                     nqn=nqn,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid=port,
                                                     adrfam="ipv4")

                self.subsystem_info_list.append([port, nqn, ip])
        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))
1149
    def bpf_start(self):
        """Launch the bpftrace helper script attached to the nvmf_tgt PID."""
        self.log_print("Starting BPF Trace scripts: %s" % self.bpf_scripts)
        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
        results_path = os.path.join(self.results_dir, "bpf_traces.txt")

        # self.pid is the path of the file holding nvmf_tgt's PID
        # (written by tgt_start).
        with open(self.pid, "r") as fh:
            nvmf_pid = str(fh.readline())

        cmd = [bpf_script, nvmf_pid, *bpf_traces]
        self.log_print(cmd)
        # NOTE(review): env={...} replaces the child's entire environment
        # (PATH included) rather than extending it - confirm bpftrace.sh does
        # not rely on inherited variables.
        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})
1162
    def tgt_start(self):
        """Start the nvmf_tgt application and configure it over JSON-RPC.

        Launches nvmf_tgt with --wait-for-rpc, records its PID, waits for the
        RPC socket, applies socket/ADQ/scheduler settings, finishes framework
        init and then runs spdk_tgt_configure().
        """
        if self.null_block:
            self.subsys_no = 1
        else:
            self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
        # self.pid holds the PID file path (not the PID itself).
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        self.log_print("SPDK NVMeOF Target PID=%s" % self.pid)
        self.log_print("Waiting for spdk to initilize...")
        # NOTE(review): busy-waits forever if nvmf_tgt dies before creating
        # its RPC socket - consider a timeout.
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock")

        # Settings below must be applied before framework_start_init,
        # while the app is still paused by --wait-for-rpc.
        if self.enable_zcopy:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
                                           enable_zerocopy_send_server=True)
            self.log_print("Target socket options:")
            rpc.client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))

        if self.enable_adq:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
                                           nvme_adminq_poll_period_us=100000, retry_count=4)
            rpc.nvmf.nvmf_set_config(self.client, acceptor_poll_rate=10000)

        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name)

        rpc.framework_start_init(self.client)

        if self.bpf_scripts:
            self.bpf_start()

        self.spdk_tgt_configure()
1204
    def __del__(self):
        """Stop the BPF tracer (if running) and the nvmf_tgt process."""
        if self.bpf_proc:
            self.log_print("Stopping BPF Trace script")
            self.bpf_proc.terminate()
            self.bpf_proc.wait()

        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait()
            except Exception as e:
                # Fall back to SIGKILL when graceful termination fails.
                self.log_print(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
1218                self.nvmf_proc.communicate()
1219
1220
class KernelInitiator(Initiator):
    def __init__(self, name, general_config, initiator_config):
        """Initiator using the kernel NVMe host stack (libaio or io_uring)."""
        super(KernelInitiator, self).__init__(name, general_config, initiator_config)

        # Defaults, optionally overridden by the config
        self.extra_params = initiator_config.get("extra_params", "")
        self.ioengine = "libaio"

        if "kernel_engine" in initiator_config:
            self.ioengine = initiator_config["kernel_engine"]
            if "io_uring" in self.ioengine:
                # io_uring needs poll queues on the nvme connect call.
                self.extra_params = "--nr-poll-queues=8"

    def __del__(self):
        # Close the SSH session opened by the Initiator base class.
        self.ssh_connection.close()
1239
1240    def get_connected_nvme_list(self):
1241        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
1242        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
1243                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
1244        return nvme_list
1245
    def kernel_init_connect(self):
        """Connect to every discovered subsystem with nvme-cli.

        For io_uring, also tunes per-device block-layer sysfs settings on
        each connected NVMe device (best-effort; failures are logged).
        """
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        # subsystem entries are (trsvcid, nqn, traddr) tuples.
        for subsystem in self.subsystem_info_list:
            self.log_print("Trying to connect %s %s %s" % subsystem)
            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
            time.sleep(2)

        if "io_uring" in self.ioengine:
            self.log_print("Setting block layer settings for io_uring.")

            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
            #       apparently it's not possible for connected subsystems.
            #       Results in "error: Invalid argument"
            block_sysfs_settings = {
                "iostats": "0",
                "rq_affinity": "0",
                "nomerges": "2"
            }

            for disk in self.get_connected_nvme_list():
                sysfs = os.path.join("/sys/block", disk, "queue")
                for k, v in block_sysfs_settings.items():
                    sysfs_opt_path = os.path.join(sysfs, k)
                    try:
                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
                    except subprocess.CalledProcessError as e:
                        self.log_print("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
                    finally:
                        # Always log the effective value, even on failure.
                        _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)])
                        self.log_print("%s=%s" % (sysfs_opt_path, _))
1277
1278    def kernel_init_disconnect(self):
1279        for subsystem in self.subsystem_info_list:
1280            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
1281            time.sleep(1)
1282
1283    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1):
1284        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
1285
1286        filename_section = ""
1287        nvme_per_split = int(len(nvme_list) / len(threads))
1288        remainder = len(nvme_list) % len(threads)
1289        iterator = iter(nvme_list)
1290        result = []
1291        for i in range(len(threads)):
1292            result.append([])
1293            for _ in range(nvme_per_split):
1294                result[i].append(next(iterator))
1295                if remainder:
1296                    result[i].append(next(iterator))
1297                    remainder -= 1
1298        for i, r in enumerate(result):
1299            header = "[filename%s]" % i
1300            disks = "\n".join(["filename=%s" % x for x in r])
1301            job_section_qd = round((io_depth * len(r)) / num_jobs)
1302            if job_section_qd == 0:
1303                job_section_qd = 1
1304            iodepth = "iodepth=%s" % job_section_qd
1305            filename_section = "\n".join([filename_section, header, disks, iodepth])
1306
1307        return filename_section
1308
1309
class SPDKInitiator(Initiator):
    def __init__(self, name, general_config, initiator_config):
        """Initiator using the SPDK bdev fio plugin."""
        super(SPDKInitiator, self).__init__(name, general_config, initiator_config)

        # Build SPDK on the initiator unless skip_spdk_install is truthy.
        if general_config.get("skip_spdk_install", False) is False:
            self.install_spdk()

        # Required fields
        self.num_cores = initiator_config["num_cores"]
1319
1320    def install_spdk(self):
1321        self.log_print("Using fio binary %s" % self.fio_bin)
1322        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
1323        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
1324        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)])
1325        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
1326        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])
1327
1328        self.log_print("SPDK built")
1329        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])
1330
1331    def gen_spdk_bdev_conf(self, remote_subsystem_list):
1332        bdev_cfg_section = {
1333            "subsystems": [
1334                {
1335                    "subsystem": "bdev",
1336                    "config": []
1337                }
1338            ]
1339        }
1340
1341        for i, subsys in enumerate(remote_subsystem_list):
1342            sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys)
1343            nvme_ctrl = {
1344                "method": "bdev_nvme_attach_controller",
1345                "params": {
1346                    "name": "Nvme{}".format(i),
1347                    "trtype": self.transport,
1348                    "traddr": sub_addr,
1349                    "trsvcid": sub_port,
1350                    "subnqn": sub_nqn,
1351                    "adrfam": "IPv4"
1352                }
1353            }
1354
1355            if self.enable_adq:
1356                nvme_ctrl["params"].update({"priority": "1"})
1357
1358            bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)
1359
1360        return json.dumps(bdev_cfg_section, indent=2)
1361
1362    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1):
1363        filename_section = ""
1364        if len(threads) >= len(subsystems):
1365            threads = range(0, len(subsystems))
1366        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
1367        nvme_per_split = int(len(subsystems) / len(threads))
1368        remainder = len(subsystems) % len(threads)
1369        iterator = iter(filenames)
1370        result = []
1371        for i in range(len(threads)):
1372            result.append([])
1373            for _ in range(nvme_per_split):
1374                result[i].append(next(iterator))
1375            if remainder:
1376                result[i].append(next(iterator))
1377                remainder -= 1
1378        for i, r in enumerate(result):
1379            header = "[filename%s]" % i
1380            disks = "\n".join(["filename=%s" % x for x in r])
1381            job_section_qd = round((io_depth * len(r)) / num_jobs)
1382            if job_section_qd == 0:
1383                job_section_qd = 1
1384            iodepth = "iodepth=%s" % job_section_qd
1385            filename_section = "\n".join([filename_section, header, disks, iodepth])
1386
1387        return filename_section
1388
1389
if __name__ == "__main__":
    script_full_dir = os.path.dirname(os.path.realpath(__file__))
    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
                        help='Configuration file.')
    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
                        help='Results directory.')
    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
                        help='CSV results filename.')

    args = parser.parse_args()

    print("Using config file: %s" % args.config)
    with open(args.config, "r") as config:
        data = json.load(config)

    initiators = []

    # Subscripting validates early that the mandatory sections exist
    # (raises KeyError on a malformed config before any hosts are touched).
    general_config = data["general"]
    target_config = data["target"]

    # Build target/initiator objects and pick up fio workload parameters.
    # Section names are matched by substring ("target", "initiator1", "fio").
    for k, v in data.items():
        if "target" in k:
            v.update({"results_dir": args.results})
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(k, data["general"], v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(k, data["general"], v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            # Optional fields default to None / 0 when absent.
            fio_run_num = data[k].get("run_num")
            fio_num_jobs = data[k].get("num_jobs")
            fio_rate_iops = data[k].get("rate_iops", 0)

    os.makedirs(args.results, exist_ok=True)

    target_obj.tgt_start()

    for i in initiators:
        i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no)
        if i.enable_adq:
            i.adq_configure_tc()

    # Poor mans threading
    # Run FIO tests
    for block_size, io_depth, rw in fio_workloads:
        threads = []
        configs = []
        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_connect()

            cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                   fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops)
            configs.append(cfg)

        for i, cfg in zip(initiators, configs):
            t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
            threads.append(t)
        if target_obj.enable_sar:
            sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"])
            sar_file_name = ".".join([sar_file_name, "txt"])
            t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_name))
            threads.append(t)

        if target_obj.enable_pcm:
            pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]

            pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],))
            pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],))
            pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],))

            threads.append(pcm_cpu_t)
            threads.append(pcm_mem_t)
            threads.append(pcm_pow_t)

        if target_obj.enable_bandwidth:
            bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)])
            bandwidth_file_name = ".".join([bandwidth_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,))
            threads.append(t)

        if target_obj.enable_dpdk_memory:
            # Bug fix: args must be a tuple. "(args.results)" was a bare
            # string, which Thread would unpack character-by-character and
            # crash measure_dpdk_memory at runtime with a TypeError.
            t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results,))
            threads.append(t)

        for t in threads:
            t.start()
        for t in threads:
            t.join()

        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_disconnect()
            i.copy_result_files(args.results)

    # Restore system settings modified during the run on target and initiators.
    target_obj.restore_governor()
    target_obj.restore_tuned()
    target_obj.restore_services()
    target_obj.restore_sysctl()
    for i in initiators:
        i.restore_governor()
        i.restore_tuned()
        i.restore_services()
        i.restore_sysctl()
    target_obj.parse_results(args.results, args.csv_filename)
1521