xref: /spdk/scripts/perf/nvmf/run_nvmf.py (revision 5a7d428d0fa7ace275e7dc5fe97f6a2ae6ad012d)
1#!/usr/bin/env python3
2
3import os
4import re
5import sys
6import json
7import zipfile
8import threading
9import subprocess
10import itertools
11import configparser
12import time
13import uuid
14from collections import OrderedDict
15
16import paramiko
17import pandas as pd
18
19import rpc
20import rpc.client
21from common import *
22
23
class Server:
    """Base class for a host taking part in an NVMe-oF performance test.

    Holds configuration shared by target and initiator machines and
    implements the generic system tuning steps (services, sysctl, tuned
    profile, CPU governor, NIC IRQ affinity) as well as optional ADQ
    configuration. Settings changed during tuning are recorded in the
    *restore* attributes so they can be reverted after the test.
    Subclasses must provide a working exec_cmd() implementation.
    """

    def __init__(self, name, general_config, server_config):
        """
        Args:
            name: host identifier; used as a log prefix and in result file
                names, so it must contain only letters and digits.
            general_config: dict with "username", "password" and "transport".
            server_config: dict with "nic_ips", "mode" and optional
                "irq_scripts_dir", "adq_enable" and "tuned_profile" keys.
        """
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]

        # Default location of the Mellanox IRQ affinity helper scripts;
        # can be overridden via the configuration file.
        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if server_config.get("irq_scripts_dir"):
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        self.local_nic_info = []
        self._nics_json_obj = {}
        # Pre-test state, captured so it can be restored afterwards.
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        self.enable_adq = False
        self.adq_priority = None
        if server_config.get("adq_enable"):
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1

        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]

        # "+" (not "*") so that an empty name is rejected as well - it would
        # produce unusable log prefixes and result file names.
        if not re.match("^[A-Za-z0-9]+$", name):
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        # Prefix each message with the host name so that interleaved output
        # from multiple machines stays attributable.
        print("[%s] %s" % (self.name, msg), flush=True)

    def get_uncommented_lines(self, lines):
        """Return only the lines which are non-empty and not comments."""
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        """Return the name of the local interface holding address "ip".

        The "ip -j address show" output is fetched lazily and cached.
        Returns None when no interface has that address assigned.
        """
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            # Keep only interfaces which have at least one address assigned.
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                # Exact comparison: a substring test would wrongly match
                # e.g. "10.0.0.1" against an interface holding "10.0.0.11".
                if ip == addr["local"]:
                    return nic["ifname"]

    def set_local_nic_info_helper(self):
        # Subclasses return "lshw -json" style output here.
        pass

    def set_local_nic_info(self, pci_info):
        """Extract network controller entries from "lshw -json" output and
        store them in self.local_nic_info."""
        def extract_network_elements(json_obj):
            # Recursively walk the lshw tree collecting nodes whose class
            # contains "network".
            nic_list = []
            if isinstance(json_obj, list):
                for item in json_obj:
                    nic_list.extend(extract_network_elements(item))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Execute a command on this host and return its output as a string.

        Base-class stub; subclasses run the command locally or over SSH.
        """
        return ""

    def configure_system(self):
        """Apply all generic system tuning steps."""
        self.configure_services()
        self.configure_sysctl()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity()

    def configure_adq(self):
        """Load ADQ kernel modules and configure NIC ports (SPDK mode only)."""
        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        """Modprobe the qdisc/filter modules required by ADQ."""
        self.log_print("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log_print("%s loaded!" % module)
            except subprocess.CalledProcessError as e:
                # Qualified with the module name: this file imports the
                # subprocess module, not the exception name itself.
                self.log_print("ERROR: failed to load module %s" % module)
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        """Create ADQ traffic classes and flower filters on each test NIC.

        Relies on attributes provided by subclasses: num_cores,
        subsystem_info_list and spdk_dir.
        """
        self.log_print("Configuring ADQ Traffic classes and filters...")

        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        # The target filters on destination port, initiators on source port.
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        ports = {p[0] for p in self.subsystem_info_list}
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log_print(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log_print(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            for port in ports:
                tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                                 "protocol", "ip", "ingress", "prio", "1", "flower",
                                 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                                 "skip_sw", "hw_tc", "1"]
                self.log_print(" ".join(tc_filter_cmd))
                self.exec_cmd(tc_filter_cmd)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            # Join the argument list so the log matches the other command
            # logs instead of printing a Python list.
            self.log_print(" ".join(xps_cmd))
            self.exec_cmd(xps_cmd)

    def adq_configure_nic(self):
        """Reload the ice driver and enable the NIC features ADQ needs."""
        self.log_print("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        try:
            self.exec_cmd(["sudo", "rmmod", "ice"])
            self.exec_cmd(["sudo", "modprobe", "ice"])
        except subprocess.CalledProcessError as e:
            self.log_print("ERROR: failed to reload ice module!")
            self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log_print(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization
            except subprocess.CalledProcessError as e:
                self.log_print("ERROR: failed to configure NIC port using ethtool!")
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
                self.log_print("Please update your NIC driver and firmware versions and try again.")
            # Dump the resulting feature/flag state for the test log.
            self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        """Stop services which interfere with the test, remembering their
        previous state for restore_services()."""
        self.log_print("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            # "systemctl show" emits KEY=VALUE lines; prepend a section
            # header so configparser can digest them.
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                # Service does not exist on this system - nothing to stop.
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log_print("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log_print("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        """Apply network/VM sysctl tuning, saving previous values for
        restore_sysctl()."""
        self.log_print("Tuning sysctl settings...")

        busy_read = 0
        if self.enable_adq and self.mode == "spdk":
            # ADQ in SPDK mode requires socket busy-read polling.
            busy_read = 1

        sysctl_opts = {
            "net.core.busy_poll": 0,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
        }

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        """Activate the configured tuned-adm profile, saving the currently
        active profile/mode for restore_tuned()."""
        if not self.tuned_profile:
            self.log_print("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        """Switch the CPU frequency governor to "performance"."""
        self.log_print("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def configure_irq_affinity(self):
        """Run the set_irq_affinity.sh helper for each test NIC."""
        self.log_print("Setting NIC irq affinity for NICs...")

        irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh")
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            irq_cmd = ["sudo", irq_script_path, nic]
            # Join the argument list so the log matches the other command logs.
            self.log_print(" ".join(irq_cmd))
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        """Return all services touched by configure_services()/configure_tuned()
        to their recorded state."""
        self.log_print("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        """Re-apply the sysctl values recorded before tuning."""
        self.log_print("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        """Revert tuned-adm to the profile recorded before tuning."""
        self.log_print("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log_print("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        """Revert the CPU governor recorded before tuning."""
        self.log_print("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log_print("Reverted CPU governor to %s." % self.governor_restore)
320
321
class Target(Server):
    """NVMe-oF target host (commands run locally on this machine).

    Extends Server with local command execution, SPDK source packaging,
    fio result parsing/aggregation, and measurement helpers (SAR, PCM,
    network bandwidth, DPDK memory usage).
    """

    def __init__(self, name, general_config, target_config):
        """
        Args:
            name: host identifier (letters/digits only).
            general_config: dict with credentials, transport and optional
                "skip_spdk_install" flag.
            target_config: dict with "nic_ips", "mode" and the optional
                measurement / scheduler / zcopy settings parsed below.
        """
        super(Target, self).__init__(name, general_config, target_config)

        # Defaults
        self.enable_sar = False
        self.sar_delay = 0
        self.sar_interval = 0
        self.sar_count = 0
        self.enable_pcm = False
        self.pcm_dir = ""
        self.pcm_delay = 0
        self.pcm_interval = 0
        self.pcm_count = 0
        self.enable_bandwidth = 0
        self.bandwidth_count = 0
        self.enable_dpdk_memory = False
        self.dpdk_wait_time = 0
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        # Local NIC inventory, fetched eagerly (the base class fetches lazily).
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []

        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "sar_settings" in target_config:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"]
        if "pcm_settings" in target_config:
            self.enable_pcm = True
            self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"]
        if "enable_bandwidth" in target_config:
            self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"]
        if "enable_dpdk_memory" in target_config:
            self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"]
        if "scheduler_settings" in target_config:
            self.scheduler_name = target_config["scheduler_settings"]
        if "zcopy_settings" in target_config:
            self.enable_zcopy = target_config["zcopy_settings"]

        # This script lives in scripts/perf/nvmf, so the SPDK repository
        # root is three directories up from it.
        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        # Package the SPDK tree for the initiators unless explicitly skipped.
        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        """Return local hardware inventory as parsed "lshw -json" output."""
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Run a command locally and return its decoded stdout.

        Args:
            cmd: command as an argument list.
            stderr_redirect: if True, merge stderr into the captured output.
            change_dir: optionally chdir to this directory for the duration
                of the command, restoring the old cwd afterwards.
        """
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log_print("Changing directory to %s" % change_dir)

        # NOTE(review): check_output is not defined or imported by name in
        # this file - presumably it is re-exported by "from common import *";
        # confirm against common.py.
        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")

        if change_dir:
            os.chdir(old_cwd)
            self.log_print("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        """Zip the whole SPDK source tree into dest_file for distribution
        to the initiator machines.

        NOTE(review): files are archived under paths relative to the
        current working directory (os.path.relpath with no start argument)
        - presumably the script is run from within the SPDK tree; confirm.
        """
        self.log_print("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log_print("Done zipping")

    def read_json_stats(self, file):
        """Parse an aggregated fio JSON results file.

        Returns a flat 18-element list: read iops, bw, avg/min/max latency
        and p99/p99.9/p99.99/p99.999 percentiles, followed by the same
        values for writes. Latencies are converted to microseconds.
        """
        with open(file, "r") as json_data:
            data = json.load(json_data)
            job_pos = 0  # job_pos = 0 because using aggregated results

            # Check if latency is in nano or microseconds to choose correct dict key
            def get_lat_unit(key_prefix, dict_section):
                # key prefix - lat, clat or slat.
                # dict section - portion of json containing latency bucket in question
                # Return dict key to access the bucket and unit as string
                for k, v in dict_section.items():
                    if k.startswith(key_prefix):
                        return k, k.split("_")[1]

            def get_clat_percentiles(clat_dict_leaf):
                # Return [p99, p99.9, p99.99, p99.999] completion latencies.
                if "percentile" in clat_dict_leaf:
                    p99_lat = float(clat_dict_leaf["percentile"]["99.000000"])
                    p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"])
                    p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"])
                    p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"])

                    return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat]
                else:
                    # Latest fio versions do not provide "percentile" results if no
                    # measurements were done, so just return zeroes
                    return [0, 0, 0, 0]

            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
            read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["read"][clat_key])

            # Nanosecond buckets are scaled down to microseconds.
            if "ns" in lat_unit:
                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
            if "ns" in clat_unit:
                read_p99_lat = read_p99_lat / 1000
                read_p99_9_lat = read_p99_9_lat / 1000
                read_p99_99_lat = read_p99_99_lat / 1000
                read_p99_999_lat = read_p99_999_lat / 1000

            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
            write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["write"][clat_key])

            if "ns" in lat_unit:
                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
            if "ns" in clat_unit:
                write_p99_lat = write_p99_lat / 1000
                write_p99_9_lat = write_p99_9_lat / 1000
                write_p99_99_lat = write_p99_99_lat / 1000
                write_p99_999_lat = write_p99_999_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]

    def parse_results(self, results_dir, initiator_count=None, run_num=None):
        """Aggregate fio JSON results from all initiators into CSV files.

        Writes one per-initiator CSV per job plus a summary
        "nvmf_results.csv", combining read/write columns according to the
        job's rw mix ratio. initiator_count and run_num are currently
        unused here; the information is recovered from the file names.
        """
        files = os.listdir(results_dir)
        fio_files = filter(lambda x: ".fio" in x, files)
        json_files = [x for x in files if ".json" in x]

        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]

        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]

        header_line = ",".join(["Name", *headers])
        aggr_header_line = ",".join(["Name", *aggr_headers])

        # Create empty results file
        csv_file = "nvmf_results.csv"
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            fh.write(aggr_header_line + "\n")
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # Look in the filename for rwmixread value. Function arguments do
            # not have that information.
            # TODO: Improve this function by directly using workload params instead
            # of regexing through filenames.
            if "read" in job_name:
                rw_mixread = 1
            elif "write" in job_name:
                rw_mixread = 0
            else:
                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100

            # If "_CPU" exists in name - ignore it
            # Initiators for the same job could have different num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if job_name in x]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # There may have been more than 1 initiator used in test, need to check that
            # Result files are created so that string after last "_" separator is server name
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than 1 test run for this job, calculate average results for initiator
                i_results = [x for x in job_result_files if i in x]
                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))

                separate_stats = []
                for r in i_results:
                    stats = self.read_json_stats(os.path.join(results_dir, r))
                    separate_stats.append(stats)
                    self.log_print(stats)

                # Element-wise average over all runs of this initiator.
                init_results = [sum(x) for x in zip(*separate_stats)]
                init_results = [x / len(separate_stats) for x in init_results]
                inits_avg_results.append(init_results)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(init_results)
                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
                    fh.write(header_line + "\n")
                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")

            # Sum results of all initiators running this FIO job.
            # Latency results are an average of latencies from across all initiators.
            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
            for key in inits_avg_results:
                if "lat" in key:
                    inits_avg_results[key] /= len(inits_names)

            # Aggregate separate read/write values into common labels
            # Take rw_mixread into consideration for mixed read/write workloads.
            aggregate_results = OrderedDict()
            for h in aggr_headers:
                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
                if "lat" in h:
                    # Latencies are mixed proportionally to the read/write ratio;
                    # throughput-like values are simply summed.
                    _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
                else:
                    _ = read_stat + write_stat
                aggregate_results[h] = "{0:.3f}".format(_)

            rows.add(",".join([job_name, *aggregate_results.values()]))

        # Save results to file
        for row in rows:
            with open(os.path.join(results_dir, csv_file), "a") as fh:
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))

    def measure_sar(self, results_dir, sar_file_name):
        """Collect SAR CPU statistics (after the configured delay) and save
        the raw output, logging the per-CPU averages."""
        self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay)
        time.sleep(self.sar_delay)
        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count])
        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
            for line in out.split("\n"):
                if "Average" in line and "CPU" in line:
                    self.log_print("Summary CPU utilization from SAR:")
                    self.log_print(line)
                if "Average" in line and "all" in line:
                    self.log_print(line)
            fh.write(out)

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        """Run pcm-memory.x in the background for pcm_count seconds,
        writing its CSV output to results_dir."""
        time.sleep(self.pcm_delay)
        cmd = ["%s/pcm-memory.x" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)]
        pcm_memory = subprocess.Popen(cmd)
        # pcm-memory.x runs until terminated, so sleep for the measurement
        # window and then stop it.
        time.sleep(self.pcm_count)
        pcm_memory.terminate()

    def measure_pcm(self, results_dir, pcm_file_name):
        """Run pcm.x for pcm_count samples and additionally extract the
        UPI (socket interconnect) columns into a separate skt_* CSV."""
        time.sleep(self.pcm_delay)
        cmd = ["%s/pcm.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        # Drop the auto-generated "Unnamed: ..." labels from the two-row header.
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_pcm_power(self, results_dir, pcm_power_file_name):
        """Run pcm-power.x for pcm_count samples and save its output."""
        time.sleep(self.pcm_delay)
        out = self.exec_cmd(["%s/pcm-power.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count])
        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
            fh.write(out)

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name):
        """Sample network bandwidth with bwm-ng into a CSV file
        (1 s interval, bandwidth_count samples)."""
        self.log_print("INFO: starting network bandwidth measure")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)])

    def measure_dpdk_memory(self, results_dir):
        """Trigger an SPDK DPDK memory usage dump and move the resulting
        file into results_dir."""
        self.log_print("INFO: waiting to generate DPDK memory usage")
        time.sleep(self.dpdk_wait_time)
        self.log_print("INFO: generating DPDK memory usage")
        # NOTE(review): this is a bare attribute reference, not a call - as
        # written it does not actually issue the RPC. Presumably it should
        # invoke rpc.env.env_dpdk_get_mem_stats(<rpc client>); confirm
        # against the SPDK rpc module before relying on this measurement.
        rpc.env.env_dpdk_get_mem_stats
        os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir))

    def sys_config(self):
        """Log kernel version, boot cmdline, sysctl.conf, CPU power info and
        the SPDK zcopy/scheduler settings for the test report."""
        self.log_print("====Kernel release:====")
        self.log_print(os.uname().release)
        self.log_print("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log_print('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log_print("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log_print('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log_print("====Cpu power info:====")
        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log_print("====zcopy settings:====")
        self.log_print("zcopy enabled: %s" % (self.enable_zcopy))
        self.log_print("====Scheduler settings:====")
        self.log_print("SPDK scheduler: %s" % (self.scheduler_name))
633
634
635class Initiator(Server):
    def __init__(self, name, general_config, initiator_config):
        """Connect to the initiator host over SSH and prepare it for testing.

        Args:
            name: host identifier (letters/digits only).
            general_config: dict with credentials, transport and optional
                "skip_spdk_install" flag.
            initiator_config: dict with required "ip" and "target_nic_ips"
                plus the optional settings parsed below.
        """
        super(Initiator, self).__init__(name, general_config, initiator_config)

        # Required fields
        self.ip = initiator_config["ip"]
        self.target_nic_ips = initiator_config["target_nic_ips"]

        # Defaults
        self.cpus_allowed = None
        self.cpus_allowed_policy = "shared"
        self.spdk_dir = "/tmp/spdk"
        self.fio_bin = "/usr/src/fio/fio"
        self.nvmecli_bin = "nvme"
        self.cpu_frequency = None
        self.subsystem_info_list = []

        if "spdk_dir" in initiator_config:
            self.spdk_dir = initiator_config["spdk_dir"]
        if "fio_bin" in initiator_config:
            self.fio_bin = initiator_config["fio_bin"]
        if "nvmecli_bin" in initiator_config:
            self.nvmecli_bin = initiator_config["nvmecli_bin"]
        if "cpus_allowed" in initiator_config:
            self.cpus_allowed = initiator_config["cpus_allowed"]
        if "cpus_allowed_policy" in initiator_config:
            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
        if "cpu_frequency" in initiator_config:
            self.cpu_frequency = initiator_config["cpu_frequency"]
        # The SPDK_WORKSPACE environment variable overrides any spdk_dir
        # value taken from the configuration file.
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        # All commands for this host are executed through this SSH session;
        # unknown host keys are accepted automatically.
        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        # Start from a clean remote working directory.
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        # Ship the SPDK sources zipped by the Target unless explicitly skipped.
        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.copy_spdk("/tmp/spdk.zip")
        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()
682
683    def set_local_nic_info_helper(self):
684        return json.loads(self.exec_cmd(["lshw", "-json"]))
685
686    def __del__(self):
687        self.ssh_connection.close()
688
689    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
690        if change_dir:
691            cmd = ["cd", change_dir, ";", *cmd]
692
693        # In case one of the command elements contains whitespace and is not
694        # already quoted, # (e.g. when calling sysctl) quote it again to prevent expansion
695        # when sending to remote system.
696        for i, c in enumerate(cmd):
697            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
698                cmd[i] = '"%s"' % c
699        cmd = " ".join(cmd)
700
701        # Redirect stderr to stdout thanks using get_pty option if needed
702        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
703        out = stdout.read().decode(encoding="utf-8")
704
705        # Check the return code
706        rc = stdout.channel.recv_exit_status()
707        if rc:
708            raise CalledProcessError(int(rc), cmd, out)
709
710        return out
711
712    def put_file(self, local, remote_dest):
713        ftp = self.ssh_connection.open_sftp()
714        ftp.put(local, remote_dest)
715        ftp.close()
716
717    def get_file(self, remote, local_dest):
718        ftp = self.ssh_connection.open_sftp()
719        ftp.get(remote, local_dest)
720        ftp.close()
721
722    def copy_spdk(self, local_spdk_zip):
723        self.log_print("Copying SPDK sources to initiator %s" % self.name)
724        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
725        self.log_print("Copied sources zip from target")
726        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
727        self.log_print("Sources unpacked")
728
729    def copy_result_files(self, dest_dir):
730        self.log_print("Copying results")
731
732        if not os.path.exists(dest_dir):
733            os.mkdir(dest_dir)
734
735        # Get list of result files from initiator and copy them back to target
736        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")
737
738        for file in file_list:
739            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
740                          os.path.join(dest_dir, file))
741        self.log_print("Done copying results")
742
743    def discover_subsystems(self, address_list, subsys_no):
744        num_nvmes = range(0, subsys_no)
745        nvme_discover_output = ""
746        for ip, subsys_no in itertools.product(address_list, num_nvmes):
747            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no))
748            nvme_discover_cmd = ["sudo",
749                                 "%s" % self.nvmecli_bin,
750                                 "discover", "-t", "%s" % self.transport,
751                                 "-s", "%s" % (4420 + subsys_no),
752                                 "-a", "%s" % ip]
753
754            try:
755                stdout = self.exec_cmd(nvme_discover_cmd)
756                if stdout:
757                    nvme_discover_output = nvme_discover_output + stdout
758            except CalledProcessError:
759                # Do nothing. In case of discovering remote subsystems of kernel target
760                # we expect "nvme discover" to fail a bunch of times because we basically
761                # scan ports.
762                pass
763
764        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
765                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
766                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
767                                nvme_discover_output)  # from nvme discovery output
768        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
769        subsystems = list(set(subsystems))
770        subsystems.sort(key=lambda x: x[1])
771        self.log_print("Found matching subsystems on target side:")
772        for s in subsystems:
773            self.log_print(s)
774        self.subsystem_info_list = subsystems
775
776    def gen_fio_filename_conf(self, *args, **kwargs):
777        # Logic implemented in SPDKInitiator and KernelInitiator classes
778        pass
779
780    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10):
781        fio_conf_template = """
782[global]
783ioengine={ioengine}
784{spdk_conf}
785thread=1
786group_reporting=1
787direct=1
788percentile_list=50:90:99:99.5:99.9:99.99:99.999
789
790norandommap=1
791rw={rw}
792rwmixread={rwmixread}
793bs={block_size}
794time_based=1
795ramp_time={ramp_time}
796runtime={run_time}
797"""
798        if "spdk" in self.mode:
799            bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
800            self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
801            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
802            spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
803        else:
804            ioengine = "libaio"
805            spdk_conf = ""
806            out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'",
807                                 "|", "awk", "'{print $1}'"])
808            subsystems = [x for x in out.split("\n") if "nvme" in x]
809
810        if self.cpus_allowed is not None:
811            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
812            cpus_num = 0
813            cpus = self.cpus_allowed.split(",")
814            for cpu in cpus:
815                if "-" in cpu:
816                    a, b = cpu.split("-")
817                    a = int(a)
818                    b = int(b)
819                    cpus_num += len(range(a, b))
820                else:
821                    cpus_num += 1
822            self.num_cores = cpus_num
823            threads = range(0, self.num_cores)
824        elif hasattr(self, 'num_cores'):
825            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
826            threads = range(0, int(self.num_cores))
827        else:
828            self.num_cores = len(subsystems)
829            threads = range(0, len(subsystems))
830
831        if "spdk" in self.mode:
832            filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs)
833        else:
834            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs)
835
836        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
837                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
838                                              ramp_time=ramp_time, run_time=run_time)
839        if num_jobs:
840            fio_config = fio_config + "numjobs=%s \n" % num_jobs
841        if self.cpus_allowed is not None:
842            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
843            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
844        fio_config = fio_config + filename_section
845
846        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
847        if hasattr(self, "num_cores"):
848            fio_config_filename += "_%sCPU" % self.num_cores
849        fio_config_filename += ".fio"
850
851        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
852        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
853        self.log_print("Created FIO Config:")
854        self.log_print(fio_config)
855
856        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)
857
858    def set_cpu_frequency(self):
859        if self.cpu_frequency is not None:
860            try:
861                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
862                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
863                self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
864            except Exception:
865                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
866                sys.exit()
867        else:
868            self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.")
869
870    def run_fio(self, fio_config_file, run_num=None):
871        job_name, _ = os.path.splitext(fio_config_file)
872        self.log_print("Starting FIO run for job: %s" % job_name)
873        self.log_print("Using FIO: %s" % self.fio_bin)
874
875        if run_num:
876            for i in range(1, run_num + 1):
877                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
878                output = self.exec_cmd(["sudo", self.fio_bin,
879                                        fio_config_file, "--output-format=json",
880                                        "--output=%s" % output_filename], True)
881                self.log_print(output)
882        else:
883            output_filename = job_name + "_" + self.name + ".json"
884            output = self.exec_cmd(["sudo", self.fio_bin,
885                                    fio_config_file, "--output-format=json",
886                                    "--output" % output_filename], True)
887            self.log_print(output)
888        self.log_print("FIO run finished. Results in: %s" % output_filename)
889
890    def sys_config(self):
891        self.log_print("====Kernel release:====")
892        self.log_print(self.exec_cmd(["uname", "-r"]))
893        self.log_print("====Kernel command line:====")
894        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
895        self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
896        self.log_print("====sysctl conf:====")
897        sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"])
898        self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
899        self.log_print("====Cpu power info:====")
900        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
901
902
class KernelTarget(Target):
    """NVMe-oF target backed by the Linux kernel nvmet subsystem, driven via nvmetcli."""

    def __init__(self, name, general_config, target_config):
        super(KernelTarget, self).__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = "nvmetcli"

        if "nvmet_bin" in target_config:
            self.nvmet_bin = target_config["nvmet_bin"]

    def __del__(self):
        # Best-effort teardown of the kernel target configuration.
        nvmet_command(self.nvmet_bin, "clear")

    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):
        """Write "kernel.conf": one nvmet subsystem and one port (4420, 4421, ...)
        per disk, with disks distributed evenly across the given addresses.
        """
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        # Split disks between NIC IP's
        # NOTE(review): integer division - if len(nvme_list) < len(address_list)
        # disks_per_ip is 0 and nothing gets configured; confirm inputs upstream.
        disks_per_ip = int(len(nvme_list) / len(address_list))
        disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))]

        subsys_no = 1
        port_no = 0
        for ip, chunk in zip(address_list, disk_chunks):
            for disk in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % subsys_no
                nvmet_cfg["subsystems"].append({
                    "allowed_hosts": [],
                    "attr": {
                        "allow_any_host": "1",
                        "serial": "SPDK00%s" % subsys_no,
                        "version": "1.3"
                    },
                    "namespaces": [
                        {
                            "device": {
                                "path": disk,
                                "uuid": "%s" % uuid.uuid4()
                            },
                            "enable": 1,
                            "nsid": subsys_no
                        }
                    ],
                    "nqn": nqn
                })

                nvmet_cfg["ports"].append({
                    "addr": {
                        "adrfam": "ipv4",
                        "traddr": ip,
                        "trsvcid": "%s" % (4420 + port_no),
                        "trtype": "%s" % self.transport
                    },
                    "portid": subsys_no,
                    "referrals": [],
                    "subsystems": [nqn]
                })
                subsys_no += 1
                port_no += 1
                # NOTE(review): this records the post-increment port index while
                # trsvcid above uses the pre-increment value (4420 + port_no - 1).
                # Initiators locate ports through "nvme discover", so this list
                # appears informational - confirm before relying on the port value.
                self.subsystem_info_list.append([port_no, nqn, ip])

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        """Generate the nvmet config (null-block or real NVMe) and load it with nvmetcli."""
        self.log_print("Configuring kernel NVMeOF Target")

        if self.null_block:
            print("Configuring with null block device.")
            null_blk_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
            self.kernel_tgt_gen_subsystem_conf(null_blk_list, self.nic_ips)
            self.subsys_no = len(null_blk_list)
        else:
            print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()
            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
            self.subsys_no = len(nvme_list)

        # Reset any stale configuration before restoring the fresh one.
        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log_print("Done configuring kernel NVMeOF Target")
992
993
class SPDKTarget(Target):
    """NVMe-oF target running SPDK's nvmf_tgt application, configured over JSON-RPC."""

    def __init__(self, name, general_config, target_config):
        super(SPDKTarget, self).__init__(name, general_config, target_config)

        # Required fields
        self.core_mask = target_config["core_mask"]
        self.num_cores = self.get_num_cores(self.core_mask)

        # Defaults
        self.dif_insert_strip = False
        self.null_block_dif_type = 0
        self.num_shared_buffers = 4096

        if "num_shared_buffers" in target_config:
            self.num_shared_buffers = target_config["num_shared_buffers"]
        if "null_block_dif_type" in target_config:
            self.null_block_dif_type = target_config["null_block_dif_type"]
        if "dif_insert_strip" in target_config:
            self.dif_insert_strip = target_config["dif_insert_strip"]

    def get_num_cores(self, core_mask):
        """Count CPU cores described by core_mask.

        Accepts either a hex mask (e.g. "0xF" -> 4) or a bracketed list of
        cores/ranges (e.g. "[0,2-4]" -> 4); ranges include both endpoints.
        """
        if "0x" in core_mask:
            return bin(int(core_mask, 16)).count("1")
        else:
            num_cores = 0
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")
            for i in core_mask.split(","):
                if "-" in i:
                    x, y = i.split("-")
                    # len(range(x, y)) excludes y; +1 makes the range inclusive.
                    num_cores += len(range(int(x), int(y))) + 1
                else:
                    num_cores += 1
            return num_cores

    def spdk_tgt_configure(self):
        """Configure transport, bdevs and subsystems on the running nvmf_tgt via RPC."""
        self.log_print("Configuring SPDK NVMeOF target via RPC")
        numa_list = get_used_numa_nodes()  # NOTE(review): unused here - looks like a leftover, confirm

        # Create RDMA transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
                                       num_shared_buffers=self.num_shared_buffers,
                                       dif_insert_or_strip=self.dif_insert_strip,
                                       sock_priority=self.adq_priority)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        # Back subsystems with null-block bdevs or real NVMe drives.
        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

        if self.enable_adq:
            self.adq_configure_tc()
        self.log_print("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self, null_block_count):
        """Create null_block_count null bdevs (named NvmeXn1), optionally with DIF metadata."""
        md_size = 0
        block_size = 4096
        # Non-zero DIF type requires 128 bytes of per-block metadata.
        if self.null_block_dif_type != 0:
            md_size = 128

        self.log_print("Adding null block bdevices to config via RPC")
        for i in range(null_block_count):
            self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type)
            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
                                      dif_type=self.null_block_dif_type, md_size=md_size)
        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        """Attach local PCIe NVMe drives as bdevs (NvmeX); optionally limit the count."""
        self.log_print("Adding NVMe bdevs to config via RPC")

        bdfs = get_nvme_devices_bdf()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create one subsystem + namespace + listener (port 4420) per bdev,
        with bdevs split evenly across the provided IPs.
        """
        self.log_print("Adding subsystems to config")
        port = "4420"
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        # Distribute bdevs between provided NICs
        num_disks = range(0, req_num_disks)
        if len(num_disks) == 1:
            disks_per_ip = 1
        else:
            disks_per_ip = int(len(num_disks) / len(ips))
        disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))]

        # Create subsystems, add bdevs to namespaces, add listeners
        for ip, chunk in zip(ips, disk_chunks):
            for c in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
                serial = "SPDK00%s" % c
                bdev_name = "Nvme%sn1" % c
                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                               allow_any_host=True, max_namespaces=8)
                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)

                rpc.nvmf.nvmf_subsystem_add_listener(self.client, nqn,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid=port,
                                                     adrfam="ipv4")

                self.subsystem_info_list.append([port, nqn, ip])
        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def tgt_start(self):
        """Launch nvmf_tgt, wait for its RPC socket, apply socket/scheduler
        options, then finish initialization and configure the target.
        """
        if self.null_block:
            self.subsys_no = 1
        else:
            self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
        # self.pid holds the path of the pid file, not the pid itself.
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        # NOTE(review): this logs the pid-file path, not the numeric PID.
        self.log_print("SPDK NVMeOF Target PID=%s" % self.pid)
        self.log_print("Waiting for spdk to initilize...")
        # Poll until the application creates its RPC socket.
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock")

        if self.enable_zcopy:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
                                           enable_zerocopy_send=True)
            self.log_print("Target socket options:")
            rpc.client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))

        if self.enable_adq:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
                                           nvme_adminq_poll_period_us=100000, retry_count=4)
            rpc.nvmf.nvmf_set_config(self.client, acceptor_poll_rate=10000)

        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name)

        # Options above must be set before framework initialization completes.
        rpc.framework_start_init(self.client)
        self.spdk_tgt_configure()

    def __del__(self):
        # Terminate the nvmf_tgt process; escalate to kill if terminate fails.
        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait()
            except Exception as e:
                self.log_print(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
1166
1167
class KernelInitiator(Initiator):
    """Initiator that connects to remote subsystems using the kernel nvme driver (nvme-cli)."""

    def __init__(self, name, general_config, initiator_config):
        super(KernelInitiator, self).__init__(name, general_config, initiator_config)

        # Defaults
        self.extra_params = ""

        if "extra_params" in initiator_config:
            self.extra_params = initiator_config["extra_params"]

    def __del__(self):
        self.ssh_connection.close()

    def kernel_init_connect(self, address_list, subsys_no):
        """Connect to every discovered subsystem with `nvme connect`."""
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for subsystem in self.subsystem_info_list:
            self.log_print("Trying to connect %s %s %s" % subsystem)
            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
            time.sleep(2)

    def kernel_init_disconnect(self, address_list, subsys_no):
        """Disconnect every previously connected subsystem with `nvme disconnect`."""
        for subsystem in self.subsystem_info_list:
            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
            time.sleep(1)

    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1):
        """Build FIO [filenameX] sections from connected /dev/nvme* devices,
        splitting the devices evenly between threads.
        """
        out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'",
                             "|", "awk", "'{print $1}'"])
        nvme_list = [x for x in out.split("\n") if "nvme" in x]

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for j in range(nvme_per_split):
                result[i].append(next(iterator))
            # Bug fix: this block was nested inside the inner loop above, which
            # handed out leftover devices mid-chunk (unbalancing the split when
            # remainder > 1) and never handed them out at all when
            # nvme_per_split == 0. Give one leftover to each of the first
            # `remainder` threads instead, matching
            # SPDKInitiator.gen_fio_filename_conf.
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section
1221
1222
class SPDKInitiator(Initiator):
    """Initiator that drives I/O through the SPDK bdev FIO plugin."""

    def __init__(self, name, general_config, initiator_config):
        super(SPDKInitiator, self).__init__(name, general_config, initiator_config)

        # Build SPDK on the remote host unless the config explicitly skips it.
        if general_config.get("skip_spdk_install", False) is False:
            self.install_spdk(self.spdk_dir)

        # Required fields
        self.num_cores = initiator_config["num_cores"]

    def install_spdk(self, local_spdk_zip):
        """Configure and build SPDK (with the FIO plugin) on the initiator,
        then run setup.sh. Sources are expected to be unpacked in spdk_dir
        already; `local_spdk_zip` is unused.
        """
        self.log_print("Using fio binary %s" % self.fio_bin)
        for git_args in (["submodule", "update", "--init"], ["clean", "-ffdx"]):
            self.exec_cmd(["git", "-C", self.spdk_dir, *git_args])
        configure_cmd = ["cd", self.spdk_dir, "&&", "./configure", "--with-rdma",
                         "--with-fio=%s" % os.path.dirname(self.fio_bin)]
        self.exec_cmd(configure_cmd)
        for make_args in (["clean"], ["-j$(($(nproc)*2))"]):
            self.exec_cmd(["make", "-C", self.spdk_dir, *make_args])

        self.log_print("SPDK built")
        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        """Return a JSON bdev-subsystem config attaching one NVMe-oF controller
        per remote subsystem tuple (port, nqn, address).
        """
        controllers = []
        for idx, subsys in enumerate(remote_subsystem_list):
            sub_port, sub_nqn, sub_addr = (str(field) for field in subsys)
            params = {
                "name": "Nvme{}".format(idx),
                "trtype": self.transport,
                "traddr": sub_addr,
                "trsvcid": sub_port,
                "subnqn": sub_nqn,
                "adrfam": "IPv4"
            }
            # ADQ requires a socket priority on the controller.
            if self.enable_adq:
                params["priority"] = "1"
            controllers.append({
                "method": "bdev_nvme_attach_controller",
                "params": params
            })

        bdev_cfg_section = {
            "subsystems": [
                {
                    "subsystem": "bdev",
                    "config": controllers
                }
            ]
        }

        return json.dumps(bdev_cfg_section, indent=2)

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1):
        """Build FIO [filenameX] sections over the NvmeXn1 bdevs, splitting the
        bdevs evenly between threads (never more threads than bdevs).
        """
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        bdev_names = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        per_thread, leftover = divmod(len(subsystems), len(threads))

        # First `leftover` threads receive one extra bdev each.
        name_iter = iter(bdev_names)
        chunks = []
        for idx in range(len(threads)):
            size = per_thread + (1 if idx < leftover else 0)
            chunks.append([next(name_iter) for _ in range(size)])

        filename_section = ""
        for idx, chunk in enumerate(chunks):
            qd = round((io_depth * len(chunk)) / num_jobs)
            if qd == 0:
                qd = 1
            disks = "\n".join(["filename=%s" % name for name in chunk])
            filename_section = "\n".join([filename_section,
                                          "[filename%s]" % idx,
                                          disks,
                                          "iodepth=%s" % qd])

        return filename_section
1301
1302
if __name__ == "__main__":
    # NOTE(review): spdk_zip_path appears unused in this block; confirm it is
    # not read as a module global elsewhere before removing.
    spdk_zip_path = "/tmp/spdk.zip"
    target_results_dir = "/tmp/results"

    # Config file path: first CLI argument, or config.json next to this script.
    if len(sys.argv) > 1:
        config_file_path = sys.argv[1]
    else:
        script_full_dir = os.path.dirname(os.path.realpath(__file__))
        config_file_path = os.path.join(script_full_dir, "config.json")

    print("Using config file: %s" % config_file_path)
    with open(config_file_path, "r") as config:
        data = json.load(config)

    initiators = []
    fio_cases = []

    general_config = data["general"]
    target_config = data["target"]

    target_obj = None
    fio_workloads = None

    # Build target/initiator objects and collect fio workload parameters from
    # the matching config sections. Section names are matched by substring
    # ("target", "initiator", "fio"); anything else is ignored.
    for k, v in data.items():
        if "target" in k:
            if v["mode"] == "spdk":
                target_obj = SPDKTarget(k, general_config, v)
            elif v["mode"] == "kernel":
                target_obj = KernelTarget(k, general_config, v)
        elif "initiator" in k:
            if v["mode"] == "spdk":
                init_obj = SPDKInitiator(k, general_config, v)
            elif v["mode"] == "kernel":
                init_obj = KernelInitiator(k, general_config, v)
            initiators.append(init_obj)
        elif "fio" in k:
            # One workload per (block size, queue depth, rw pattern) combination.
            fio_workloads = itertools.product(v["bs"], v["qd"], v["rw"])

            fio_run_time = v["run_time"]
            fio_ramp_time = v["ramp_time"]
            fio_rw_mix_read = v["rwmixread"]
            fio_run_num = v.get("run_num")
            fio_num_jobs = v.get("num_jobs")

    # Fail early with a clear message instead of a NameError further down.
    if target_obj is None:
        print("ERROR: no valid 'target' section found in config file.")
        sys.exit(1)
    if fio_workloads is None:
        print("ERROR: no 'fio' section found in config file.")
        sys.exit(1)

    target_obj.tgt_start()

    os.makedirs(target_results_dir, exist_ok=True)

    for i in initiators:
        i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no)
        if i.enable_adq:
            i.adq_configure_tc()

    # Poor man's threading:
    # one fio thread per initiator, plus optional measurement threads
    # (sar / pcm / bandwidth / dpdk memory) on the target side, all joined
    # before moving to the next workload.
    for block_size, io_depth, rw in fio_workloads:
        threads = []
        configs = []
        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_connect(i.target_nic_ips, target_obj.subsys_no)

            cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                   fio_num_jobs, fio_ramp_time, fio_run_time)
            configs.append(cfg)

        for i, cfg in zip(initiators, configs):
            t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
            threads.append(t)

        if target_obj.enable_sar:
            sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"])
            sar_file_name = ".".join([sar_file_name, "txt"])
            t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name))
            threads.append(t)

        if target_obj.enable_pcm:
            pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]

            pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(target_results_dir, pcm_fnames[0],))
            pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(target_results_dir, pcm_fnames[1],))
            pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(target_results_dir, pcm_fnames[2],))

            threads.append(pcm_cpu_t)
            threads.append(pcm_mem_t)
            threads.append(pcm_pow_t)

        if target_obj.enable_bandwidth:
            bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)])
            bandwidth_file_name = ".".join([bandwidth_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(target_results_dir, bandwidth_file_name,))
            threads.append(t)

        if target_obj.enable_dpdk_memory:
            # BUG FIX: was args=(target_results_dir) — a bare string, not a
            # tuple; Thread would iterate it and pass one character per
            # positional argument. Trailing comma makes it a 1-tuple.
            t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(target_results_dir,))
            threads.append(t)

        for t in threads:
            t.start()
        for t in threads:
            t.join()

        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_disconnect(i.target_nic_ips, target_obj.subsys_no)
            i.copy_result_files(target_results_dir)

    # Undo the system-level tuning applied for the benchmark, on the target
    # and on every initiator, then parse the gathered result files.
    target_obj.restore_governor()
    target_obj.restore_tuned()
    target_obj.restore_services()
    target_obj.restore_sysctl()
    for i in initiators:
        i.restore_governor()
        i.restore_tuned()
        i.restore_services()
        i.restore_sysctl()
    target_obj.parse_results(target_results_dir)
1425