xref: /spdk/scripts/perf/nvmf/run_nvmf.py (revision 6e5d6032a09ca918509e7c6f28d6d2e20b8dc832)
1#!/usr/bin/env python3
2
3from json.decoder import JSONDecodeError
4import os
5import re
6import sys
7import argparse
8import json
9import zipfile
10import threading
11import subprocess
12import itertools
13import configparser
14import time
15import uuid
16from collections import OrderedDict
17
18import paramiko
19import pandas as pd
20
21import rpc
22import rpc.client
23from common import *
24
25
class Server:
    """Base class for a host taking part in an NVMe-oF performance test.

    Holds configuration common to target and initiator machines
    (credentials, transport type, NIC IPs) and implements the system
    tuning steps shared by both: systemd services, sysctl values, the
    tuned profile, CPU governor, IRQ affinity and optional ADQ setup.
    Every tuning step records the previous state in a ``*_restore``
    attribute so it can be reverted after the test.

    Subclasses (Target, Initiator) must override exec_cmd() to actually
    run commands (locally or over SSH).
    """

    def __init__(self, name, general_config, server_config):
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]

        # Default location of the Mellanox IRQ affinity scripts; can be
        # overridden via "irq_scripts_dir" in the server config section.
        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        self.local_nic_info = []
        self._nics_json_obj = {}
        # Pre-test state, captured lazily by the configure_* methods and
        # used by the restore_* methods after the test run.
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        self.enable_adq = False
        self.adq_priority = None
        if "adq_enable" in server_config and server_config["adq_enable"]:
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1

        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]

        # The name ends up embedded in result file names which are later
        # split on "_", so it must stay strictly alphanumeric.
        if not re.match("^[A-Za-z0-9]*$", name):
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        """Print a message prefixed with this server's name, unbuffered."""
        print("[%s] %s" % (self.name, msg), flush=True)

    def get_uncommented_lines(self, lines):
        """Return only non-empty lines that are not '#' comments."""
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        """Return the interface name owning the given IP address, or None.

        The "ip -j address show" output is fetched once and cached in
        self._nics_json_obj (interfaces without any address are dropped).
        """
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                # Exact comparison. A substring test ("ip in addr") could
                # return the wrong NIC when one address is a prefix of
                # another, e.g. 10.0.0.1 vs 10.0.0.11.
                if ip == addr["local"]:
                    return nic["ifname"]

    def set_local_nic_info_helper(self):
        """Hook for subclasses: return lshw-style JSON describing local HW."""
        pass

    def set_local_nic_info(self, pci_info):
        """Populate self.local_nic_info with network nodes from lshw JSON.

        pci_info is the (possibly nested) JSON produced by "lshw -json";
        any node whose "class" contains "network" is collected, walking
        "children" recursively.
        """
        def extract_network_elements(json_obj):
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Run a command and return its output; overridden by subclasses."""
        return ""

    def configure_system(self):
        """Apply all generic system tuning steps in order."""
        self.configure_services()
        self.configure_sysctl()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity()

    def configure_adq(self):
        """Load ADQ kernel modules and configure NIC ports (SPDK mode only)."""
        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        """modprobe the traffic-control modules ADQ depends on."""
        self.log_print("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log_print("%s loaded!" % module)
            # NOTE(review): CalledProcessError is assumed to be provided by
            # "from common import *" (subprocess.CalledProcessError) - confirm.
            except CalledProcessError as e:
                self.log_print("ERROR: failed to load module %s" % module)
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        """Create ADQ traffic classes, filters and XPS mapping per NIC.

        Uses self.num_cores, self.subsystem_info_list and self.spdk_dir,
        which are set by the Target/Initiator subclasses.
        """
        self.log_print("Configuring ADQ Traffic classess and filters...")

        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        # Targets filter on destination port, initiators on source port.
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        ports = {p[0] for p in self.subsystem_info_list}
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log_print(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log_print(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            for port in ports:
                tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                                 "protocol", "ip", "ingress", "prio", "1", "flower",
                                 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                                 "skip_sw", "hw_tc", "1"]
                self.log_print(" ".join(tc_filter_cmd))
                self.exec_cmd(tc_filter_cmd)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log_print(xps_cmd)
            self.exec_cmd(xps_cmd)

    def adq_configure_nic(self):
        """Reload the ice driver and set per-port ethtool flags for ADQ."""
        self.log_print("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        try:
            self.exec_cmd(["sudo", "rmmod", "ice"])
            self.exec_cmd(["sudo", "modprobe", "ice"])
        except CalledProcessError as e:
            self.log_print("ERROR: failed to reload ice module!")
            self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log_print(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization
            except CalledProcessError as e:
                self.log_print("ERROR: failed to configure NIC port using ethtool!")
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
                self.log_print("Please update your NIC driver and firmware versions and try again.")
            self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        """Stop services known to disturb benchmarks; remember prior state."""
        self.log_print("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            # "systemctl show" emits key=value pairs; prepend a section
            # header so configparser can digest it.
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log_print("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log_print("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        """Apply network/VM sysctl tuning; remember prior values."""
        self.log_print("Tuning sysctl settings...")

        # busy_read is only beneficial for ADQ with the SPDK (userspace) stack.
        busy_read = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1

        sysctl_opts = {
            "net.core.busy_poll": 0,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
        }

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        """Activate the configured tuned-adm profile; remember prior profile."""
        if not self.tuned_profile:
            self.log_print("WARNING: Tuned profile not set in configration file. Skipping configuration.")
            return

        self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        """Switch the CPU scaling governor to "performance"; remember prior one."""
        self.log_print("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def configure_irq_affinity(self):
        """Run the vendor set_irq_affinity.sh script for each test NIC."""
        self.log_print("Setting NIC irq affinity for NICs...")

        irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh")
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            irq_cmd = ["sudo", irq_script_path, nic]
            self.log_print(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        """Start/stop services back to the state recorded before the test."""
        self.log_print("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        """Write back the sysctl values recorded before the test."""
        self.log_print("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        """Revert tuned-adm to the profile recorded before the test."""
        self.log_print("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log_print("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        """Revert the CPU scaling governor recorded before the test."""
        self.log_print("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log_print("Reverted CPU governor to %s." % self.governor_restore)
322
323
class Target(Server):
    """The NVMe-oF target machine.

    Runs commands locally, applies system tuning on construction, and
    provides measurement helpers (SAR, PCM, bandwidth, DPDK memory) plus
    fio result parsing/aggregation.
    """

    def __init__(self, name, general_config, target_config):
        super(Target, self).__init__(name, general_config, target_config)

        # Defaults
        self.enable_sar = False
        self.sar_delay = 0
        self.sar_interval = 0
        self.sar_count = 0
        self.enable_pcm = False
        self.pcm_dir = ""
        self.pcm_delay = 0
        self.pcm_interval = 0
        self.pcm_count = 0
        self.enable_bandwidth = 0
        self.bandwidth_count = 0
        self.enable_dpdk_memory = False
        self.dpdk_wait_time = 0
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []

        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "sar_settings" in target_config:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"]
        if "pcm_settings" in target_config:
            self.enable_pcm = True
            self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"]
        if "enable_bandwidth" in target_config:
            self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"]
        if "enable_dpdk_memory" in target_config:
            self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"]
        if "scheduler_settings" in target_config:
            self.scheduler_name = target_config["scheduler_settings"]
        if "zcopy_settings" in target_config:
            self.enable_zcopy = target_config["zcopy_settings"]
        if "results_dir" in target_config:
            self.results_dir = target_config["results_dir"]

        # This script lives in spdk/scripts/perf/nvmf, so the SPDK root is
        # three directories up.
        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        """Return local hardware info as parsed "lshw -json" output."""
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Run a command locally and return its decoded stdout.

        stderr_redirect folds stderr into the captured output. change_dir
        runs the command from the given directory; the previous working
        directory is restored even if the command fails.
        """
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        old_cwd = None
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log_print("Changing directory to %s" % change_dir)

        try:
            # NOTE(review): check_output is assumed to come from
            # "from common import *" (subprocess.check_output) - confirm.
            out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")
        finally:
            # Restore cwd even on CalledProcessError so a failed command
            # does not leave the whole process in a different directory.
            if old_cwd:
                os.chdir(old_cwd)
                self.log_print("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        """Zip the SPDK source tree into dest_file (for initiator deploy)."""
        self.log_print("Zipping SPDK source directory")
        with zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) as fh:
            for root, directories, files in os.walk(spdk_dir, followlinks=True):
                for file in files:
                    fh.write(os.path.relpath(os.path.join(root, file)))
        self.log_print("Done zipping")

    def read_json_stats(self, file):
        """Parse a fio JSON output file into a flat list of read/write stats.

        Returns [iops, bw, avg/min/max lat, p99/p99.9/p99.99/p99.999 lat]
        for read followed by the same fields for write. Latencies are
        converted from nanoseconds to microseconds when needed.
        """
        with open(file, "r") as json_data:
            data = json.load(json_data)
            job_pos = 0  # job_post = 0 because using aggregated results

            # Check if latency is in nano or microseconds to choose correct dict key
            def get_lat_unit(key_prefix, dict_section):
                # key prefix - lat, clat or slat.
                # dict section - portion of json containing latency bucket in question
                # Return dict key to access the bucket and unit as string
                for k, _ in dict_section.items():
                    if k.startswith(key_prefix):
                        return k, k.split("_")[1]

            def get_clat_percentiles(clat_dict_leaf):
                if "percentile" in clat_dict_leaf:
                    p99_lat = float(clat_dict_leaf["percentile"]["99.000000"])
                    p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"])
                    p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"])
                    p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"])

                    return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat]
                else:
                    # Latest fio versions do not provide "percentile" results if no
                    # measurements were done, so just return zeroes
                    return [0, 0, 0, 0]

            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
            read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["read"][clat_key])

            if "ns" in lat_unit:
                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
            if "ns" in clat_unit:
                read_p99_lat = read_p99_lat / 1000
                read_p99_9_lat = read_p99_9_lat / 1000
                read_p99_99_lat = read_p99_99_lat / 1000
                read_p99_999_lat = read_p99_999_lat / 1000

            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
            write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["write"][clat_key])

            if "ns" in lat_unit:
                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
            if "ns" in clat_unit:
                write_p99_lat = write_p99_lat / 1000
                write_p99_9_lat = write_p99_9_lat / 1000
                write_p99_99_lat = write_p99_99_lat / 1000
                write_p99_999_lat = write_p99_999_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]

    def parse_results(self, results_dir, csv_file):
        """Aggregate all fio JSON results in results_dir into csv_file.

        Per-initiator averages are written to individual CSV files; the
        per-job aggregate (summed IOPS/BW, averaged latencies, weighted by
        rwmixread for mixed workloads) is appended to csv_file.
        """
        files = os.listdir(results_dir)
        fio_files = filter(lambda x: ".fio" in x, files)
        json_files = [x for x in files if ".json" in x]

        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]

        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]

        header_line = ",".join(["Name", *headers])
        aggr_header_line = ",".join(["Name", *aggr_headers])

        # Create empty results file
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            fh.write(aggr_header_line + "\n")
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # Look in the filename for rwmixread value. Function arguments do
            # not have that information.
            # TODO: Improve this function by directly using workload params instead
            # of regexing through filenames.
            if "read" in job_name:
                rw_mixread = 1
            elif "write" in job_name:
                rw_mixread = 0
            else:
                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100

            # If "_CPU" exists in name - ignore it
            # Initiators for the same job could have diffrent num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if job_name in x]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # There may have been more than 1 initiator used in test, need to check that
            # Result files are created so that string after last "_" separator is server name
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than 1 test run for this job, calculate average results for initiator
                i_results = [x for x in job_result_files if i in x]
                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))

                separate_stats = []
                for r in i_results:
                    try:
                        stats = self.read_json_stats(os.path.join(results_dir, r))
                        separate_stats.append(stats)
                        self.log_print(stats)
                    except JSONDecodeError:
                        # Fixed: the original message lacked the "% r"
                        # argument, printing a literal "%s" placeholder.
                        self.log_print("ERROR: Failed to parse %s results! Results might be incomplete!" % r)

                init_results = [sum(x) for x in zip(*separate_stats)]
                init_results = [x / len(separate_stats) for x in init_results]
                inits_avg_results.append(init_results)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(init_results)
                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
                    fh.write(header_line + "\n")
                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")

            # Sum results of all initiators running this FIO job.
            # Latency results are an average of latencies from accros all initiators.
            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
            for key in inits_avg_results:
                if "lat" in key:
                    inits_avg_results[key] /= len(inits_names)

            # Aggregate separate read/write values into common labels
            # Take rw_mixread into consideration for mixed read/write workloads.
            aggregate_results = OrderedDict()
            for h in aggr_headers:
                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
                if "lat" in h:
                    aggr_stat = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
                else:
                    aggr_stat = read_stat + write_stat
                aggregate_results[h] = "{0:.3f}".format(aggr_stat)

            rows.add(",".join([job_name, *aggregate_results.values()]))

        # Save results to file
        for row in rows:
            with open(os.path.join(results_dir, csv_file), "a") as fh:
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))

    def measure_sar(self, results_dir, sar_file_name):
        """Capture SAR CPU statistics for the configured interval/count."""
        self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay)
        time.sleep(self.sar_delay)
        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count])
        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
            for line in out.split("\n"):
                if "Average" in line and "CPU" in line:
                    self.log_print("Summary CPU utilization from SAR:")
                    self.log_print(line)
                if "Average" in line and "all" in line:
                    self.log_print(line)
            fh.write(out)

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        """Run pcm-memory.x in the background for pcm_count seconds."""
        time.sleep(self.pcm_delay)
        cmd = ["%s/pcm-memory.x" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)]
        pcm_memory = subprocess.Popen(cmd)
        time.sleep(self.pcm_count)
        pcm_memory.terminate()

    def measure_pcm(self, results_dir, pcm_file_name):
        """Run pcm.x and extract UPI socket columns into a separate CSV."""
        time.sleep(self.pcm_delay)
        cmd = ["%s/pcm.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_pcm_power(self, results_dir, pcm_power_file_name):
        """Run pcm-power.x and save its output to a file."""
        time.sleep(self.pcm_delay)
        out = self.exec_cmd(["%s/pcm-power.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count])
        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
            fh.write(out)

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name):
        """Sample network bandwidth with bwm-ng into a CSV file."""
        self.log_print("INFO: starting network bandwidth measure")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)])

    def measure_dpdk_memory(self, results_dir):
        """Dump SPDK/DPDK memory statistics and move them to results_dir."""
        self.log_print("INFO: waiting to generate DPDK memory usage")
        time.sleep(self.dpdk_wait_time)
        self.log_print("INFO: generating DPDK memory usage")
        # FIXME: this is a bare attribute access, not a call - it does not
        # trigger the RPC. It should invoke env_dpdk_get_mem_stats through
        # an rpc.client connection to the running SPDK target.
        rpc.env.env_dpdk_get_mem_stats
        os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir))

    def sys_config(self):
        """Log kernel, cmdline, sysctl, cpupower and SPDK-related settings."""
        self.log_print("====Kernel release:====")
        self.log_print(os.uname().release)
        self.log_print("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log_print('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log_print("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log_print('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log_print("====Cpu power info:====")
        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log_print("====zcopy settings:====")
        self.log_print("zcopy enabled: %s" % (self.enable_zcopy))
        self.log_print("====Scheduler settings:====")
        self.log_print("SPDK scheduler: %s" % (self.scheduler_name))
639
640
class Initiator(Server):
    """A traffic-generator host driven over SSH.

    Opens a paramiko SSH connection, prepares an SPDK working directory on
    the remote host and provides helpers to discover target subsystems,
    generate fio job files, run fio and copy result files back.
    """

    def __init__(self, name, general_config, initiator_config):
        super(Initiator, self).__init__(name, general_config, initiator_config)

        # Required fields
        self.ip = initiator_config["ip"]
        self.target_nic_ips = initiator_config["target_nic_ips"]

        # Defaults
        self.cpus_allowed = None
        self.cpus_allowed_policy = "shared"
        self.spdk_dir = "/tmp/spdk"
        self.fio_bin = "/usr/src/fio/fio"
        self.nvmecli_bin = "nvme"
        self.cpu_frequency = None
        self.subsystem_info_list = []

        if "spdk_dir" in initiator_config:
            self.spdk_dir = initiator_config["spdk_dir"]
        if "fio_bin" in initiator_config:
            self.fio_bin = initiator_config["fio_bin"]
        if "nvmecli_bin" in initiator_config:
            self.nvmecli_bin = initiator_config["nvmecli_bin"]
        if "cpus_allowed" in initiator_config:
            self.cpus_allowed = initiator_config["cpus_allowed"]
        if "cpus_allowed_policy" in initiator_config:
            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
        if "cpu_frequency" in initiator_config:
            self.cpu_frequency = initiator_config["cpu_frequency"]
        # Environment variable overrides the config file setting.
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.copy_spdk("/tmp/spdk.zip")
        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        """Return the remote host's hardware description as parsed lshw JSON."""
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def __del__(self):
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Execute cmd (a list of argv tokens) on the remote host over SSH.

        Returns the command's stdout as a string; raises CalledProcessError
        on a non-zero exit status. With stderr_redirect=True a PTY is
        requested so stderr is merged into stdout.
        """
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted, quote it again to prevent expansion
        # when sending to remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout thanks using get_pty option if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            # NOTE(review): bare CalledProcessError presumably comes from
            # "from common import *" — confirm it aliases subprocess.CalledProcessError.
            raise CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        """Upload a local file to the remote host via SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        """Download a remote file to the local machine via SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        """Copy the SPDK sources zip to the initiator and unpack it into spdk_dir."""
        self.log_print("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log_print("Sources unpacked")

    def copy_result_files(self, dest_dir):
        """Copy all files from the remote nvmf_perf directory into local dest_dir."""
        self.log_print("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log_print("Done copying results")

    def discover_subsystems(self, address_list, subsys_no):
        """Probe ports 4420..4420+subsys_no-1 on each address with "nvme discover".

        Stores the matching (trsvcid, nqn, traddr) tuples, sorted by NQN,
        in self.subsystem_info_list.
        """
        num_nvmes = range(0, subsys_no)
        nvme_discover_output = ""
        for ip, subsys_no in itertools.product(address_list, num_nvmes):
            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no))
            nvme_discover_cmd = ["sudo",
                                 "%s" % self.nvmecli_bin,
                                 "discover", "-t", "%s" % self.transport,
                                 "-s", "%s" % (4420 + subsys_no),
                                 "-a", "%s" % ip]

            try:
                stdout = self.exec_cmd(nvme_discover_cmd)
                if stdout:
                    nvme_discover_output = nvme_discover_output + stdout
            except CalledProcessError:
                # Do nothing. In case of discovering remote subsystems of kernel target
                # we expect "nvme discover" to fail a bunch of times because we basically
                # scan ports.
                pass

        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                                nvme_discover_output)  # from nvme discovery output
        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
        subsystems = list(set(subsystems))
        subsystems.sort(key=lambda x: x[1])
        self.log_print("Found matching subsystems on target side:")
        for s in subsystems:
            self.log_print(s)
        self.subsystem_info_list = subsystems

    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10, rate_iops=0):
        """Generate a fio job file on the initiator and return its remote path.

        Builds the [global] section from the template below, picks the ioengine
        (SPDK bdev plugin or a kernel engine), works out how many fio job
        sections to create and appends the per-filename sections produced by
        the subclass's gen_fio_filename_conf().
        """
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""
        if "spdk" in self.mode:
            bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
            self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
            spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
        else:
            ioengine = self.ioengine
            spdk_conf = ""
            out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'",
                                 "|", "awk", "'{print $1}'"])
            subsystems = [x for x in out.split("\n") if "nvme" in x]

        if self.cpus_allowed is not None:
            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    # Bug fix: "a-b" CPU ranges are inclusive ("0-3" means 4 CPUs),
                    # so count b - a + 1 cores — consistent with
                    # SPDKTarget.get_num_cores(), which adds 1 for ranges.
                    cpus_num += len(range(a, b + 1))
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            # Kernel mode without explicit core limits: one job per connected device.
            self.num_cores = len(subsystems)
            threads = range(0, len(subsystems))

        if "spdk" in self.mode:
            filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs)
        else:
            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs)

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, kernel_init_connect() function
        if hasattr(self, "ioengine") and "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log_print("Created FIO Config:")
        self.log_print(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        """Pin the remote CPUs to self.cpu_frequency using cpupower, if configured."""
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit()
        else:
            self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.")

    def run_fio(self, fio_config_file, run_num=None):
        """Run fio with the given job file; with run_num, repeat and number each run.

        JSON results are written next to the job file as
        <job>_[run_N_]<initiator name>.json.
        """
        job_name, _ = os.path.splitext(fio_config_file)
        self.log_print("Starting FIO run for job: %s" % job_name)
        self.log_print("Using FIO: %s" % self.fio_bin)

        if run_num:
            for i in range(1, run_num + 1):
                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
                try:
                    output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
                                            "--output=%s" % output_filename, "--eta=never"], True)
                    self.log_print(output)
                except subprocess.CalledProcessError as e:
                    self.log_print("ERROR: Fio process failed!")
                    self.log_print(e.stdout)
        else:
            output_filename = job_name + "_" + self.name + ".json"
            # Bug fix: this was '"--output" % output_filename', a broken %-format
            # with no conversion specifier that raises TypeError at runtime.
            output = self.exec_cmd(["sudo", self.fio_bin,
                                    fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename], True)
            self.log_print(output)
        self.log_print("FIO run finished. Results in: %s" % output_filename)

    def sys_config(self):
        """Log initiator system configuration gathered over SSH."""
        self.log_print("====Kernel release:====")
        self.log_print(self.exec_cmd(["uname", "-r"]))
        self.log_print("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log_print("====sysctl conf:====")
        sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"])
        self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log_print("====Cpu power info:====")
        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
921
922
class KernelTarget(Target):
    """NVMe-oF target implemented with the Linux kernel target, driven via nvmetcli."""

    def __init__(self, name, general_config, target_config):
        super(KernelTarget, self).__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = "nvmetcli"

        if "nvmet_bin" in target_config:
            self.nvmet_bin = target_config["nvmet_bin"]

    def __del__(self):
        # Best-effort teardown of the kernel target configuration.
        nvmet_command(self.nvmet_bin, "clear")

    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):
        """Write a nvmetcli JSON config ("kernel.conf") exposing nvme_list devices.

        Disks are split evenly between the provided NIC IPs; each disk gets
        its own subsystem and its own listening port (4420, 4421, ...).
        """
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        # Split disks between NIC IP's.
        # NOTE(review): integer division drops len(nvme_list) % len(address_list)
        # trailing disks from the config — confirm this is intended.
        disks_per_ip = int(len(nvme_list) / len(address_list))
        disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))]

        subsys_no = 1
        port_no = 0
        for ip, chunk in zip(address_list, disk_chunks):
            for disk in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % subsys_no
                nvmet_cfg["subsystems"].append({
                    "allowed_hosts": [],
                    "attr": {
                        "allow_any_host": "1",
                        "serial": "SPDK00%s" % subsys_no,
                        "version": "1.3"
                    },
                    "namespaces": [
                        {
                            "device": {
                                "path": disk,
                                "uuid": "%s" % uuid.uuid4()
                            },
                            "enable": 1,
                            "nsid": subsys_no
                        }
                    ],
                    "nqn": nqn
                })

                nvmet_cfg["ports"].append({
                    "addr": {
                        "adrfam": "ipv4",
                        "traddr": ip,
                        "trsvcid": "%s" % (4420 + port_no),
                        "trtype": "%s" % self.transport
                    },
                    "portid": subsys_no,
                    "referrals": [],
                    "subsystems": [nqn]
                })
                subsys_no += 1
                port_no += 1
                # NOTE(review): this records the incremented port_no (1, 2, ...)
                # rather than the trsvcid (4420 + port_no - 1) configured above.
                # Initiators re-discover kernel subsystems with "nvme discover",
                # so verify before relying on this list for connections.
                self.subsystem_info_list.append([port_no, nqn, ip])

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))
        # (Removed a dead trailing "pass" statement.)

    def tgt_start(self):
        """Generate the kernel target config (null_blk or real NVMe) and load it with nvmetcli."""
        self.log_print("Configuring kernel NVMeOF Target")

        if self.null_block:
            print("Configuring with null block device.")
            null_blk_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
            self.kernel_tgt_gen_subsystem_conf(null_blk_list, self.nic_ips)
            self.subsys_no = len(null_blk_list)
        else:
            print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()
            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
            self.subsys_no = len(nvme_list)

        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log_print("Done configuring kernel NVMeOF Target")
1012
1013
class SPDKTarget(Target):
    """NVMe-oF target backed by the SPDK nvmf_tgt application, configured over JSON-RPC."""

    def __init__(self, name, general_config, target_config):
        super(SPDKTarget, self).__init__(name, general_config, target_config)

        # Required fields
        self.core_mask = target_config["core_mask"]
        self.num_cores = self.get_num_cores(self.core_mask)

        # Defaults
        self.dif_insert_strip = False
        self.null_block_dif_type = 0
        self.num_shared_buffers = 4096
        self.bpf_proc = None
        self.bpf_scripts = []

        if "num_shared_buffers" in target_config:
            self.num_shared_buffers = target_config["num_shared_buffers"]
        if "null_block_dif_type" in target_config:
            self.null_block_dif_type = target_config["null_block_dif_type"]
        if "dif_insert_strip" in target_config:
            self.dif_insert_strip = target_config["dif_insert_strip"]
        if "bpf_scripts" in target_config:
            self.bpf_scripts = target_config["bpf_scripts"]

    def get_num_cores(self, core_mask):
        """Return the number of CPU cores described by core_mask.

        Accepts either a hex bitmask ("0xF0") or a bracketed CPU list
        ("[0,2,4-6]").
        """
        if "0x" in core_mask:
            # Hex mask: one core per set bit.
            return bin(int(core_mask, 16)).count("1")
        else:
            num_cores = 0
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")
            for i in core_mask.split(","):
                if "-" in i:
                    # "x-y" ranges are inclusive: y - x + 1 cores.
                    x, y = i.split("-")
                    num_cores += len(range(int(x), int(y))) + 1
                else:
                    num_cores += 1
            return num_cores

    def spdk_tgt_configure(self):
        """Configure transport, bdevs and subsystems on a running nvmf_tgt via RPC."""
        self.log_print("Configuring SPDK NVMeOF target via RPC")

        # Create RDMA transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
                                       num_shared_buffers=self.num_shared_buffers,
                                       dif_insert_or_strip=self.dif_insert_strip,
                                       sock_priority=self.adq_priority)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.enable_adq:
            self.adq_configure_tc()
            self.log_print("Done configuring SPDK NVMeOF Target")

        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

    def spdk_tgt_add_nullblock(self, null_block_count):
        """Create null_block_count null bdevs (with optional DIF metadata) via RPC."""
        md_size = 0
        block_size = 4096
        if self.null_block_dif_type != 0:
            # DIF protection needs 128 bytes of per-block metadata.
            md_size = 128

        self.log_print("Adding null block bdevices to config via RPC")
        for i in range(null_block_count):
            self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type)
            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
                                      dif_type=self.null_block_dif_type, md_size=md_size)
        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        """Attach local PCIe NVMe drives as bdevs (all of them, or the first req_num_disks)."""
        self.log_print("Adding NVMe bdevs to config via RPC")

        bdfs = get_nvme_devices_bdf()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create one subsystem + listener per bdev, distributing bdevs across the given IPs."""
        self.log_print("Adding subsystems to config")
        port = "4420"
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        # Distribute bdevs between provided NICs
        num_disks = range(0, req_num_disks)
        if len(num_disks) == 1:
            disks_per_ip = 1
        else:
            # NOTE(review): integer division drops any remainder bdevs when
            # req_num_disks is not a multiple of len(ips) — confirm intended.
            disks_per_ip = int(len(num_disks) / len(ips))
        disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))]

        # Create subsystems, add bdevs to namespaces, add listeners
        for ip, chunk in zip(ips, disk_chunks):
            for c in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
                serial = "SPDK00%s" % c
                bdev_name = "Nvme%sn1" % c
                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                               allow_any_host=True, max_namespaces=8)
                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)

                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
                                                     nqn=nqn,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid=port,
                                                     adrfam="ipv4")

                self.subsystem_info_list.append([port, nqn, ip])
        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def bpf_start(self):
        """Start the bpftrace helper script against the running nvmf_tgt process."""
        self.log_print("Starting BPF Trace scripts: %s" % self.bpf_scripts)
        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
        results_path = os.path.join(self.results_dir, "bpf_traces.txt")

        with open(self.pid, "r") as fh:
            nvmf_pid = str(fh.readline())

        cmd = [bpf_script, nvmf_pid, *bpf_traces]
        self.log_print(cmd)
        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})

    def tgt_start(self):
        """Launch nvmf_tgt, wait for its RPC socket, apply pre-init options and configure it."""
        if self.null_block:
            self.subsys_no = 1
        else:
            self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
        # self.pid is the path of the pid *file*, not the PID itself.
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        # Bug fix: this line previously logged self.pid (the pid-file path)
        # while claiming to print the PID; log the actual process ID instead.
        self.log_print("SPDK NVMeOF Target PID=%s" % proc.pid)
        self.log_print("Waiting for spdk to initilize...")
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock")

        if self.enable_zcopy:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
                                           enable_zerocopy_send_server=True)
            self.log_print("Target socket options:")
            rpc.client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))

        if self.enable_adq:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
                                           nvme_adminq_poll_period_us=100000, retry_count=4)
            rpc.nvmf.nvmf_set_config(self.client, acceptor_poll_rate=10000)

        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name)

        # Options above must be set before framework init completes.
        rpc.framework_start_init(self.client)

        if self.bpf_scripts:
            self.bpf_start()

        self.spdk_tgt_configure()

    def __del__(self):
        # Stop the bpftrace helper first, then shut down nvmf_tgt, falling
        # back to kill if a graceful terminate fails.
        if self.bpf_proc:
            self.log_print("Stopping BPF Trace script")
            self.bpf_proc.terminate()
            self.bpf_proc.wait()

        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait()
            except Exception as e:
                self.log_print(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
1212
1213
class KernelInitiator(Initiator):
    """Initiator that connects to the target with the kernel NVMe-oF host driver."""

    def __init__(self, name, general_config, initiator_config):
        super(KernelInitiator, self).__init__(name, general_config, initiator_config)

        # Defaults
        self.extra_params = ""
        self.ioengine = "libaio"

        if "extra_params" in initiator_config:
            self.extra_params = initiator_config["extra_params"]

        if "kernel_engine" in initiator_config:
            self.ioengine = initiator_config["kernel_engine"]
            if "io_uring" in self.ioengine:
                self.extra_params = "--nr-poll-queues=8"

    def __del__(self):
        self.ssh_connection.close()

    def get_connected_nvme_list(self):
        """Return device names (e.g. "nvme0n1") of connected SPDK/Linux NVMe-oF namespaces."""
        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
        return nvme_list

    def kernel_init_connect(self):
        """Connect to all discovered subsystems with nvme-cli and tune block sysfs for io_uring."""
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for subsystem in self.subsystem_info_list:
            self.log_print("Trying to connect %s %s %s" % subsystem)
            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
            time.sleep(2)

        if "io_uring" in self.ioengine:
            self.log_print("Setting block layer settings for io_uring.")

            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
            #       apparently it's not possible for connected subsystems.
            #       Results in "error: Invalid argument"
            block_sysfs_settings = {
                "iostats": "0",
                "rq_affinity": "0",
                "nomerges": "2"
            }

            for disk in self.get_connected_nvme_list():
                sysfs = os.path.join("/sys/block", disk, "queue")
                for k, v in block_sysfs_settings.items():
                    sysfs_opt_path = os.path.join(sysfs, k)
                    try:
                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
                    except subprocess.CalledProcessError as e:
                        self.log_print("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
                    finally:
                        _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)])
                        self.log_print("%s=%s" % (sysfs_opt_path, _))

    def kernel_init_disconnect(self):
        """Disconnect from every subsystem in subsystem_info_list."""
        for subsystem in self.subsystem_info_list:
            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
            time.sleep(1)

    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1):
        """Build [filename] fio job sections distributing connected /dev/nvme* devices over threads."""
        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            # Bug fix: this remainder handling used to sit inside the loop
            # above, which piled the leftover devices onto the first job
            # sections and assigned no devices at all when there were fewer
            # devices than threads (nvme_per_split == 0). Dedented to hand out
            # one extra device per section, matching
            # SPDKInitiator.gen_fio_filename_conf().
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section
1301
1302
class SPDKInitiator(Initiator):
    """Initiator that drives I/O through the SPDK bdev fio plugin."""

    def __init__(self, name, general_config, initiator_config):
        super(SPDKInitiator, self).__init__(name, general_config, initiator_config)

        # Build SPDK on the initiator unless the config explicitly opts out.
        if general_config.get("skip_spdk_install", False) is False:
            self.install_spdk()

        # Required fields
        self.num_cores = initiator_config["num_cores"]

    def install_spdk(self):
        """Configure and build SPDK with the fio plugin, then run setup.sh."""
        self.log_print("Using fio binary %s" % self.fio_bin)
        build_steps = [
            ["git", "-C", self.spdk_dir, "submodule", "update", "--init"],
            ["git", "-C", self.spdk_dir, "clean", "-ffdx"],
            ["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)],
            ["make", "-C", self.spdk_dir, "clean"],
            ["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"],
        ]
        for step in build_steps:
            self.exec_cmd(step)

        self.log_print("SPDK built")
        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        """Return a bdev-subsystem JSON config attaching one NVMe-oF controller per remote subsystem."""
        controllers = []
        for idx, subsys in enumerate(remote_subsystem_list):
            sub_port, sub_nqn, sub_addr = (str(field) for field in subsys)
            params = {
                "name": "Nvme{}".format(idx),
                "trtype": self.transport,
                "traddr": sub_addr,
                "trsvcid": sub_port,
                "subnqn": sub_nqn,
                "adrfam": "IPv4"
            }
            if self.enable_adq:
                params["priority"] = "1"
            controllers.append({
                "method": "bdev_nvme_attach_controller",
                "params": params
            })

        bdev_cfg_section = {
            "subsystems": [
                {
                    "subsystem": "bdev",
                    "config": controllers
                }
            ]
        }
        return json.dumps(bdev_cfg_section, indent=2)

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1):
        """Build [filename] fio job sections mapping NvmeXn1 bdevs onto fio threads."""
        # Never create more job sections than there are bdevs to fill them.
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        bdev_names = ["Nvme%sn1" % idx for idx in range(0, len(subsystems))]
        # Each section gets base_count bdevs; the first `extra` sections
        # receive one additional bdev each.
        base_count, extra = divmod(len(subsystems), len(threads))

        name_iter = iter(bdev_names)
        per_section = []
        for section_idx in range(len(threads)):
            take = base_count + (1 if section_idx < extra else 0)
            per_section.append([next(name_iter) for _ in range(take)])

        parts = []
        for section_idx, names in enumerate(per_section):
            section_qd = round((io_depth * len(names)) / num_jobs)
            if section_qd == 0:
                section_qd = 1
            parts.append("[filename%s]" % section_idx)
            parts.append("\n".join("filename=%s" % name for name in names))
            parts.append("iodepth=%s" % section_qd)

        # Reproduce the original accumulation, which always starts the
        # section text with a newline.
        return "\n".join([""] + parts)
