xref: /spdk/scripts/perf/nvmf/run_nvmf.py (revision de1df7a17dc763ac318d484f75121e312c631f42)
1#!/usr/bin/env python3
2
3from json.decoder import JSONDecodeError
4import os
5import re
6import sys
7import argparse
8import json
9import zipfile
10import threading
11import subprocess
12import itertools
13import configparser
14import time
15import uuid
16from collections import OrderedDict
17
18import paramiko
19import pandas as pd
20
21import rpc
22import rpc.client
23from common import *
24
25
class Server:
    """Base class for a machine taking part in an NVMe-oF perf test.

    Holds the tuning/restore logic shared by the test target and the
    initiators: service shutdown, sysctl tuning, tuned-adm profile, CPU
    governor, IRQ affinity and (optionally) Intel ADQ configuration.
    Subclasses provide a working exec_cmd() (local subprocess for Target,
    SSH for Initiator) so the same logic runs on either kind of host.
    """

    def __init__(self, name, general_config, server_config):
        # name: alphanumeric label used as log prefix and in result file names.
        # general_config / server_config: dict sections parsed from the JSON
        # test configuration file.
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]

        # Default location of the Mellanox IRQ affinity helper scripts;
        # may be overridden per-server via "irq_scripts_dir".
        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        self.local_nic_info = []
        self._nics_json_obj = {}
        # *_restore_* members capture pre-test system state so the
        # matching restore_*() methods can revert it after the run.
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        self.enable_adq = False
        self.adq_priority = None
        if "adq_enable" in server_config and server_config["adq_enable"]:
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1

        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]

        # Name is embedded in result file names, so restrict it to
        # characters that are safe there.
        if not re.match("^[A-Za-z0-9]*$", name):
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        # Prefix every log line with the server name so interleaved output
        # from multiple hosts stays readable; flush for live progress.
        print("[%s] %s" % (self.name, msg), flush=True)

    def get_uncommented_lines(self, lines):
        # Drop empty lines and lines starting with '#'.
        # NOTE(review): lines with leading whitespace before '#' are kept.
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        """Return the interface name (e.g. "ens1f0") owning the given IP."""
        # Lazily cache `ip -j address show` output, keeping only interfaces
        # that actually have addresses assigned.
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                # NOTE(review): substring match - "10.0.0.1" also matches an
                # interface holding "10.0.0.10"; equality is likely intended.
                if ip in addr["local"]:
                    return nic["ifname"]

    def set_local_nic_info_helper(self):
        # Hook for subclasses: return hardware info (lshw JSON) describing
        # the local machine. Base implementation is a no-op.
        pass

    def set_local_nic_info(self, pci_info):
        """Extract network-class devices from an lshw-style JSON tree."""
        def extract_network_elements(json_obj):
            # Recursively walk lists and dicts, collecting nodes whose
            # "class" field mentions "network".
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        # Stub overridden by subclasses (local subprocess or SSH execution).
        # Returns the command's stdout as a string.
        return ""

    def configure_system(self):
        """Apply all system-level tunings needed before a test run."""
        self.configure_services()
        self.configure_sysctl()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity()

    def configure_adq(self):
        # ADQ configuration is only implemented for SPDK mode.
        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        """Modprobe the traffic-control modules that ADQ filters rely on."""
        self.log_print("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log_print("%s loaded!" % module)
            except CalledProcessError as e:
                # Best effort: log and continue with remaining modules.
                self.log_print("ERROR: failed to load module %s" % module)
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        """Create ADQ traffic classes and flower filters on each test NIC.

        NOTE(review): reads self.num_cores, self.subsystem_info_list and
        self.spdk_dir, which are defined by subclasses - confirm callers
        only invoke this on objects that provide them.
        """
        self.log_print("Configuring ADQ Traffic classess and filters...")

        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        # Filter direction depends on role: target matches destination port,
        # initiator matches source port.
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        ports = set([p[0] for p in self.subsystem_info_list])
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            # Two traffic classes: TC0 (default) and TC1 (ADQ application
            # queues), in hardware channel mode.
            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log_print(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log_print(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            # Steer NVMe-oF TCP traffic for each subsystem port into TC1,
            # offloaded to hardware (skip_sw).
            for port in ports:
                tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                                 "protocol", "ip", "ingress", "prio", "1", "flower",
                                 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                                 "skip_sw", "hw_tc", "1"]
                self.log_print(" ".join(tc_filter_cmd))
                self.exec_cmd(tc_filter_cmd)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log_print(xps_cmd)
            self.exec_cmd(xps_cmd)

    def adq_configure_nic(self):
        """Reload the ice driver and enable ADQ-related NIC features."""
        self.log_print("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        try:
            self.exec_cmd(["sudo", "rmmod", "ice"])
            self.exec_cmd(["sudo", "modprobe", "ice"])
        except CalledProcessError as e:
            self.log_print("ERROR: failed to reload ice module!")
            self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log_print(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization
            except CalledProcessError as e:
                self.log_print("ERROR: failed to configure NIC port using ethtool!")
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
                self.log_print("Please update your NIC driver and firmware versions and try again.")
            # Dump resulting feature/flag state for the test log.
            self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        """Stop services that would disturb the measurement, remembering
        their previous ActiveState for restore_services()."""
        self.log_print("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            # `systemctl show` emits key=value lines; wrap them in a section
            # header so configparser can digest the output.
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            # Skip services that do not exist on this system.
            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log_print("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log_print("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        """Apply network/VM sysctl tunings, saving previous values for
        restore_sysctl()."""
        self.log_print("Tuning sysctl settings...")

        # busy_read is only enabled for SPDK-mode ADQ setups.
        busy_read = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1

        sysctl_opts = {
            "net.core.busy_poll": 0,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
        }

        for opt, value in sysctl_opts.items():
            # Record the current value first, then apply the new one.
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        """Switch to the configured tuned-adm profile, saving the previous
        profile/mode so restore_tuned() can revert."""
        if not self.tuned_profile:
            self.log_print("WARNING: Tuned profile not set in configration file. Skipping configuration.")
            return

        self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        # Only capture the active profile if tuned was already running;
        # otherwise there is nothing meaningful to restore.
        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        """Force the "performance" CPU frequency governor for the test."""
        self.log_print("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def configure_irq_affinity(self):
        """Run set_irq_affinity.sh for each test NIC."""
        self.log_print("Setting NIC irq affinity for NICs...")

        irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh")
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            irq_cmd = ["sudo", irq_script_path, nic]
            self.log_print(irq_cmd)
            # The helper script expects to be run from its own directory.
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        """Start/stop services back to their pre-test ActiveState."""
        self.log_print("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        """Write back the sysctl values captured by configure_sysctl()."""
        self.log_print("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        """Revert tuned-adm to the profile captured by configure_tuned()."""
        self.log_print("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log_print("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        """Revert the CPU governor captured by configure_cpu_governor()."""
        self.log_print("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log_print("Reverted CPU governor to %s." % self.governor_restore)
322
323
class Target(Server):
    """Machine hosting the NVMe-oF target.

    On top of the Server tunings this class runs commands locally
    (exec_cmd via subprocess), packages the SPDK sources for initiators,
    gathers system statistics during a run (SAR, PCM, network bandwidth,
    DPDK memory) and post-processes the fio JSON results into CSV files.
    """

    def __init__(self, name, general_config, target_config):
        super(Target, self).__init__(name, general_config, target_config)

        # Defaults
        self.enable_sar = False
        self.sar_delay = 0
        self.sar_interval = 0
        self.sar_count = 0
        self.enable_pcm = False
        self.pcm_dir = ""
        self.pcm_delay = 0
        self.pcm_interval = 0
        self.pcm_count = 0
        self.enable_bandwidth = 0
        self.bandwidth_count = 0
        self.enable_dpdk_memory = False
        self.dpdk_wait_time = 0
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []

        # Override defaults with values from the JSON config, if present.
        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "sar_settings" in target_config:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"]
        if "pcm_settings" in target_config:
            self.enable_pcm = True
            self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"]
        if "enable_bandwidth" in target_config:
            self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"]
        if "enable_dpdk_memory" in target_config:
            self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"]
        if "scheduler_settings" in target_config:
            self.scheduler_name = target_config["scheduler_settings"]
        if "zcopy_settings" in target_config:
            self.enable_zcopy = target_config["zcopy_settings"]
        if "results_dir" in target_config:
            self.results_dir = target_config["results_dir"]

        # This script lives in spdk/scripts/perf/nvmf, so the SPDK root is
        # three levels up from the script directory.
        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        # Hardware description used by Server.set_local_nic_info() to find
        # network-class devices.
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Run a command locally and return its stdout as a string.

        stderr_redirect: merge stderr into the returned output.
        change_dir: temporarily chdir for commands that care about CWD.
        """
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log_print("Changing directory to %s" % change_dir)

        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")

        if change_dir:
            os.chdir(old_cwd)
            self.log_print("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        """Zip the SPDK source tree so it can be shipped to initiators.

        NOTE(review): entries are stored via os.path.relpath, i.e. relative
        to the current working directory - confirm CWD when this is called.
        """
        self.log_print("Zipping SPDK source directory")
        # Use a context manager so the archive is closed even on error.
        with zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) as fh:
            for root, directories, files in os.walk(spdk_dir, followlinks=True):
                for file in files:
                    fh.write(os.path.relpath(os.path.join(root, file)))
        self.log_print("Done zipping")

    def read_json_stats(self, file):
        """Parse one fio JSON result file.

        Returns an 18-element list: read iops/bw/avg/min/max/p99/p99.9/
        p99.99/p99.999 latencies followed by the same fields for writes.
        All latencies are normalized to microseconds.
        """
        with open(file, "r") as json_data:
            data = json.load(json_data)
            job_pos = 0  # job_pos = 0 because using aggregated results

            # Check if latency is in nano or microseconds to choose correct dict key
            def get_lat_unit(key_prefix, dict_section):
                # key prefix - lat, clat or slat.
                # dict section - portion of json containing latency bucket in question
                # Return dict key to access the bucket and unit as string
                for k, _ in dict_section.items():
                    if k.startswith(key_prefix):
                        return k, k.split("_")[1]

            def get_clat_percentiles(clat_dict_leaf):
                if "percentile" in clat_dict_leaf:
                    p99_lat = float(clat_dict_leaf["percentile"]["99.000000"])
                    p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"])
                    p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"])
                    p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"])

                    return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat]
                else:
                    # Latest fio versions do not provide "percentile" results if no
                    # measurements were done, so just return zeroes
                    return [0, 0, 0, 0]

            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
            read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["read"][clat_key])

            # Normalize nanoseconds to microseconds.
            if "ns" in lat_unit:
                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
            if "ns" in clat_unit:
                read_p99_lat = read_p99_lat / 1000
                read_p99_9_lat = read_p99_9_lat / 1000
                read_p99_99_lat = read_p99_99_lat / 1000
                read_p99_999_lat = read_p99_999_lat / 1000

            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
            write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["write"][clat_key])

            if "ns" in lat_unit:
                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
            if "ns" in clat_unit:
                write_p99_lat = write_p99_lat / 1000
                write_p99_9_lat = write_p99_9_lat / 1000
                write_p99_99_lat = write_p99_99_lat / 1000
                write_p99_999_lat = write_p99_999_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]

    def parse_results(self, results_dir, csv_file):
        """Aggregate per-initiator fio JSON results into CSV summaries.

        Writes one per-initiator CSV with averaged results per fio config,
        and appends one aggregated row per fio config to csv_file.
        """
        files = os.listdir(results_dir)
        fio_files = filter(lambda x: ".fio" in x, files)
        json_files = [x for x in files if ".json" in x]

        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]

        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]

        header_line = ",".join(["Name", *headers])
        aggr_header_line = ",".join(["Name", *aggr_headers])

        # Create empty results file
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            fh.write(aggr_header_line + "\n")
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # Look in the filename for rwmixread value. Function arguments do
            # not have that information.
            # TODO: Improve this function by directly using workload params instead
            # of regexing through filenames.
            if "read" in job_name:
                rw_mixread = 1
            elif "write" in job_name:
                rw_mixread = 0
            else:
                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100

            # If "_CPU" exists in name - ignore it
            # Initiators for the same job could have different num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if job_name in x]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # There may have been more than 1 initiator used in test, need to check that
            # Result files are created so that string after last "_" separator is server name
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than 1 test run for this job, calculate average results for initiator
                i_results = [x for x in job_result_files if i in x]
                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))

                separate_stats = []
                for r in i_results:
                    try:
                        stats = self.read_json_stats(os.path.join(results_dir, r))
                        separate_stats.append(stats)
                        self.log_print(stats)
                    except JSONDecodeError:
                        # Fixed: the original logged a literal "%s" because the
                        # filename argument was never interpolated.
                        self.log_print("ERROR: Failed to parse %s results! Results might be incomplete!" % r)

                # Guard against all result files failing to parse, which
                # would otherwise divide by zero below.
                if not separate_stats:
                    self.log_print("ERROR: No valid results for initiator %s! Skipping." % i)
                    continue

                init_results = [sum(x) for x in zip(*separate_stats)]
                init_results = [x / len(separate_stats) for x in init_results]
                inits_avg_results.append(init_results)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(init_results)
                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
                    fh.write(header_line + "\n")
                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")

            # Sum results of all initiators running this FIO job.
            # Latency results are an average of latencies from across all initiators.
            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
            for key in inits_avg_results:
                if "lat" in key:
                    inits_avg_results[key] /= len(inits_names)

            # Aggregate separate read/write values into common labels
            # Take rw_mixread into consideration for mixed read/write workloads.
            aggregate_results = OrderedDict()
            for h in aggr_headers:
                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
                if "lat" in h:
                    _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
                else:
                    _ = read_stat + write_stat
                aggregate_results[h] = "{0:.3f}".format(_)

            rows.add(",".join([job_name, *aggregate_results.values()]))

        # Save results to file
        for row in rows:
            with open(os.path.join(results_dir, csv_file), "a") as fh:
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))

    def measure_sar(self, results_dir, sar_file_name):
        """Collect CPU utilization with sar and save it to a file,
        logging the per-CPU and overall averages."""
        self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay)
        time.sleep(self.sar_delay)
        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count])
        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
            for line in out.split("\n"):
                if "Average" in line and "CPU" in line:
                    self.log_print("Summary CPU utilization from SAR:")
                    self.log_print(line)
                if "Average" in line and "all" in line:
                    self.log_print(line)
            fh.write(out)

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        """Run pcm-memory.x in the background for pcm_count seconds,
        writing its CSV output into results_dir."""
        time.sleep(self.pcm_delay)
        cmd = ["%s/pcm-memory.x" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)]
        pcm_memory = subprocess.Popen(cmd)
        time.sleep(self.pcm_count)
        pcm_memory.terminate()

    def measure_pcm(self, results_dir, pcm_file_name):
        """Run pcm.x and additionally extract socket UPI columns into a
        separate "skt_" CSV file."""
        time.sleep(self.pcm_delay)
        cmd = ["%s/pcm.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        # pcm CSV has a two-row header; drop pandas' "Unnamed:" fillers.
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_pcm_power(self, results_dir, pcm_power_file_name):
        """Run pcm-power.x and save its output to a file."""
        time.sleep(self.pcm_delay)
        out = self.exec_cmd(["%s/pcm-power.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count])
        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
            fh.write(out)

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name):
        """Record network bandwidth with bwm-ng into a CSV file."""
        self.log_print("INFO: starting network bandwidth measure")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)])

    def measure_dpdk_memory(self, results_dir):
        """Trigger a DPDK memory stats dump and move it to results_dir."""
        self.log_print("INFO: waiting to generate DPDK memory usage")
        time.sleep(self.dpdk_wait_time)
        self.log_print("INFO: generating DPDK memory usage")
        # FIXME(review): this is a bare attribute access, not a call - the
        # env_dpdk_get_mem_stats RPC is never actually issued. The proper fix
        # needs an RPC client instance, which is not available in this class;
        # confirm where /tmp/spdk_mem_dump.txt is really produced.
        rpc.env.env_dpdk_get_mem_stats
        os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir))

    def sys_config(self):
        """Log a snapshot of the system configuration for the test report."""
        self.log_print("====Kernel release:====")
        self.log_print(os.uname().release)
        self.log_print("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log_print('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log_print("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log_print('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log_print("====Cpu power info:====")
        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log_print("====zcopy settings:====")
        self.log_print("zcopy enabled: %s" % (self.enable_zcopy))
        self.log_print("====Scheduler settings:====")
        self.log_print("SPDK scheduler: %s" % (self.scheduler_name))
639
640
class Initiator(Server):
    """Base class for remote hosts generating NVMe-oF traffic against the target.

    Opens an SSH connection to the initiator machine, optionally deploys the
    SPDK sources there, generates FIO configuration files and runs FIO jobs.
    Concrete connection/filename logic lives in KernelInitiator and
    SPDKInitiator subclasses.
    """

    def __init__(self, name, general_config, initiator_config):
        super(Initiator, self).__init__(name, general_config, initiator_config)

        # Required fields
        self.ip = initiator_config["ip"]
        self.target_nic_ips = initiator_config["target_nic_ips"]

        # Defaults
        self.cpus_allowed = None
        self.cpus_allowed_policy = "shared"
        self.spdk_dir = "/tmp/spdk"
        self.fio_bin = "/usr/src/fio/fio"
        self.nvmecli_bin = "nvme"
        self.cpu_frequency = None
        self.subsystem_info_list = []

        if "spdk_dir" in initiator_config:
            self.spdk_dir = initiator_config["spdk_dir"]
        if "fio_bin" in initiator_config:
            self.fio_bin = initiator_config["fio_bin"]
        if "nvmecli_bin" in initiator_config:
            self.nvmecli_bin = initiator_config["nvmecli_bin"]
        if "cpus_allowed" in initiator_config:
            self.cpus_allowed = initiator_config["cpus_allowed"]
        if "cpus_allowed_policy" in initiator_config:
            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
        if "cpu_frequency" in initiator_config:
            self.cpu_frequency = initiator_config["cpu_frequency"]
        # Environment variable takes precedence over the config file setting
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.copy_spdk("/tmp/spdk.zip")
        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        # Remote initiators enumerate their NICs with lshw over SSH.
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def __del__(self):
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Run a command on the remote initiator over SSH and return its stdout.

        :param cmd: command as a list of string arguments
        :param stderr_redirect: when True, allocate a PTY so stderr is merged
                                into the returned stdout
        :param change_dir: optional remote directory to cd into first
        :raises CalledProcessError: when the remote command exits non-zero
        """
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it again to prevent
        # expansion when sending to the remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout by using the get_pty option if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        """Upload a local file to the initiator via SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        """Download a file from the initiator via SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        """Upload and unpack the SPDK sources zip into self.spdk_dir."""
        self.log_print("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log_print("Sources unpacked")

    def copy_result_files(self, dest_dir):
        """Fetch all files from the remote nvmf_perf directory into dest_dir."""
        self.log_print("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for res_file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", res_file),
                          os.path.join(dest_dir, res_file))
        self.log_print("Done copying results")

    def discover_subsystems(self, address_list, subsys_no):
        """Probe ports 4420..4420+subsys_no-1 on each address with "nvme discover".

        Matching subsystems are stored in self.subsystem_info_list as
        (trsvcid, nqn, traddr) tuples, sorted by NQN.
        """
        num_nvmes = range(0, subsys_no)
        nvme_discover_output = ""
        # Use a distinct loop variable so the subsys_no argument is not shadowed
        for ip, port_off in itertools.product(address_list, num_nvmes):
            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + port_off))
            nvme_discover_cmd = ["sudo",
                                 "%s" % self.nvmecli_bin,
                                 "discover", "-t", "%s" % self.transport,
                                 "-s", "%s" % (4420 + port_off),
                                 "-a", "%s" % ip]

            try:
                stdout = self.exec_cmd(nvme_discover_cmd)
                if stdout:
                    nvme_discover_output = nvme_discover_output + stdout
            except CalledProcessError:
                # Do nothing. In case of discovering remote subsystems of kernel target
                # we expect "nvme discover" to fail a bunch of times because we basically
                # scan ports.
                pass

        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                                nvme_discover_output)  # from nvme discovery output
        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
        subsystems = list(set(subsystems))
        subsystems.sort(key=lambda x: x[1])
        self.log_print("Found matching subsystems on target side:")
        for s in subsystems:
            self.log_print(s)
        self.subsystem_info_list = subsystems

    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10, rate_iops=0):
        """Generate a FIO config file on the initiator and return its remote path.

        The [global] section comes from the template below; per-job filename
        sections are produced by gen_fio_filename_conf() in subclasses.
        """
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""
        if "spdk" in self.mode:
            bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
            self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
            spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
        else:
            ioengine = self.ioengine
            spdk_conf = ""
            out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'",
                                 "|", "awk", "'{print $1}'"])
            subsystems = [x for x in out.split("\n") if "nvme" in x]

        if self.cpus_allowed is not None:
            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    # cpus_allowed ranges are inclusive on both ends, so "0-3"
                    # means 4 CPUs. The previous "len(range(a, b))" undercounted
                    # by one (SPDKTarget.get_num_cores already uses "+ 1").
                    cpus_num += len(range(a, b)) + 1
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            # Kernel initiator with no explicit core settings: one job section
            # per connected nvme device.
            self.num_cores = len(subsystems)
            threads = range(0, len(subsystems))

        if "spdk" in self.mode:
            filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs)
        else:
            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs)

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, kernel_init_connect() function
        if hasattr(self, "ioengine") and "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log_print("Created FIO Config:")
        self.log_print(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        """Pin the remote CPUs to a fixed frequency using the userspace governor."""
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit()
        else:
            self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.")

    def run_fio(self, fio_config_file, run_num=None):
        """Execute FIO with the given config; repeat run_num times if given.

        JSON results are written next to the config file, one output file per
        repetition.
        """
        job_name, _ = os.path.splitext(fio_config_file)
        self.log_print("Starting FIO run for job: %s" % job_name)
        self.log_print("Using FIO: %s" % self.fio_bin)

        # NOTE(review): exec_cmd raises CalledProcessError imported via
        # "from common import *"; presumably the same class as
        # subprocess.CalledProcessError caught below — verify.
        if run_num:
            for i in range(1, run_num + 1):
                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
                try:
                    output = self.exec_cmd(["sudo", self.fio_bin,
                                            fio_config_file, "--output-format=json",
                                            "--output=%s" % output_filename], True)
                    self.log_print(output)
                except subprocess.CalledProcessError as e:
                    self.log_print("ERROR: Fio process failed!")
                    self.log_print(e.stdout)
        else:
            output_filename = job_name + "_" + self.name + ".json"
            # Fix: option string was missing its "=%s" format specifier
            # ('"--output" % output_filename'), which raised TypeError before
            # fio even started whenever run_num was not given.
            output = self.exec_cmd(["sudo", self.fio_bin,
                                    fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename], True)
            self.log_print(output)
        self.log_print("FIO run finished. Results in: %s" % output_filename)

    def sys_config(self):
        """Log remote kernel, boot cmdline, sysctl and CPU power settings."""
        self.log_print("====Kernel release:====")
        self.log_print(self.exec_cmd(["uname", "-r"]))
        self.log_print("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log_print("====sysctl conf:====")
        sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"])
        self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log_print("====Cpu power info:====")
        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
922
923
class KernelTarget(Target):
    """NVMe-oF target backed by the Linux kernel nvmet driver, configured with nvmetcli."""

    def __init__(self, name, general_config, target_config):
        super(KernelTarget, self).__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = "nvmetcli"

        if "nvmet_bin" in target_config:
            self.nvmet_bin = target_config["nvmet_bin"]

    def __del__(self):
        # Best-effort teardown of the kernel target configuration.
        nvmet_command(self.nvmet_bin, "clear")

    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):
        """Write an nvmetcli JSON config ("kernel.conf") exposing one subsystem
        per device in nvme_list, distributed across the address_list NIC IPs.

        Each subsystem listens on its own port, starting at 4420.
        """

        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        # Split disks between NIC IP's
        disks_per_ip = int(len(nvme_list) / len(address_list))
        disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))]

        subsys_no = 1
        port_no = 0
        for ip, chunk in zip(address_list, disk_chunks):
            for disk in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % subsys_no
                # Service id for this subsystem; computed once so the ports
                # entry and subsystem_info_list stay in sync.
                trsvcid = "%s" % (4420 + port_no)
                nvmet_cfg["subsystems"].append({
                    "allowed_hosts": [],
                    "attr": {
                        "allow_any_host": "1",
                        "serial": "SPDK00%s" % subsys_no,
                        "version": "1.3"
                    },
                    "namespaces": [
                        {
                            "device": {
                                "path": disk,
                                "uuid": "%s" % uuid.uuid4()
                            },
                            "enable": 1,
                            "nsid": subsys_no
                        }
                    ],
                    "nqn": nqn
                })

                nvmet_cfg["ports"].append({
                    "addr": {
                        "adrfam": "ipv4",
                        "traddr": ip,
                        "trsvcid": trsvcid,
                        "trtype": "%s" % self.transport
                    },
                    "portid": subsys_no,
                    "referrals": [],
                    "subsystems": [nqn]
                })
                # Fix: record the actual service id the subsystem listens on,
                # keeping the [port, nqn, ip] layout consistent with
                # SPDKTarget.spdk_tgt_add_subsystem_conf. The previous code
                # appended the already-incremented loop counter instead.
                self.subsystem_info_list.append([trsvcid, nqn, ip])
                subsys_no += 1
                port_no += 1

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        """Generate the nvmet config (null-block or NVMe backed) and load it
        into the kernel target with nvmetcli."""
        self.log_print("Configuring kernel NVMeOF Target")

        if self.null_block:
            print("Configuring with null block device.")
            null_blk_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
            self.kernel_tgt_gen_subsystem_conf(null_blk_list, self.nic_ips)
            self.subsys_no = len(null_blk_list)
        else:
            print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()
            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
            self.subsys_no = len(nvme_list)

        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log_print("Done configuring kernel NVMeOF Target")
