xref: /spdk/scripts/perf/nvmf/run_nvmf.py (revision 628c230de4780617ec0432bd2427aa91ca837ba6)
1#!/usr/bin/env python3
2
3import os
4import re
5import sys
6import json
7import zipfile
8import threading
9import subprocess
10import itertools
11import configparser
12import time
13import uuid
14from collections import OrderedDict
15
16import paramiko
17import pandas as pd
18
19import rpc
20import rpc.client
21from common import *
22
23
class Server:
    """Base class for a machine taking part in an NVMe-oF performance test.

    Holds configuration common to targets and initiators (credentials,
    transport, NIC IPs) and implements shared system-tuning helpers:
    service/sysctl/tuned/CPU-governor/IRQ-affinity configuration with
    state capture so everything can be restored after the test, plus
    ADQ (Application Device Queues) setup for Intel E810 NICs.

    Subclasses are expected to override exec_cmd() (local vs. SSH
    execution) and set_local_nic_info_helper().
    """

    def __init__(self, name, general_config, server_config):
        # name: alphanumeric label used as a log prefix and in result file names
        # general_config: dict with "username", "password", "transport"
        # server_config: dict with "nic_ips", "mode" and optional tuning keys
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]

        # Default location of the Mellanox IRQ affinity scripts; can be
        # overridden per server with the "irq_scripts_dir" config key.
        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        self.local_nic_info = []
        self._nics_json_obj = {}
        # Pre-test system state, captured by the configure_* methods and
        # consumed by the matching restore_* methods after the test.
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        self.enable_adq = False
        self.adq_priority = None
        if "adq_enable" in server_config and server_config["adq_enable"]:
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1

        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]

        # Names end up in log prefixes and result file names, so restrict
        # them to plain alphanumerics.
        if not re.match("^[A-Za-z0-9]*$", name):
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        """Print msg prefixed with this server's name, flushing immediately."""
        print("[%s] %s" % (self.name, msg), flush=True)

    def get_uncommented_lines(self, lines):
        """Return only non-empty lines that do not start with '#'."""
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        """Return the interface name that has the exact address ip, or None.

        The interface list from "ip -j address show" is fetched lazily and
        cached in self._nics_json_obj.
        """
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                # Exact comparison: a substring test ("in") would wrongly
                # match e.g. "192.168.0.1" against "192.168.0.10".
                if ip == addr["local"]:
                    return nic["ifname"]

    def set_local_nic_info_helper(self):
        """Return raw NIC/PCI info for this machine; overridden by subclasses."""
        pass

    def set_local_nic_info(self, pci_info):
        """Extract network-class devices from lshw-style JSON into self.local_nic_info."""
        def extract_network_elements(json_obj):
            # Recursively walk the lshw JSON tree and collect every node
            # whose "class" contains "network".
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Execute cmd and return its output; no-op here, overridden by subclasses."""
        return ""

    def configure_system(self):
        """Apply all system-level tuning steps required before a test run."""
        self.configure_services()
        self.configure_sysctl()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity()

    def configure_adq(self):
        """Load ADQ kernel modules and configure NIC ports (SPDK mode only)."""
        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        """Modprobe the traffic-control modules ADQ depends on."""
        self.log_print("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log_print("%s loaded!" % module)
            except subprocess.CalledProcessError as e:
                self.log_print("ERROR: failed to load module %s" % module)
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        """Create ADQ traffic classes and flower filters on each test NIC.

        TC0 gets a minimal 2 queues; TC1 gets one queue per test core.
        Assumes self.num_cores, self.subsystem_info_list and self.spdk_dir
        are provided by the subclass before this is called.
        """
        self.log_print("Configuring ADQ Traffic classes and filters...")

        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        # Target filters on destination port, initiators on source port.
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        ports = set([p[0] for p in self.subsystem_info_list])
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log_print(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log_print(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            for port in ports:
                tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                                 "protocol", "ip", "ingress", "prio", "1", "flower",
                                 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                                 "skip_sw", "hw_tc", "1"]
                self.log_print(" ".join(tc_filter_cmd))
                self.exec_cmd(tc_filter_cmd)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log_print(xps_cmd)
            self.exec_cmd(xps_cmd)

    def adq_configure_nic(self):
        """Reload the ice driver and enable ADQ-related port features via ethtool."""
        self.log_print("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        try:
            self.exec_cmd(["sudo", "rmmod", "ice"])
            self.exec_cmd(["sudo", "modprobe", "ice"])
        except subprocess.CalledProcessError as e:
            self.log_print("ERROR: failed to reload ice module!")
            self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log_print(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization
            except subprocess.CalledProcessError as e:
                self.log_print("ERROR: failed to configure NIC port using ethtool!")
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
                self.log_print("Please update your NIC driver and firmware versions and try again.")
            self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        """Stop services that disturb performance tests, remembering their state."""
        self.log_print("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            # systemctl's key=value output is INI-like; wrap it in a section
            # header so configparser can parse it.
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log_print("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log_print("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        """Apply network/VM sysctl tuning, saving previous values for restore."""
        self.log_print("Tuning sysctl settings...")

        # busy_read is only beneficial for ADQ in SPDK (userspace) mode.
        busy_read = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1

        sysctl_opts = {
            "net.core.busy_poll": 0,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
        }

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        """Activate the configured tuned-adm profile, saving the current one."""
        if not self.tuned_profile:
            self.log_print("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        """Switch the CPU frequency governor to "performance", saving the old one."""
        self.log_print("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def configure_irq_affinity(self):
        """Run set_irq_affinity.sh for every test NIC."""
        self.log_print("Setting NIC irq affinity for NICs...")

        irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh")
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            irq_cmd = ["sudo", irq_script_path, nic]
            self.log_print(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        """Start/stop services back to the states recorded before the test."""
        self.log_print("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        """Write back the sysctl values recorded before the test."""
        self.log_print("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        """Revert tuned-adm to the profile/mode recorded before the test."""
        self.log_print("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log_print("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        """Revert the CPU frequency governor recorded before the test."""
        self.log_print("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log_print("Reverted CPU governor to %s." % self.governor_restore)
321
class Target(Server):
    """The NVMe-oF target machine.

    Runs all commands locally via subprocess (this script is executed on
    the target host). Besides inherited system tuning, it implements fio
    result parsing/aggregation and measurement helpers (SAR, Intel PCM,
    network bandwidth, DPDK memory usage).
    """

    def __init__(self, name, general_config, target_config):
        super(Target, self).__init__(name, general_config, target_config)

        # Defaults
        self.enable_sar = False
        self.sar_delay = 0
        self.sar_interval = 0
        self.sar_count = 0
        self.enable_pcm = False
        self.pcm_dir = ""
        self.pcm_delay = 0
        self.pcm_interval = 0
        self.pcm_count = 0
        self.enable_bandwidth = 0
        self.bandwidth_count = 0
        self.enable_dpdk_memory = False
        self.dpdk_wait_time = 0
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        # Cache local interface info eagerly (base class does it lazily).
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []

        # Optional measurement settings from the JSON config file; most are
        # packed tuples/lists unpacked into individual attributes.
        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "sar_settings" in target_config:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"]
        if "pcm_settings" in target_config:
            self.enable_pcm = True
            self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"]
        if "enable_bandwidth" in target_config:
            self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"]
        if "enable_dpdk_memory" in target_config:
            self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"]
        if "scheduler_settings" in target_config:
            self.scheduler_name = target_config["scheduler_settings"]
        if "zcopy_settings" in target_config:
            self.enable_zcopy = target_config["zcopy_settings"]

        # This script lives in scripts/perf/nvmf, so the SPDK root is
        # three directories up from the script location.
        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        """Return hardware info for this machine as parsed lshw JSON."""
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Run cmd locally and return its decoded stdout.

        stderr_redirect: if True, merge stderr into the captured output.
        change_dir: if set, temporarily chdir there for the command.
        NOTE(review): check_output is not imported in this file directly —
        presumably provided by "from common import *"; verify.
        """
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log_print("Changing directory to %s" % change_dir)

        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")

        if change_dir:
            os.chdir(old_cwd)
            self.log_print("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        """Zip the whole SPDK source tree into dest_file (for initiators).

        NOTE(review): archive member names come from os.path.relpath with
        no base dir, so they are relative to the current working directory —
        assumes the script is run from within/near the SPDK tree; confirm.
        """
        self.log_print("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log_print("Done zipping")

    def read_json_stats(self, file):
        """Parse a fio JSON result file into a flat list of 18 numbers.

        Returns [read_iops, read_bw, read avg/min/max lat,
        read p99/p99.9/p99.99/p99.999 lat, then the same 9 values for
        write]. All latencies are converted to microseconds.
        """
        with open(file, "r") as json_data:
            data = json.load(json_data)
            job_pos = 0  # job_pos = 0 because using aggregated results

            # Check if latency is in nano or microseconds to choose correct dict key
            def get_lat_unit(key_prefix, dict_section):
                # key prefix - lat, clat or slat.
                # dict section - portion of json containing latency bucket in question
                # Return dict key to access the bucket and unit as string
                for k, v in dict_section.items():
                    if k.startswith(key_prefix):
                        return k, k.split("_")[1]

            def get_clat_percentiles(clat_dict_leaf):
                # Extract the four high percentiles used in reports.
                if "percentile" in clat_dict_leaf:
                    p99_lat = float(clat_dict_leaf["percentile"]["99.000000"])
                    p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"])
                    p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"])
                    p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"])

                    return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat]
                else:
                    # Latest fio versions do not provide "percentile" results if no
                    # measurements were done, so just return zeroes
                    return [0, 0, 0, 0]

            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
            read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["read"][clat_key])

            # Normalize nanoseconds to microseconds.
            if "ns" in lat_unit:
                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
            if "ns" in clat_unit:
                read_p99_lat = read_p99_lat / 1000
                read_p99_9_lat = read_p99_9_lat / 1000
                read_p99_99_lat = read_p99_99_lat / 1000
                read_p99_999_lat = read_p99_999_lat / 1000

            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
            write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["write"][clat_key])

            if "ns" in lat_unit:
                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
            if "ns" in clat_unit:
                write_p99_lat = write_p99_lat / 1000
                write_p99_9_lat = write_p99_9_lat / 1000
                write_p99_99_lat = write_p99_99_lat / 1000
                write_p99_999_lat = write_p99_999_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]

    def parse_results(self, results_dir, initiator_count=None, run_num=None):
        """Aggregate per-initiator, per-run fio JSON results into CSV files.

        Writes one per-initiator CSV per fio job plus a combined
        nvmf_results.csv in results_dir. IOPS/BW are summed across
        initiators; latencies are averaged (weighted by rwmixread for
        mixed workloads).
        """
        files = os.listdir(results_dir)
        fio_files = filter(lambda x: ".fio" in x, files)
        json_files = [x for x in files if ".json" in x]

        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]

        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]

        header_line = ",".join(["Name", *headers])
        aggr_header_line = ",".join(["Name", *aggr_headers])

        # Create empty results file
        csv_file = "nvmf_results.csv"
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            fh.write(aggr_header_line + "\n")
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # Look in the filename for rwmixread value. Function arguments do
            # not have that information.
            # TODO: Improve this function by directly using workload params instead
            # of regexing through filenames.
            if "read" in job_name:
                rw_mixread = 1
            elif "write" in job_name:
                rw_mixread = 0
            else:
                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100

            # If "_CPU" exists in name - ignore it
            # Initiators for the same job could have different num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if job_name in x]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # There may have been more than 1 initiator used in test, need to check that
            # Result files are created so that string after last "_" separator is server name
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than 1 test run for this job, calculate average results for initiator
                i_results = [x for x in job_result_files if i in x]
                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))

                separate_stats = []
                for r in i_results:
                    stats = self.read_json_stats(os.path.join(results_dir, r))
                    separate_stats.append(stats)
                    self.log_print(stats)

                # Element-wise average over this initiator's runs.
                init_results = [sum(x) for x in zip(*separate_stats)]
                init_results = [x / len(separate_stats) for x in init_results]
                inits_avg_results.append(init_results)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(init_results)
                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
                    fh.write(header_line + "\n")
                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")

            # Sum results of all initiators running this FIO job.
            # Latency results are an average of latencies from across all initiators.
            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
            for key in inits_avg_results:
                if "lat" in key:
                    inits_avg_results[key] /= len(inits_names)

            # Aggregate separate read/write values into common labels
            # Take rw_mixread into consideration for mixed read/write workloads.
            aggregate_results = OrderedDict()
            for h in aggr_headers:
                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
                if "lat" in h:
                    _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
                else:
                    _ = read_stat + write_stat
                aggregate_results[h] = "{0:.3f}".format(_)

            rows.add(",".join([job_name, *aggregate_results.values()]))

        # Save results to file
        for row in rows:
            with open(os.path.join(results_dir, csv_file), "a") as fh:
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))

    def measure_sar(self, results_dir, sar_file_name):
        """Collect CPU utilization with sar and save the raw output to a file."""
        self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay)
        time.sleep(self.sar_delay)
        # Blocks for roughly sar_interval * sar_count seconds.
        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count])
        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
            for line in out.split("\n"):
                if "Average" in line and "CPU" in line:
                    self.log_print("Summary CPU utilization from SAR:")
                    self.log_print(line)
                if "Average" in line and "all" in line:
                    self.log_print(line)
            fh.write(out)

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        """Run Intel pcm-memory for pcm_count seconds, writing CSV output."""
        time.sleep(self.pcm_delay)
        cmd = ["%s/pcm-memory.x" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)]
        # pcm-memory runs until terminated; use pcm_count as the duration.
        pcm_memory = subprocess.Popen(cmd)
        time.sleep(self.pcm_count)
        pcm_memory.terminate()

    def measure_pcm(self, results_dir, pcm_file_name):
        """Run Intel pcm and additionally extract socket/UPI columns to a CSV."""
        time.sleep(self.pcm_delay)
        cmd = ["%s/pcm.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        # pcm CSV has a two-row header; drop the auto-generated "Unnamed" labels.
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_pcm_power(self, results_dir, pcm_power_file_name):
        """Run Intel pcm-power and save its output to a file."""
        time.sleep(self.pcm_delay)
        out = self.exec_cmd(["%s/pcm-power.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count])
        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
            fh.write(out)

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name):
        """Sample network bandwidth with bwm-ng into a CSV file."""
        self.log_print("INFO: starting network bandwidth measure")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)])

    def measure_dpdk_memory(self, results_dir):
        """Dump DPDK memory stats after a wait and move the dump into results_dir."""
        self.log_print("INFO: waiting to generate DPDK memory usage")
        time.sleep(self.dpdk_wait_time)
        self.log_print("INFO: generating DPDK memory usage")
        # FIXME: this is a bare attribute access, not a call — it does not
        # issue the RPC, so /tmp/spdk_mem_dump.txt must already exist for
        # the rename below to succeed. The RPC likely needs a client
        # argument (not set up in this class); confirm and fix.
        rpc.env.env_dpdk_get_mem_stats
        os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir))

    def sys_config(self):
        """Log a summary of kernel, sysctl, CPU power and SPDK settings."""
        self.log_print("====Kernel release:====")
        self.log_print(os.uname().release)
        self.log_print("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log_print('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log_print("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log_print('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log_print("====Cpu power info:====")
        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log_print("====zcopy settings:====")
        self.log_print("zcopy enabled: %s" % (self.enable_zcopy))
        self.log_print("====Scheduler settings:====")
        self.log_print("SPDK scheduler: %s" % (self.scheduler_name))
634
class Initiator(Server):
    """Base class for FIO workload-generator hosts, driven over SSH.

    Handles SSH connection setup, SPDK source distribution, subsystem
    discovery, FIO job-file generation and FIO execution. Transport-mode
    specifics live in KernelInitiator and SPDKInitiator.
    """

    def __init__(self, name, general_config, initiator_config):
        super(Initiator, self).__init__(name, general_config, initiator_config)

        # Required fields
        self.ip = initiator_config["ip"]
        self.target_nic_ips = initiator_config["target_nic_ips"]

        # Defaults
        self.cpus_allowed = None
        self.cpus_allowed_policy = "shared"
        self.spdk_dir = "/tmp/spdk"
        self.fio_bin = "/usr/src/fio/fio"
        self.nvmecli_bin = "nvme"
        self.cpu_frequency = None
        self.subsystem_info_list = []

        if "spdk_dir" in initiator_config:
            self.spdk_dir = initiator_config["spdk_dir"]
        if "fio_bin" in initiator_config:
            self.fio_bin = initiator_config["fio_bin"]
        if "nvmecli_bin" in initiator_config:
            self.nvmecli_bin = initiator_config["nvmecli_bin"]
        if "cpus_allowed" in initiator_config:
            self.cpus_allowed = initiator_config["cpus_allowed"]
        if "cpus_allowed_policy" in initiator_config:
            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
        if "cpu_frequency" in initiator_config:
            self.cpu_frequency = initiator_config["cpu_frequency"]
        # Environment variable overrides the config file value.
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.copy_spdk("/tmp/spdk.zip")
        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        # NIC info is gathered over SSH on the remote initiator host.
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def __del__(self):
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Run cmd (a list of arguments) on the remote host and return stdout.

        :param stderr_redirect: merge stderr into stdout (uses a PTY).
        :param change_dir: cd into this directory before running cmd.
        :raises CalledProcessError: if the remote command exits non-zero.
        """
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it again to prevent
        # expansion when sending to the remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout thanks using get_pty option if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        """Upload a local file to remote_dest via SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        """Download a remote file to local_dest via SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        """Transfer the SPDK source zip to the initiator and unpack it in self.spdk_dir."""
        self.log_print("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log_print("Sources unpacked")

    def copy_result_files(self, dest_dir):
        """Copy all FIO result files from the initiator back to local dest_dir."""
        self.log_print("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log_print("Done copying results")

    def discover_subsystems(self, address_list, subsys_no):
        """Scan ports 4420..4420+subsys_no-1 on each address with "nvme discover"
        and store the matching subsystems as (trsvcid, nqn, traddr) tuples."""
        num_nvmes = range(0, subsys_no)
        nvme_discover_output = ""
        # Loop variable renamed: it previously shadowed the subsys_no parameter.
        for ip, port_off in itertools.product(address_list, num_nvmes):
            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + port_off))
            nvme_discover_cmd = ["sudo",
                                 "%s" % self.nvmecli_bin,
                                 "discover", "-t", "%s" % self.transport,
                                 "-s", "%s" % (4420 + port_off),
                                 "-a", "%s" % ip]

            try:
                stdout = self.exec_cmd(nvme_discover_cmd)
                if stdout:
                    nvme_discover_output = nvme_discover_output + stdout
            except CalledProcessError:
                # Do nothing. In case of discovering remote subsystems of kernel target
                # we expect "nvme discover" to fail a bunch of times because we basically
                # scan ports.
                pass

        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                                nvme_discover_output)  # from nvme discovery output
        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
        subsystems = list(set(subsystems))
        subsystems.sort(key=lambda x: x[1])
        self.log_print("Found matching subsystems on target side:")
        for s in subsystems:
            self.log_print(s)
        self.subsystem_info_list = subsystems

    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10, rate_iops=0):
        """Render a FIO job file on the initiator and return its remote path.

        The [global] section is filled from the arguments; [filename] job
        sections come from the mode-specific gen_fio_filename_conf().
        """
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""
        if "spdk" in self.mode:
            bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
            self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
            spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
        else:
            ioengine = "libaio"
            spdk_conf = ""
            out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'",
                                 "|", "awk", "'{print $1}'"])
            subsystems = [x for x in out.split("\n") if "nvme" in x]

        if self.cpus_allowed is not None:
            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    # Bug fix: "a-b" is an inclusive CPU range (fio semantics:
                    # "0-3" means 4 CPUs); len(range(a, b)) undercounted by one.
                    cpus_num += len(range(a, b + 1))
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            # NOTE(review): "subsystems" is only bound on the kernel path
            # above; SPDK initiators always define num_cores, so this branch
            # is kernel-mode only in practice - confirm against callers.
            self.num_cores = len(subsystems)
            threads = range(0, len(subsystems))

        if "spdk" in self.mode:
            filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs)
        else:
            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs)

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log_print("Created FIO Config:")
        self.log_print(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        """Pin the CPU frequency via cpupower if cpu_frequency was configured."""
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit()
        else:
            self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.")

    def run_fio(self, fio_config_file, run_num=None):
        """Execute FIO on the initiator, storing JSON results next to the job file.

        When run_num is given, the job is repeated run_num times with
        numbered result files.
        """
        job_name, _ = os.path.splitext(fio_config_file)
        self.log_print("Starting FIO run for job: %s" % job_name)
        self.log_print("Using FIO: %s" % self.fio_bin)

        if run_num:
            for i in range(1, run_num + 1):
                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
                output = self.exec_cmd(["sudo", self.fio_bin,
                                        fio_config_file, "--output-format=json",
                                        "--output=%s" % output_filename], True)
                self.log_print(output)
        else:
            output_filename = job_name + "_" + self.name + ".json"
            # Bug fix: this previously read '"--output" % output_filename',
            # which raises TypeError before fio is ever started.
            output = self.exec_cmd(["sudo", self.fio_bin,
                                    fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename], True)
            self.log_print(output)
        self.log_print("FIO run finished. Results in: %s" % output_filename)

    def sys_config(self):
        """Log remote system configuration: kernel, sysctl and CPU power info."""
        self.log_print("====Kernel release:====")
        self.log_print(self.exec_cmd(["uname", "-r"]))
        self.log_print("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log_print("====sysctl conf:====")
        sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"])
        self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log_print("====Cpu power info:====")
        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
901        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
902
903
class KernelTarget(Target):
    """NVMe-oF target backed by the Linux kernel nvmet subsystem, driven via nvmetcli."""

    def __init__(self, name, general_config, target_config):
        super(KernelTarget, self).__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = "nvmetcli"

        if "nvmet_bin" in target_config:
            self.nvmet_bin = target_config["nvmet_bin"]

    def __del__(self):
        # Tear down any nvmet configuration left behind by this run.
        nvmet_command(self.nvmet_bin, "clear")

    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):
        """Write "kernel.conf" (nvmetcli JSON) exposing nvme_list devices.

        Disks are split evenly between the provided NIC addresses; each disk
        gets its own subsystem and its own port (trsvcid 4420, 4421, ...).
        """
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        # Split disks between NIC IP's.
        # NOTE(review): if len(address_list) > len(nvme_list), disks_per_ip
        # becomes 0 and no subsystems are generated - confirm inputs.
        disks_per_ip = int(len(nvme_list) / len(address_list))
        disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))]

        subsys_no = 1
        port_no = 0
        for ip, chunk in zip(address_list, disk_chunks):
            for disk in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % subsys_no
                trsvcid = "%s" % (4420 + port_no)
                nvmet_cfg["subsystems"].append({
                    "allowed_hosts": [],
                    "attr": {
                        "allow_any_host": "1",
                        "serial": "SPDK00%s" % subsys_no,
                        "version": "1.3"
                    },
                    "namespaces": [
                        {
                            "device": {
                                "path": disk,
                                "uuid": "%s" % uuid.uuid4()
                            },
                            "enable": 1,
                            "nsid": subsys_no
                        }
                    ],
                    "nqn": nqn
                })

                nvmet_cfg["ports"].append({
                    "addr": {
                        "adrfam": "ipv4",
                        "traddr": ip,
                        "trsvcid": trsvcid,
                        "trtype": "%s" % self.transport
                    },
                    "portid": subsys_no,
                    "referrals": [],
                    "subsystems": [nqn]
                })
                # Bug fix: record the actual service id so entries follow the
                # [port, nqn, ip] layout used by SPDKTarget and the initiators'
                # discovery; previously the (already incremented) sequential
                # port counter was stored instead of the trsvcid.
                self.subsystem_info_list.append([trsvcid, nqn, ip])
                subsys_no += 1
                port_no += 1

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        """Generate the nvmet config (null block or real NVMe drives) and load it."""
        self.log_print("Configuring kernel NVMeOF Target")

        if self.null_block:
            print("Configuring with null block device.")
            null_blk_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
            self.kernel_tgt_gen_subsystem_conf(null_blk_list, self.nic_ips)
            self.subsys_no = len(null_blk_list)
        else:
            print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()
            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
            self.subsys_no = len(nvme_list)

        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log_print("Done configuring kernel NVMeOF Target")