1381
1382
if __name__ == "__main__":
    script_full_dir = os.path.dirname(os.path.realpath(__file__))
    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
                        help='Configuration file.')
    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
                        help='Results directory.')
    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
                        help='CSV results filename.')

    args = parser.parse_args()

    print("Using config file: %s" % args.config)
    with open(args.config, "r") as config:
        data = json.load(config)

    initiators = []
    general_config = data["general"]

    # Instantiate target/initiator objects and read fio workload parameters
    # from the matching config sections. Section names are matched by
    # substring ("target", "initiator", "fio") as in the config schema.
    for k, v in data.items():
        if "target" in k:
            v.update({"results_dir": args.results})
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(k, general_config, v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(k, general_config, v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(k, general_config, v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(k, general_config, v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            # Optional fields: fall back to None / 0 when absent.
            fio_run_num = data[k].get("run_num")
            fio_num_jobs = data[k].get("num_jobs")
            fio_rate_iops = data[k].get("rate_iops", 0)
        else:
            continue

    # Create the results directory if it does not already exist.
    os.makedirs(args.results, exist_ok=True)

    target_obj.tgt_start()

    for i in initiators:
        i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no)
        if i.enable_adq:
            i.adq_configure_tc()

    # Poor mans threading
    # Run FIO tests
    for block_size, io_depth, rw in fio_workloads:
        threads = []
        configs = []
        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_connect()

            cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                   fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops)
            configs.append(cfg)

        for i, cfg in zip(initiators, configs):
            t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
            threads.append(t)
        if target_obj.enable_sar:
            sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"])
            sar_file_name = ".".join([sar_file_name, "txt"])
            t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_name))
            threads.append(t)

        if target_obj.enable_pcm:
            pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]

            pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],))
            pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],))
            pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],))

            threads.append(pcm_cpu_t)
            threads.append(pcm_mem_t)
            threads.append(pcm_pow_t)

        if target_obj.enable_bandwidth:
            bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)])
            bandwidth_file_name = ".".join([bandwidth_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,))
            threads.append(t)

        if target_obj.enable_dpdk_memory:
            # Bug fix: args must be a tuple. "(args.results)" was a bare
            # string, which Thread would splat one character per positional
            # argument into measure_dpdk_memory, raising TypeError.
            t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results,))
            threads.append(t)

        for t in threads:
            t.start()
        for t in threads:
            t.join()

        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_disconnect()
            i.copy_result_files(args.results)

    # Restore system settings on target and all initiators, then aggregate
    # the collected fio results into the CSV report.
    target_obj.restore_governor()
    target_obj.restore_tuned()
    target_obj.restore_services()
    target_obj.restore_sysctl()
    for i in initiators:
        i.restore_governor()
        i.restore_tuned()
        i.restore_services()
        i.restore_sysctl()
    target_obj.parse_results(args.results, args.csv_filename)
1514