1013
1014
class SPDKTarget(Target):
    """NVMe-oF target running the SPDK nvmf_tgt application, driven over JSON-RPC.

    Starts the nvmf_tgt process, waits for its RPC socket to appear, then
    creates the transport, bdevs and subsystems via RPC calls.
    """

    def __init__(self, name, general_config, target_config):
        super(SPDKTarget, self).__init__(name, general_config, target_config)

        # Required fields
        self.core_mask = target_config["core_mask"]
        self.num_cores = self.get_num_cores(self.core_mask)

        # Defaults
        self.dif_insert_strip = False
        self.null_block_dif_type = 0
        self.num_shared_buffers = 4096
        self.bpf_proc = None
        self.bpf_scripts = []

        if "num_shared_buffers" in target_config:
            self.num_shared_buffers = target_config["num_shared_buffers"]
        if "null_block_dif_type" in target_config:
            self.null_block_dif_type = target_config["null_block_dif_type"]
        if "dif_insert_strip" in target_config:
            self.dif_insert_strip = target_config["dif_insert_strip"]
        if "bpf_scripts" in target_config:
            self.bpf_scripts = target_config["bpf_scripts"]

    def get_num_cores(self, core_mask):
        """Return the number of CPU cores described by core_mask.

        Accepts a hex bitmask (e.g. "0xF0") or a bracketed core list
        (e.g. "[0,1,4-7]"); list ranges are inclusive on both ends.
        """
        if "0x" in core_mask:
            # Hex mask: each set bit is one core
            return bin(int(core_mask, 16)).count("1")
        else:
            num_cores = 0
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")
            for i in core_mask.split(","):
                if "-" in i:
                    x, y = i.split("-")
                    # "+ 1" because "4-7" is an inclusive range of 4 cores
                    num_cores += len(range(int(x), int(y))) + 1
                else:
                    num_cores += 1
            return num_cores

    def spdk_tgt_configure(self):
        """Create the transport layer and the bdev/subsystem configuration via RPC."""
        self.log_print("Configuring SPDK NVMeOF target via RPC")

        # Create RDMA transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
                                       num_shared_buffers=self.num_shared_buffers,
                                       dif_insert_or_strip=self.dif_insert_strip,
                                       sock_priority=self.adq_priority)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.enable_adq:
            self.adq_configure_tc()
            # NOTE(review): this message is only printed on the ADQ path even
            # though configuration continues below — looks misplaced; confirm.
            self.log_print("Done configuring SPDK NVMeOF Target")

        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

    def spdk_tgt_add_nullblock(self, null_block_count):
        """Create null_block_count null bdevs, optionally with DIF metadata."""
        md_size = 0
        block_size = 4096
        if self.null_block_dif_type != 0:
            # Reserve metadata space when a DIF protection type is requested
            md_size = 128

        self.log_print("Adding null block bdevices to config via RPC")
        for i in range(null_block_count):
            self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type)
            # "Nvme<i>n1" naming lets subsystem creation treat null bdevs and
            # real NVMe namespaces uniformly.
            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
                                      dif_type=self.null_block_dif_type, md_size=md_size)
        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        """Attach local PCIe NVMe drives as bdevs; limit to req_num_disks if given."""
        self.log_print("Adding NVMe bdevs to config via RPC")

        bdfs = get_nvme_devices_bdf()
        # RPC traddr format uses dots instead of colons (0000.01.00.0)
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create one subsystem per bdev and distribute listeners across ips.

        All listeners share service id 4420; entries are recorded in
        subsystem_info_list as [port, nqn, ip].
        """
        self.log_print("Adding subsystems to config")
        port = "4420"
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        # Distribute bdevs between provided NICs
        num_disks = range(0, req_num_disks)
        if len(num_disks) == 1:
            disks_per_ip = 1
        else:
            disks_per_ip = int(len(num_disks) / len(ips))
        disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))]

        # Create subsystems, add bdevs to namespaces, add listeners
        for ip, chunk in zip(ips, disk_chunks):
            for c in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
                serial = "SPDK00%s" % c
                bdev_name = "Nvme%sn1" % c
                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                               allow_any_host=True, max_namespaces=8)
                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)

                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
                                                     nqn=nqn,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid=port,
                                                     adrfam="ipv4")

                self.subsystem_info_list.append([port, nqn, ip])
        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def bpf_start(self):
        """Launch bpftrace scripts attached to the running nvmf_tgt process."""
        self.log_print("Starting BPF Trace scripts: %s" % self.bpf_scripts)
        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
        results_path = os.path.join(self.results_dir, "bpf_traces.txt")

        # PID file was written by tgt_start()
        with open(self.pid, "r") as fh:
            nvmf_pid = str(fh.readline())

        cmd = [bpf_script, nvmf_pid, *bpf_traces]
        self.log_print(cmd)
        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})

    def tgt_start(self):
        """Start nvmf_tgt, wait for its RPC socket, apply options and configure it."""
        if self.null_block:
            self.subsys_no = 1
        else:
            self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        self.log_print("SPDK NVMeOF Target PID=%s" % self.pid)
        self.log_print("Waiting for spdk to initilize...")
        # Poll for the RPC unix socket; nvmf_tgt creates it once it is ready
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock")

        if self.enable_zcopy:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
                                           enable_zerocopy_send_server=True)
            self.log_print("Target socket options:")
            rpc.client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))

        if self.enable_adq:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
                                           nvme_adminq_poll_period_us=100000, retry_count=4)
            rpc.nvmf.nvmf_set_config(self.client, acceptor_poll_rate=10000)

        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name)

        # App was started with --wait-for-rpc; finish subsystem initialization
        rpc.framework_start_init(self.client)

        if self.bpf_scripts:
            self.bpf_start()

        self.spdk_tgt_configure()

    def __del__(self):
        # Stop BPF tracing first so its output file gets flushed
        if self.bpf_proc:
            self.log_print("Stopping BPF Trace script")
            self.bpf_proc.terminate()
            self.bpf_proc.wait()

        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait()
            except Exception as e:
                self.log_print(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
1213
1214
class KernelInitiator(Initiator):
    """Initiator using the Linux kernel NVMe-oF host driver; fio runs on /dev/nvmeXnY."""

    def __init__(self, name, general_config, initiator_config):
        super(KernelInitiator, self).__init__(name, general_config, initiator_config)

        # Defaults
        self.extra_params = ""
        self.ioengine = "libaio"

        if "extra_params" in initiator_config:
            self.extra_params = initiator_config["extra_params"]

        if "kernel_engine" in initiator_config:
            self.ioengine = initiator_config["kernel_engine"]
            if "io_uring" in self.ioengine:
                # io_uring is connected with polled queues on the host driver
                self.extra_params = "--nr-poll-queues=8"

    def __del__(self):
        self.ssh_connection.close()

    def get_connected_nvme_list(self):
        """Return device names ("nvme0n1", ...) of connected SPDK/Linux NVMe-oF namespaces."""
        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
        return nvme_list

    def kernel_init_connect(self):
        """Connect to all discovered subsystems with nvme-cli; tune block sysfs for io_uring."""
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for subsystem in self.subsystem_info_list:
            self.log_print("Trying to connect %s %s %s" % subsystem)
            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
            time.sleep(2)

        if "io_uring" in self.ioengine:
            self.log_print("Setting block layer settings for io_uring.")

            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
            #       apparently it's not possible for connected subsystems.
            #       Results in "error: Invalid argument"
            block_sysfs_settings = {
                "iostats": "0",
                "rq_affinity": "0",
                "nomerges": "2"
            }

            for disk in self.get_connected_nvme_list():
                sysfs = os.path.join("/sys/block", disk, "queue")
                for k, v in block_sysfs_settings.items():
                    sysfs_opt_path = os.path.join(sysfs, k)
                    try:
                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
                    except subprocess.CalledProcessError as e:
                        self.log_print("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
                    finally:
                        # Log the effective value regardless of write success
                        _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)])
                        self.log_print("%s=%s" % (sysfs_opt_path, _))

    def kernel_init_disconnect(self):
        """Disconnect every subsystem connected by kernel_init_connect()."""
        for subsystem in self.subsystem_info_list:
            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
            time.sleep(1)

    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1):
        """Build fio job sections distributing connected /dev/nvme* devices across threads."""
        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]

        # Fix: cap the number of job sections at the number of devices;
        # otherwise sections without any filename= entry were generated and
        # leftover devices were never assigned (the remainder hand-out was
        # nested inside the per-split loop, which never ran when
        # nvme_per_split was 0). Mirrors SPDKInitiator.gen_fio_filename_conf.
        if len(threads) >= len(nvme_list):
            threads = range(0, len(nvme_list))

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            # Hand out at most one leftover device per job section
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section
1302
1303
class SPDKInitiator(Initiator):
    """Initiator that generates traffic with SPDK's fio bdev plugin."""

    def __init__(self, name, general_config, initiator_config):
        super(SPDKInitiator, self).__init__(name, general_config, initiator_config)

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.install_spdk()

        # Required fields
        self.num_cores = initiator_config["num_cores"]

    def install_spdk(self):
        """Build SPDK (with the fio plugin) on the initiator and run setup.sh."""
        self.log_print("Using fio binary %s" % self.fio_bin)
        build_steps = [
            ["git", "-C", self.spdk_dir, "submodule", "update", "--init"],
            ["git", "-C", self.spdk_dir, "clean", "-ffdx"],
            ["cd", self.spdk_dir, "&&", "./configure", "--with-rdma",
             "--with-fio=%s" % os.path.dirname(self.fio_bin)],
            ["make", "-C", self.spdk_dir, "clean"],
            ["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"],
        ]
        for step in build_steps:
            self.exec_cmd(step)

        self.log_print("SPDK built")
        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        """Return a JSON bdev config attaching one NVMe-oF controller per remote subsystem."""
        controllers = []
        for idx, remote_sub in enumerate(remote_subsystem_list):
            sub_port, sub_nqn, sub_addr = [str(field) for field in remote_sub]
            params = {
                "name": "Nvme{}".format(idx),
                "trtype": self.transport,
                "traddr": sub_addr,
                "trsvcid": sub_port,
                "subnqn": sub_nqn,
                "adrfam": "IPv4"
            }
            # ADQ requires a socket priority on the initiator connections
            if self.enable_adq:
                params["priority"] = "1"
            controllers.append({
                "method": "bdev_nvme_attach_controller",
                "params": params
            })

        bdev_cfg_section = {
            "subsystems": [
                {
                    "subsystem": "bdev",
                    "config": controllers
                }
            ]
        }
        return json.dumps(bdev_cfg_section, indent=2)

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1):
        """Build fio job sections distributing NvmeXn1 bdevs across job threads."""
        # Never create more job sections than there are remote subsystems
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        bdev_names = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        base_count, leftover = divmod(len(subsystems), len(threads))
        name_iter = iter(bdev_names)

        # Assign base_count bdevs per section, plus at most one leftover each
        groups = []
        for _ in threads:
            group = [next(name_iter) for _ in range(base_count)]
            if leftover:
                group.append(next(name_iter))
                leftover -= 1
            groups.append(group)

        sections = []
        for idx, group in enumerate(groups):
            qd = round((io_depth * len(group)) / num_jobs)
            if qd == 0:
                qd = 1
            sections.append("[filename%s]" % idx)
            sections.extend("filename=%s" % name for name in group)
            sections.append("iodepth=%s" % qd)

        # Leading "" reproduces the newline that precedes the first section
        return "\n".join([""] + sections) if sections else ""