993
994
class SPDKTarget(Target):
    """NVMe-oF target running the SPDK nvmf_tgt application, configured via JSON-RPC."""

    def __init__(self, name, general_config, target_config):
        super(SPDKTarget, self).__init__(name, general_config, target_config)

        # Required fields
        self.core_mask = target_config["core_mask"]
        self.num_cores = self.get_num_cores(self.core_mask)

        # Defaults
        self.dif_insert_strip = False
        self.null_block_dif_type = 0
        self.num_shared_buffers = 4096

        if "num_shared_buffers" in target_config:
            self.num_shared_buffers = target_config["num_shared_buffers"]
        if "null_block_dif_type" in target_config:
            self.null_block_dif_type = target_config["null_block_dif_type"]
        if "dif_insert_strip" in target_config:
            self.dif_insert_strip = target_config["dif_insert_strip"]

    def get_num_cores(self, core_mask):
        """Return the number of CPU cores described by core_mask.

        Accepts either a hex bitmask ("0xF0") or a bracketed list with
        inclusive ranges ("[0,2,4-7]").
        """
        if "0x" in core_mask:
            return bin(int(core_mask, 16)).count("1")
        else:
            num_cores = 0
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")
            for i in core_mask.split(","):
                if "-" in i:
                    x, y = i.split("-")
                    # "x-y" is an inclusive range, hence the +1.
                    num_cores += len(range(int(x), int(y))) + 1
                else:
                    num_cores += 1
            return num_cores

    def spdk_tgt_configure(self):
        """Create the transport, bdevs and subsystems on the running target."""
        self.log_print("Configuring SPDK NVMeOF target via RPC")
        numa_list = get_used_numa_nodes()

        # Create RDMA transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
                                       num_shared_buffers=self.num_shared_buffers,
                                       dif_insert_or_strip=self.dif_insert_strip,
                                       sock_priority=self.adq_priority)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

        if self.enable_adq:
            self.adq_configure_tc()
        self.log_print("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self, null_block_count):
        """Create null_block_count null bdevs, with DIF metadata if configured."""
        md_size = 0
        block_size = 4096
        if self.null_block_dif_type != 0:
            md_size = 128

        self.log_print("Adding null block bdevices to config via RPC")
        for i in range(null_block_count):
            self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type)
            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
                                      dif_type=self.null_block_dif_type, md_size=md_size)
        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        """Attach local PCIe NVMe drives as bdevs (all, or the first req_num_disks)."""
        self.log_print("Adding NVMe bdevs to config via RPC")

        bdfs = get_nvme_devices_bdf()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create one subsystem + listener per bdev, spread across the given IPs."""
        self.log_print("Adding subsystems to config")
        port = "4420"
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        # Distribute bdevs between provided NICs
        num_disks = range(0, req_num_disks)
        if len(num_disks) == 1:
            disks_per_ip = 1
        else:
            disks_per_ip = int(len(num_disks) / len(ips))
        disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))]

        # Create subsystems, add bdevs to namespaces, add listeners
        for ip, chunk in zip(ips, disk_chunks):
            for c in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
                serial = "SPDK00%s" % c
                bdev_name = "Nvme%sn1" % c
                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                               allow_any_host=True, max_namespaces=8)
                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)

                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
                                                     nqn=nqn,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid=port,
                                                     adrfam="ipv4")

                self.subsystem_info_list.append([port, nqn, ip])
        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def tgt_start(self):
        """Start nvmf_tgt, wait for its RPC socket, apply socket/scheduler options
        and run the full target configuration."""
        if self.null_block:
            self.subsys_no = 1
        else:
            self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        # Bug fix: log the actual process id; previously this printed
        # self.pid, which holds the path of the pid file, not the PID.
        self.log_print("SPDK NVMeOF Target PID=%s" % proc.pid)
        self.log_print("Waiting for spdk to initilize...")
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock")

        if self.enable_zcopy:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
                                           enable_zerocopy_send=True)
            self.log_print("Target socket options:")
            rpc.client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))

        if self.enable_adq:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
                                           nvme_adminq_poll_period_us=100000, retry_count=4)
            rpc.nvmf.nvmf_set_config(self.client, acceptor_poll_rate=10000)

        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name)

        rpc.framework_start_init(self.client)
        self.spdk_tgt_configure()

    def __del__(self):
        # Best-effort shutdown of the nvmf_tgt process; escalate to kill()
        # if a graceful terminate fails.
        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait()
            except Exception as e:
                self.log_print(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
1168
1169
class KernelInitiator(Initiator):
    """Initiator using the Linux kernel NVMe-oF host driver with the libaio fio engine."""

    def __init__(self, name, general_config, initiator_config):
        super(KernelInitiator, self).__init__(name, general_config, initiator_config)

        # Defaults
        self.extra_params = ""

        if "extra_params" in initiator_config:
            self.extra_params = initiator_config["extra_params"]

    def __del__(self):
        self.ssh_connection.close()

    def kernel_init_connect(self, address_list, subsys_no):
        """Connect to all previously discovered subsystems with nvme-cli."""
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for subsystem in self.subsystem_info_list:
            self.log_print("Trying to connect %s %s %s" % subsystem)
            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
            time.sleep(2)

    def kernel_init_disconnect(self, address_list, subsys_no):
        """Disconnect from all previously discovered subsystems."""
        for subsystem in self.subsystem_info_list:
            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
            time.sleep(1)

    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1):
        """Generate fio [filename] sections mapping connected kernel nvme devices
        onto fio threads, distributing leftover devices one per thread."""
        out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'",
                             "|", "awk", "'{print $1}'"])
        nvme_list = [x for x in out.split("\n") if "nvme" in x]

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for j in range(nvme_per_split):
                result[i].append(next(iterator))
            # Bug fix: the remainder hand-out used to sit inside the inner
            # loop, which could give several leftover disks to the same
            # thread. Hand out at most one extra disk per thread, matching
            # SPDKInitiator.gen_fio_filename_conf.
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section
1223
1224
class SPDKInitiator(Initiator):
    """Initiator using the SPDK bdev fio plugin as the workload engine."""

    def __init__(self, name, general_config, initiator_config):
        super(SPDKInitiator, self).__init__(name, general_config, initiator_config)

        # Build SPDK on the initiator unless the config explicitly opts out.
        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.install_spdk(self.spdk_dir)

        # Required fields
        self.num_cores = initiator_config["num_cores"]

    def install_spdk(self, local_spdk_zip):
        """Configure and build SPDK (with the fio plugin) on the initiator,
        then run scripts/setup.sh. Sources must already be unpacked in
        self.spdk_dir."""
        self.log_print("Using fio binary %s" % self.fio_bin)
        fio_src_dir = os.path.dirname(self.fio_bin)
        build_steps = [
            ["git", "-C", self.spdk_dir, "submodule", "update", "--init"],
            ["git", "-C", self.spdk_dir, "clean", "-ffdx"],
            ["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % fio_src_dir],
            ["make", "-C", self.spdk_dir, "clean"],
            ["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"],
        ]
        for step in build_steps:
            self.exec_cmd(step)

        self.log_print("SPDK built")
        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        """Return a JSON bdev config attaching one NVMe-oF controller per
        remote subsystem in remote_subsystem_list ((port, nqn, ip) entries)."""
        controllers = []
        for idx, subsys in enumerate(remote_subsystem_list):
            sub_port, sub_nqn, sub_addr = [str(field) for field in subsys]
            ctrl_params = {
                "name": "Nvme{}".format(idx),
                "trtype": self.transport,
                "traddr": sub_addr,
                "trsvcid": sub_port,
                "subnqn": sub_nqn,
                "adrfam": "IPv4"
            }
            if self.enable_adq:
                ctrl_params["priority"] = "1"

            controllers.append({
                "method": "bdev_nvme_attach_controller",
                "params": ctrl_params
            })

        bdev_cfg_section = {
            "subsystems": [
                {
                    "subsystem": "bdev",
                    "config": controllers
                }
            ]
        }
        return json.dumps(bdev_cfg_section, indent=2)

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1):
        """Generate fio [filename] sections mapping NvmeXn1 bdevs onto fio
        threads, handing out at most one leftover bdev per thread."""
        # Never spawn more fio threads than there are subsystems to serve.
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        bdev_names = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        base_count = int(len(subsystems) / len(threads))
        leftover = len(subsystems) % len(threads)

        name_iter = iter(bdev_names)
        groups = []
        for _ in range(len(threads)):
            group = [next(name_iter) for _ in range(base_count)]
            if leftover:
                group.append(next(name_iter))
                leftover -= 1
            groups.append(group)

        filename_section = ""
        for idx, group in enumerate(groups):
            qd = round((io_depth * len(group)) / num_jobs) or 1
            filename_section = "\n".join([filename_section,
                                          "[filename%s]" % idx,
                                          "\n".join("filename=%s" % name for name in group),
                                          "iodepth=%s" % qd])

        return filename_section
1303
1304
if __name__ == "__main__":
    # NOTE(review): spdk_zip_path looks unused in this block — presumably
    # consumed by initiator setup code elsewhere in this module; confirm
    # before removing.
    spdk_zip_path = "/tmp/spdk.zip"
    target_results_dir = "/tmp/results"

    # Config file may be passed as the first CLI argument; otherwise use
    # config.json located next to this script.
    if len(sys.argv) > 1:
        config_file_path = sys.argv[1]
    else:
        script_full_dir = os.path.dirname(os.path.realpath(__file__))
        config_file_path = os.path.join(script_full_dir, "config.json")

    print("Using config file: %s" % config_file_path)
    with open(config_file_path, "r") as config:
        data = json.load(config)

    initiators = []

    # Build target / initiator / fio-workload objects from the JSON config.
    # Section names are matched by substring ("target", "initiator", "fio").
    for k, v in data.items():
        if "target" in k:
            if v["mode"] == "spdk":
                target_obj = SPDKTarget(k, data["general"], v)
            elif v["mode"] == "kernel":
                target_obj = KernelTarget(k, data["general"], v)
        elif "initiator" in k:
            if v["mode"] == "spdk":
                init_obj = SPDKInitiator(k, data["general"], v)
            elif v["mode"] == "kernel":
                init_obj = KernelInitiator(k, data["general"], v)
            initiators.append(init_obj)
        elif "fio" in k:
            # Every combination of block size x queue depth x rw pattern
            # becomes one test run.
            fio_workloads = itertools.product(v["bs"], v["qd"], v["rw"])

            fio_run_time = v["run_time"]
            fio_ramp_time = v["ramp_time"]
            fio_rw_mix_read = v["rwmixread"]
            # Optional settings: None / 0 mean "not configured".
            fio_run_num = v.get("run_num")
            fio_num_jobs = v.get("num_jobs")
            fio_rate_iops = v.get("rate_iops", 0)

    target_obj.tgt_start()

    os.makedirs(target_results_dir, exist_ok=True)

    for i in initiators:
        i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no)
        if i.enable_adq:
            i.adq_configure_tc()

    # Poor man's threading: for each workload combination, run fio on all
    # initiators plus any target-side measurement tools in parallel threads,
    # then join them all before moving on to the next combination.
    for block_size, io_depth, rw in fio_workloads:
        threads = []
        configs = []
        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_connect(i.target_nic_ips, target_obj.subsys_no)

            cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                   fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops)
            configs.append(cfg)

        for i, cfg in zip(initiators, configs):
            t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
            threads.append(t)

        if target_obj.enable_sar:
            sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"])
            sar_file_name = ".".join([sar_file_name, "txt"])
            t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name))
            threads.append(t)

        if target_obj.enable_pcm:
            pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]

            pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(target_results_dir, pcm_fnames[0],))
            pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(target_results_dir, pcm_fnames[1],))
            pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(target_results_dir, pcm_fnames[2],))

            threads.append(pcm_cpu_t)
            threads.append(pcm_mem_t)
            threads.append(pcm_pow_t)

        if target_obj.enable_bandwidth:
            bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)])
            bandwidth_file_name = ".".join([bandwidth_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(target_results_dir, bandwidth_file_name,))
            threads.append(t)

        if target_obj.enable_dpdk_memory:
            # BUGFIX: args must be a tuple. The original passed
            # args=(target_results_dir) — a bare string — which Thread would
            # unpack into one positional argument per character.
            t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(target_results_dir,))
            threads.append(t)

        for t in threads:
            t.start()
        for t in threads:
            t.join()

        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_disconnect(i.target_nic_ips, target_obj.subsys_no)
            i.copy_result_files(target_results_dir)

    # Undo every system-level tweak made during setup, on target and
    # initiators alike, before parsing results.
    target_obj.restore_governor()
    target_obj.restore_tuned()
    target_obj.restore_services()
    target_obj.restore_sysctl()
    for i in initiators:
        i.restore_governor()
        i.restore_tuned()
        i.restore_services()
        i.restore_sysctl()
    target_obj.parse_results(target_results_dir)
1431