1381        return filename_section
1382
1383
if __name__ == "__main__":
    script_full_dir = os.path.dirname(os.path.realpath(__file__))
    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
                        help='Configuration file.')
    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
                        help='Results directory.')
    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
                        help='CSV results filename.')

    args = parser.parse_args()

    print("Using config file: %s" % args.config)
    with open(args.config, "r") as config:
        data = json.load(config)

    initiators = []
    general_config = data["general"]

    # Build target/initiator objects and pick up fio workload settings from
    # the matching config sections.
    for k, v in data.items():
        if "target" in k:
            v.update({"results_dir": args.results})
            if v["mode"] == "spdk":
                target_obj = SPDKTarget(k, general_config, v)
            elif v["mode"] == "kernel":
                target_obj = KernelTarget(k, general_config, v)
        elif "initiator" in k:
            if v["mode"] == "spdk":
                init_obj = SPDKInitiator(k, general_config, v)
            elif v["mode"] == "kernel":
                init_obj = KernelInitiator(k, general_config, v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_workloads = itertools.product(v["bs"], v["qd"], v["rw"])

            fio_run_time = v["run_time"]
            fio_ramp_time = v["ramp_time"]
            fio_rw_mix_read = v["rwmixread"]
            # Optional settings: None / 0 lets downstream code use defaults.
            fio_run_num = v.get("run_num")
            fio_num_jobs = v.get("num_jobs")
            fio_rate_iops = v.get("rate_iops", 0)

    os.makedirs(args.results, exist_ok=True)

    target_obj.tgt_start()

    for i in initiators:
        i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no)
        if i.enable_adq:
            i.adq_configure_tc()

    # Poor mans threading
    # Run FIO tests
    for block_size, io_depth, rw in fio_workloads:
        threads = []
        configs = []
        # Generate fio configs first (kernel initiators must connect before
        # the config can enumerate their block devices).
        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_connect()

            cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                   fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops)
            configs.append(cfg)

        for i, cfg in zip(initiators, configs):
            t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
            threads.append(t)

        if target_obj.enable_sar:
            sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"])
            sar_file_name = ".".join([sar_file_name, "txt"])
            t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_name))
            threads.append(t)

        if target_obj.enable_pcm:
            pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]

            threads.append(threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],)))
            threads.append(threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],)))
            threads.append(threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],)))

        if target_obj.enable_bandwidth:
            bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)])
            bandwidth_file_name = ".".join([bandwidth_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,))
            threads.append(t)

        if target_obj.enable_dpdk_memory:
            # BUGFIX: args must be a 1-tuple. The previous "(args.results)"
            # was a bare string, which Thread splats into one positional
            # argument per character.
            t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results,))
            threads.append(t)

        for t in threads:
            t.start()
        for t in threads:
            t.join()

        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_disconnect()
            i.copy_result_files(args.results)

    # Restore system settings that were tuned for the benchmark run.
    target_obj.restore_governor()
    target_obj.restore_tuned()
    target_obj.restore_services()
    target_obj.restore_sysctl()
    for i in initiators:
        i.restore_governor()
        i.restore_tuned()
        i.restore_services()
        i.restore_sysctl()
    target_obj.parse_results(args.results, args.csv_filename)
1515