xref: /spdk/scripts/perf/nvmf/run_nvmf.py (revision 5ee049eeecb4bfa60a4fbc85fe82ec74dc97e07b)
1#!/usr/bin/env python3
2
3import os
4import re
5import sys
6import json
7import zipfile
8import threading
9import subprocess
from subprocess import CalledProcessError, check_output
10import itertools
11import configparser
12import time
13import uuid
14from collections import OrderedDict
15
16import paramiko
17import pandas as pd
18
19import rpc
20import rpc.client
21from common import *
22
23
24class Server:
25    def __init__(self, name, general_config, server_config):
26        self.name = name
27        self.username = general_config["username"]
28        self.password = general_config["password"]
29        self.transport = general_config["transport"].lower()
30        self.nic_ips = server_config["nic_ips"]
31        self.mode = server_config["mode"]
32
33        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
34        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
35            self.irq_scripts_dir = server_config["irq_scripts_dir"]
36
37        self.local_nic_info = []
38        self._nics_json_obj = {}
39        self.svc_restore_dict = {}
40        self.sysctl_restore_dict = {}
41        self.tuned_restore_dict = {}
42        self.governor_restore = ""
43        self.tuned_profile = ""
44
45        self.enable_adq = False
46        self.adq_priority = None
47        if "adq_enable" in server_config and server_config["adq_enable"]:
48            self.enable_adq = server_config["adq_enable"]
49            self.adq_priority = 1
50
51        if "tuned_profile" in server_config:
52            self.tuned_profile = server_config["tuned_profile"]
53
54        if not re.match("^[A-Za-z0-9]*$", name):
55            self.log_print("Please use a name which contains only letters or numbers")
56            sys.exit(1)
57
58    def log_print(self, msg):
59        print("[%s] %s" % (self.name, msg), flush=True)
60
61    def get_uncommented_lines(self, lines):
62        return [line for line in lines if line and not line.startswith('#')]
63
64    def get_nic_name_by_ip(self, ip):
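        # Map an IP address to its interface name, using cached "ip -j address show" output.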
65        if not self._nics_json_obj:
66            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
67            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
68        for nic in self._nics_json_obj:
69            for addr in nic["addr_info"]:
70                if ip in addr["local"]:
71                    return nic["ifname"]
72
73    def set_local_nic_info_helper(self):
74        pass
75
76    def set_local_nic_info(self, pci_info):
77        def extract_network_elements(json_obj):
78            nic_list = []
79            if isinstance(json_obj, list):
80                for x in json_obj:
81                    nic_list.extend(extract_network_elements(x))
82            elif isinstance(json_obj, dict):
83                if "children" in json_obj:
84                    nic_list.extend(extract_network_elements(json_obj["children"]))
85                if "class" in json_obj.keys() and "network" in json_obj["class"]:
86                    nic_list.append(json_obj)
87            return nic_list
88
89        self.local_nic_info = extract_network_elements(pci_info)
90
91    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
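        # Stub; overridden by Target (local subprocess) and Initiator (SSH) subclasses.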
92        return ""
93
94    def configure_system(self):
95        self.configure_services()
96        self.configure_sysctl()
97        self.configure_tuned()
98        self.configure_cpu_governor()
99        self.configure_irq_affinity()
100
101    def configure_adq(self):
102        self.adq_load_modules()
103        self.adq_configure_nic()
104
105    def adq_load_modules(self):
106        self.log_print("Modprobing ADQ-related Linux modules...")
107        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
108        for module in adq_module_deps:
109            try:
110                self.exec_cmd(["sudo", "modprobe", module])
111                self.log_print("%s loaded!" % module)
112            except CalledProcessError as e:
113                self.log_print("ERROR: failed to load module %s" % module)
114                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
115
116    def adq_configure_nic(self):
117        self.log_print("Configuring NIC port settings for ADQ testing...")
118
119        # Reload the driver first, to make sure any previous settings are re-set.
120        try:
121            self.exec_cmd(["sudo", "rmmod", "ice"])
122            self.exec_cmd(["sudo", "modprobe", "ice"])
123        except CalledProcessError as e:
124            self.log_print("ERROR: failed to reload ice module!")
125            self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
126
127        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
128        for nic in nic_names:
129            self.log_print(nic)
130            try:
131                self.exec_cmd(["sudo", "ethtool", "-K", nic,
132                               "hw-tc-offload", "on"])  # Enable hardware TC offload
133                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
134                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
135                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
136                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
137                               "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization
138            except CalledProcessError as e:
139                self.log_print("ERROR: failed to configure NIC port using ethtool!")
140                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
141                self.log_print("Please update your NIC driver and firmware versions and try again.")
142            self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
143            self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))
144
145    def configure_services(self):
146        self.log_print("Configuring active services...")
147        svc_config = configparser.ConfigParser(strict=False)
148
149        # The list below is valid only for RHEL / Fedora systems and might not
150        # contain valid service names for other distributions.
151        svc_target_state = {
152            "firewalld": "inactive",
153            "irqbalance": "inactive",
154            "lldpad.service": "inactive",
155            "lldpad.socket": "inactive"
156        }
157
158        for service in svc_target_state:
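            # "systemctl show" prints key=value pairs; prepend a section header so
            # configparser can parse the output.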
159            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
160            out = "\n".join(["[%s]" % service, out])
161            svc_config.read_string(out)
162
163            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
164                continue
165
166            service_state = svc_config[service]["ActiveState"]
167            self.log_print("Current state of %s service is %s" % (service, service_state))
168            self.svc_restore_dict.update({service: service_state})
169            if service_state != "inactive":
170                self.log_print("Disabling %s. It will be restored after the test has finished." % service)
171                self.exec_cmd(["sudo", "systemctl", "stop", service])
172
173    def configure_sysctl(self):
174        self.log_print("Tuning sysctl settings...")
175
176        busy_read = 50000
177        if self.enable_adq and self.mode == "spdk":
178            busy_read = 1
179
180        sysctl_opts = {
181            "net.core.busy_poll": 0,
182            "net.core.busy_read": busy_read,
183            "net.core.somaxconn": 4096,
184            "net.core.netdev_max_backlog": 8192,
185            "net.ipv4.tcp_max_syn_backlog": 16384,
186            "net.core.rmem_max": 268435456,
187            "net.core.wmem_max": 268435456,
188            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
189            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
190            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
191            "net.ipv4.route.flush": 1,
192            "vm.overcommit_memory": 1,
193        }
194
195        for opt, value in sysctl_opts.items():
196            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
197            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())
198
199    def configure_tuned(self):
200        if not self.tuned_profile:
201            self.log_print("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
202            return
203
204        self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile)
205        service = "tuned"
206        tuned_config = configparser.ConfigParser(strict=False)
207
208        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
209        out = "\n".join(["[%s]" % service, out])
210        tuned_config.read_string(out)
211        tuned_state = tuned_config[service]["ActiveState"]
212        self.svc_restore_dict.update({service: tuned_state})
213
214        if tuned_state != "inactive":
215            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
216            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()
217
218            self.tuned_restore_dict = {
219                "profile": profile,
220                "mode": profile_mode
221            }
222
223        self.exec_cmd(["sudo", "systemctl", "start", service])
224        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
225        self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))
226
227    def configure_cpu_governor(self):
228        self.log_print("Setting CPU governor to performance...")
229
230        # This assumes that all CPUs use the same scaling governor
231        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
232        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])
233
234    def configure_irq_affinity(self):
235        self.log_print("Setting IRQ affinity for NICs...")
236
237        irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh")
238        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
239        for nic in nic_names:
240            irq_cmd = ["sudo", irq_script_path, nic]
241            self.log_print(irq_cmd)
242            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)
243
244    def restore_services(self):
245        self.log_print("Restoring services...")
246        for service, state in self.svc_restore_dict.items():
247            cmd = "stop" if state == "inactive" else "start"
248            self.exec_cmd(["sudo", "systemctl", cmd, service])
249
250    def restore_sysctl(self):
251        self.log_print("Restoring sysctl settings...")
252        for opt, value in self.sysctl_restore_dict.items():
253            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())
254
255    def restore_tuned(self):
256        self.log_print("Restoring tuned-adm settings...")
257
258        if not self.tuned_restore_dict:
259            return
260
261        if self.tuned_restore_dict["mode"] == "auto":
262            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
263            self.log_print("Reverted tuned-adm to auto_profile.")
264        else:
265            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
266            self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])
267
268    def restore_governor(self):
269        self.log_print("Restoring CPU governor setting...")
270        if self.governor_restore:
271            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
272            self.log_print("Reverted CPU governor to %s." % self.governor_restore)
273
274
275class Target(Server):
276    def __init__(self, name, general_config, target_config):
277        super(Target, self).__init__(name, general_config, target_config)
278
279        # Defaults
280        self.enable_sar = False
281        self.sar_delay = 0
282        self.sar_interval = 0
283        self.sar_count = 0
284        self.enable_pcm = False
285        self.pcm_dir = ""
286        self.pcm_delay = 0
287        self.pcm_interval = 0
288        self.pcm_count = 0
289        self.enable_bandwidth = 0
290        self.bandwidth_count = 0
291        self.enable_dpdk_memory = False
292        self.dpdk_wait_time = 0
293        self.enable_zcopy = False
294        self.scheduler_name = "static"
295        self.null_block = 0
296        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
297        self.subsystem_info_list = []
298
299        if "null_block_devices" in target_config:
300            self.null_block = target_config["null_block_devices"]
301        if "sar_settings" in target_config:
302            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"]
303        if "pcm_settings" in target_config:
304            self.enable_pcm = True
305            self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"]
306        if "enable_bandwidth" in target_config:
307            self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"]
308        if "enable_dpdk_memory" in target_config:
309            self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"]
310        if "scheduler_settings" in target_config:
311            self.scheduler_name = target_config["scheduler_settings"]
312        if "zcopy_settings" in target_config:
313            self.enable_zcopy = target_config["zcopy_settings"]
314
315        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
316        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
317        self.set_local_nic_info(self.set_local_nic_info_helper())
318        self.configure_system()
319        if self.enable_adq:
320            self.configure_adq()
321        self.sys_config()
322
323    def set_local_nic_info_helper(self):
324        return json.loads(self.exec_cmd(["lshw", "-json"]))
325
326    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
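        # Run a command locally on the target host, optionally redirecting stderr to
        # stdout and/or changing the working directory for the duration of the call.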
327        stderr_opt = None
328        if stderr_redirect:
329            stderr_opt = subprocess.STDOUT
330        if change_dir:
331            old_cwd = os.getcwd()
332            os.chdir(change_dir)
333            self.log_print("Changing directory to %s" % change_dir)
334
335        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")
336
337        if change_dir:
338            os.chdir(old_cwd)
339            self.log_print("Changing directory to %s" % old_cwd)
340        return out
341
342    def zip_spdk_sources(self, spdk_dir, dest_file):
343        self.log_print("Zipping SPDK source directory")
344        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
345        for root, directories, files in os.walk(spdk_dir, followlinks=True):
346            for file in files:
347                fh.write(os.path.relpath(os.path.join(root, file)))
348        fh.close()
349        self.log_print("Done zipping")
350
351    def read_json_stats(self, file):
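        # Parse a single fio JSON result file and return the 18 values expected by
        # parse_results(): IOPS, bandwidth, avg/min/max and p99-p99.999 latencies,
        # for reads and then writes, with latencies normalized to microseconds.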
352        with open(file, "r") as json_data:
353            data = json.load(json_data)
354            job_pos = 0  # 0 because aggregated results are used
355
356            # Check if latency is in nano or microseconds to choose correct dict key
357            def get_lat_unit(key_prefix, dict_section):
358                # key prefix - lat, clat or slat.
359                # dict section - portion of json containing latency bucket in question
360                # Return dict key to access the bucket and unit as string
361                for k, v in dict_section.items():
362                    if k.startswith(key_prefix):
363                        return k, k.split("_")[1]
364
365            def get_clat_percentiles(clat_dict_leaf):
366                if "percentile" in clat_dict_leaf:
367                    p99_lat = float(clat_dict_leaf["percentile"]["99.000000"])
368                    p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"])
369                    p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"])
370                    p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"])
371
372                    return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat]
373                else:
374                    # Latest fio versions do not provide "percentile" results if no
375                    # measurements were done, so just return zeroes
376                    return [0, 0, 0, 0]
377
378            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
379            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
380            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
381            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
382            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
383            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
384            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
385            read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles(
386                data["jobs"][job_pos]["read"][clat_key])
387
388            if "ns" in lat_unit:
389                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
390            if "ns" in clat_unit:
391                read_p99_lat = read_p99_lat / 1000
392                read_p99_9_lat = read_p99_9_lat / 1000
393                read_p99_99_lat = read_p99_99_lat / 1000
394                read_p99_999_lat = read_p99_999_lat / 1000
395
396            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
397            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
398            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
399            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
400            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
401            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
402            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
403            write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles(
404                data["jobs"][job_pos]["write"][clat_key])
405
406            if "ns" in lat_unit:
407                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
408            if "ns" in clat_unit:
409                write_p99_lat = write_p99_lat / 1000
410                write_p99_9_lat = write_p99_9_lat / 1000
411                write_p99_99_lat = write_p99_99_lat / 1000
412                write_p99_999_lat = write_p99_999_lat / 1000
413
414        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
415                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
416                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
417                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]
418
419    def parse_results(self, results_dir, initiator_count=None, run_num=None):
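        # For each fio job config find its JSON result files, average repeated runs
        # per initiator (saved as per-initiator CSVs), then combine all initiators
        # into a single row per job in nvmf_results.csv.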
420        files = os.listdir(results_dir)
421        fio_files = filter(lambda x: ".fio" in x, files)
422        json_files = [x for x in files if ".json" in x]
423
424        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
425                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
426                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
427                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]
428
429        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
430                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]
431
432        header_line = ",".join(["Name", *headers])
433        aggr_header_line = ",".join(["Name", *aggr_headers])
434
435        # Create empty results file
436        csv_file = "nvmf_results.csv"
437        with open(os.path.join(results_dir, csv_file), "w") as fh:
438            fh.write(aggr_header_line + "\n")
439        rows = set()
440
441        for fio_config in fio_files:
442            self.log_print("Getting FIO stats for %s" % fio_config)
443            job_name, _ = os.path.splitext(fio_config)
444
445            # Look in the filename for rwmixread value. Function arguments do
446            # not have that information.
447            # TODO: Improve this function by directly using workload params instead
448            # of regexing through filenames.
449            if "read" in job_name:
450                rw_mixread = 1
451            elif "write" in job_name:
452                rw_mixread = 0
453            else:
454                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100
455
456            # Strip the "_CPU" part from the name, if present.
457            # Initiators running the same job could have a different num_cores parameter.
458            job_name = re.sub(r"_\d+CPU", "", job_name)
459            job_result_files = [x for x in json_files if job_name in x]
460            self.log_print("Matching result files for current fio config:")
461            for j in job_result_files:
462                self.log_print("\t %s" % j)
463
464            # More than one initiator may have been used in the test, so check for that.
465            # Result file names are built so that the string after the last "_" separator is the initiator name.
466            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
467            inits_avg_results = []
468            for i in inits_names:
469                self.log_print("\tGetting stats for initiator %s" % i)
470                # There may have been more than one test run for this job; calculate the initiator's average results.
471                i_results = [x for x in job_result_files if i in x]
472                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))
473
474                separate_stats = []
475                for r in i_results:
476                    stats = self.read_json_stats(os.path.join(results_dir, r))
477                    separate_stats.append(stats)
478                    self.log_print(stats)
479
480                init_results = [sum(x) for x in zip(*separate_stats)]
481                init_results = [x / len(separate_stats) for x in init_results]
482                inits_avg_results.append(init_results)
483
484                self.log_print("\tAverage results for initiator %s" % i)
485                self.log_print(init_results)
486                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
487                    fh.write(header_line + "\n")
488                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")
489
490            # Sum results of all initiators running this FIO job.
491            # Latency results are an average of latencies across all initiators.
492            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
493            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
494            for key in inits_avg_results:
495                if "lat" in key:
496                    inits_avg_results[key] /= len(inits_names)
497
498            # Aggregate separate read/write values into common labels
499            # Take rw_mixread into consideration for mixed read/write workloads.
500            aggregate_results = OrderedDict()
501            for h in aggr_headers:
502                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
503                if "lat" in h:
504                    aggr_stat = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
505                else:
506                    aggr_stat = read_stat + write_stat
507                aggregate_results[h] = "{0:.3f}".format(aggr_stat)
508
509            rows.add(",".join([job_name, *aggregate_results.values()]))
510
511        # Save results to file
512        for row in rows:
513            with open(os.path.join(results_dir, csv_file), "a") as fh:
514                fh.write(row + "\n")
515        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))
516
517    def measure_sar(self, results_dir, sar_file_name):
518        self.log_print("Waiting %d seconds before starting SAR measurements" % self.sar_delay)
519        time.sleep(self.sar_delay)
520        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count])
521        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
522            for line in out.split("\n"):
523                if "Average" in line and "CPU" in line:
524                    self.log_print("Summary CPU utilization from SAR:")
525                    self.log_print(line)
526                if "Average" in line and "all" in line:
527                    self.log_print(line)
528            fh.write(out)
529
530    def measure_pcm_memory(self, results_dir, pcm_file_name):
531        time.sleep(self.pcm_delay)
532        cmd = ["%s/pcm-memory.x" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)]
533        pcm_memory = subprocess.Popen(cmd)
534        time.sleep(self.pcm_count)
535        pcm_memory.terminate()
536
537    def measure_pcm(self, results_dir, pcm_file_name):
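        # Run pcm.x for pcm_count samples at pcm_interval and extract the UPI link
        # columns from the resulting CSV into a separate "skt_"-prefixed file.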
538        time.sleep(self.pcm_delay)
539        cmd = ["%s/pcm.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, "-csv=%s/%s" % (results_dir, pcm_file_name)]
540        subprocess.run(cmd)
541        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
542        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
543        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
544        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
545        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)
546
547    def measure_pcm_power(self, results_dir, pcm_power_file_name):
548        time.sleep(self.pcm_delay)
549        out = self.exec_cmd(["%s/pcm-power.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count])
550        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
551            fh.write(out)
552
553    def measure_bandwidth(self, results_dir, bandwidth_file_name):
554        self.exec_cmd(["bwm-ng", "-o csv", "-F %s/%s" % (results_dir, bandwidth_file_name),
555                       "-a 1", "-t 1000", "-c %s" % self.bandwidth_count])
556
557    def measure_dpdk_memory(self, results_dir):
558        self.log_print("INFO: waiting to generate DPDK memory usage")
559        time.sleep(self.dpdk_wait_time)
560        self.log_print("INFO: generating DPDK memory usage")
561        rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)  # trigger the RPC that dumps stats to /tmp/spdk_mem_dump.txt
562        os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir))
563
564    def sys_config(self):
565        self.log_print("====Kernel release:====")
566        self.log_print(os.uname().release)
567        self.log_print("====Kernel command line:====")
568        with open('/proc/cmdline') as f:
569            cmdline = f.readlines()
570            self.log_print('\n'.join(self.get_uncommented_lines(cmdline)))
571        self.log_print("====sysctl conf:====")
572        with open('/etc/sysctl.conf') as f:
573            sysctl = f.readlines()
574            self.log_print('\n'.join(self.get_uncommented_lines(sysctl)))
575        self.log_print("====Cpu power info:====")
576        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
577        self.log_print("====zcopy settings:====")
578        self.log_print("zcopy enabled: %s" % (self.enable_zcopy))
579        self.log_print("====Scheduler settings:====")
580        self.log_print("SPDK scheduler: %s" % (self.scheduler_name))
581
582
583class Initiator(Server):
584    def __init__(self, name, general_config, initiator_config):
585        super(Initiator, self).__init__(name, general_config, initiator_config)
586
587        # Required fields
588        self.ip = initiator_config["ip"]
589        self.remote_nic_ips = initiator_config["remote_nic_ips"]
590
591        # Defaults
592        self.cpus_allowed = None
593        self.cpus_allowed_policy = "shared"
594        self.spdk_dir = "/tmp/spdk"
595        self.fio_bin = "/usr/src/fio/fio"
596        self.nvmecli_bin = "nvme"
597        self.cpu_frequency = None
598        self.subsystem_info_list = []
599
600        if "spdk_dir" in initiator_config:
601            self.spdk_dir = initiator_config["spdk_dir"]
602        if "fio_bin" in initiator_config:
603            self.fio_bin = initiator_config["fio_bin"]
604        if "nvmecli_bin" in initiator_config:
605            self.nvmecli_bin = initiator_config["nvmecli_bin"]
606        if "cpus_allowed" in initiator_config:
607            self.cpus_allowed = initiator_config["cpus_allowed"]
608        if "cpus_allowed_policy" in initiator_config:
609            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
610        if "cpu_frequency" in initiator_config:
611            self.cpu_frequency = initiator_config["cpu_frequency"]
612        if os.getenv('SPDK_WORKSPACE'):
613            self.spdk_dir = os.getenv('SPDK_WORKSPACE')
614
615        self.ssh_connection = paramiko.SSHClient()
616        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
617        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
618        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
619        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
620        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
621        self.set_local_nic_info(self.set_local_nic_info_helper())
622        self.set_cpu_frequency()
623        self.configure_system()
624        if self.enable_adq:
625            self.configure_adq()
626        self.sys_config()
627
628    def set_local_nic_info_helper(self):
629        return json.loads(self.exec_cmd(["lshw", "-json"]))
630
631    def __del__(self):
632        self.ssh_connection.close()
633
634    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
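        # Run a command on the remote initiator over SSH and return its stdout;
        # raises CalledProcessError if the command exits with a non-zero status.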
635        if change_dir:
636            cmd = ["cd", change_dir, ";", *cmd]
637
638        # If any command element contains whitespace (e.g. when calling sysctl) and is
639        # not already quoted, quote it to prevent expansion
640        # when sending it to the remote system.
641        for i, c in enumerate(cmd):
642            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
643                cmd[i] = '"%s"' % c
644        cmd = " ".join(cmd)
645
646        # Redirect stderr to stdout by using the get_pty option if needed
647        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
648        out = stdout.read().decode(encoding="utf-8")
649
650        # Check the return code
651        rc = stdout.channel.recv_exit_status()
652        if rc:
653            raise CalledProcessError(int(rc), cmd, out)
654
655        return out
656
657    def put_file(self, local, remote_dest):
658        ftp = self.ssh_connection.open_sftp()
659        ftp.put(local, remote_dest)
660        ftp.close()
661
662    def get_file(self, remote, local_dest):
663        ftp = self.ssh_connection.open_sftp()
664        ftp.get(remote, local_dest)
665        ftp.close()
666
667    def copy_result_files(self, dest_dir):
668        self.log_print("Copying results")
669
670        if not os.path.exists(dest_dir):
671            os.mkdir(dest_dir)
672
673        # Get list of result files from initiator and copy them back to target
674        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")
675
676        for file in file_list:
677            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
678                          os.path.join(dest_dir, file))
679        self.log_print("Done copying results")
680
681    def discover_subsystems(self, address_list, subsys_no):
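        # Probe ports 4420..4420+subsys_no-1 on each address with "nvme discover" and
        # parse the output into (trsvcid, nqn, traddr) tuples matching the target IPs.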
682        num_nvmes = range(0, subsys_no)
683        nvme_discover_output = ""
684        for ip, subsys_no in itertools.product(address_list, num_nvmes):
685            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no))
686            nvme_discover_cmd = ["sudo",
687                                 "%s" % self.nvmecli_bin,
688                                 "discover", "-t", "%s" % self.transport,
689                                 "-s", "%s" % (4420 + subsys_no),
690                                 "-a", "%s" % ip]
691
692            try:
693                stdout = self.exec_cmd(nvme_discover_cmd)
694                if stdout:
695                    nvme_discover_output = nvme_discover_output + stdout
696            except CalledProcessError:
697                # Do nothing. When discovering remote subsystems of a kernel target
698                # we expect "nvme discover" to fail a number of times because we are
699                # essentially scanning ports.
700                pass
701
702        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
703                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
704                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
705                                nvme_discover_output)  # from nvme discovery output
706        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
707        subsystems = list(set(subsystems))
708        subsystems.sort(key=lambda x: x[1])
709        self.log_print("Found matching subsystems on target side:")
710        for s in subsystems:
711            self.log_print(s)
712        self.subsystem_info_list = subsystems
713
714    def gen_fio_filename_conf(self, *args, **kwargs):
715        # Logic implemented in SPDKInitiator and KernelInitiator classes
716        pass
717
718    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10):
719        fio_conf_template = """
720[global]
721ioengine={ioengine}
722{spdk_conf}
723thread=1
724group_reporting=1
725direct=1
726percentile_list=50:90:99:99.5:99.9:99.99:99.999
727
728norandommap=1
729rw={rw}
730rwmixread={rwmixread}
731bs={block_size}
732time_based=1
733ramp_time={ramp_time}
734runtime={run_time}
735"""
736        if "spdk" in self.mode:
737            bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
738            self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
739            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
740            spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
741        else:
742            ioengine = "libaio"
743            spdk_conf = ""
744            out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'",
745                                 "|", "awk", "'{print $1}'"])
746            subsystems = [x for x in out.split("\n") if "nvme" in x]
747
748        if self.cpus_allowed is not None:
749            self.log_print("Limiting FIO workload execution to specific cores: %s" % self.cpus_allowed)
750            cpus_num = 0
751            cpus = self.cpus_allowed.split(",")
752            for cpu in cpus:
753                if "-" in cpu:
754                    a, b = cpu.split("-")
755                    a = int(a)
756                    b = int(b)
757                    cpus_num += len(range(a, b)) + 1  # core ranges like "0-3" are inclusive
758                else:
759                    cpus_num += 1
760            threads = range(0, cpus_num)
761        elif hasattr(self, 'num_cores'):
762            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
763            threads = range(0, int(self.num_cores))
764        else:
765            threads = range(0, len(subsystems))
766
767        if "spdk" in self.mode:
768            filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs)
769        else:
770            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs)
771
772        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
773                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
774                                              ramp_time=ramp_time, run_time=run_time)
775        if num_jobs:
776            fio_config = fio_config + "numjobs=%s \n" % num_jobs
777        if self.cpus_allowed is not None:
778            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
779            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
780        fio_config = fio_config + filename_section
781
782        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
783        if hasattr(self, "num_cores"):
784            fio_config_filename += "_%sCPU" % self.num_cores
785        fio_config_filename += ".fio"
786
787        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
788        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
789        self.log_print("Created FIO Config:")
790        self.log_print(fio_config)
791
792        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)
793
794    def set_cpu_frequency(self):
795        if self.cpu_frequency is not None:
796            try:
797                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
798                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
799                self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
800            except Exception:
801                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
802                sys.exit()
803        else:
804            self.log_print("WARNING: you have disabled intel_pstate and are using the default cpu governor.")
805
806    def run_fio(self, fio_config_file, run_num=None):
807        job_name, _ = os.path.splitext(fio_config_file)
808        self.log_print("Starting FIO run for job: %s" % job_name)
809        self.log_print("Using FIO: %s" % self.fio_bin)
810
811        if run_num:
812            for i in range(1, run_num + 1):
813                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
814                output = self.exec_cmd(["sudo", self.fio_bin,
815                                        fio_config_file, "--output-format=json",
816                                        "--output=%s" % output_filename], True)
817                self.log_print(output)
818        else:
819            output_filename = job_name + "_" + self.name + ".json"
820            output = self.exec_cmd(["sudo", self.fio_bin,
821                                    fio_config_file, "--output-format=json",
822                                    "--output=%s" % output_filename], True)
823            self.log_print(output)
824        self.log_print("FIO run finished. Results in: %s" % output_filename)
825
826    def sys_config(self):
827        self.log_print("====Kernel release:====")
828        self.log_print(self.exec_cmd(["uname", "-r"]))
829        self.log_print("====Kernel command line:====")
830        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
831        self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
832        self.log_print("====sysctl conf:====")
833        sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"])
834        self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
835        self.log_print("====Cpu power info:====")
836        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
837
838
839class KernelTarget(Target):
840    def __init__(self, name, general_config, target_config):
841        super(KernelTarget, self).__init__(name, general_config, target_config)
842        # Defaults
843        self.nvmet_bin = "nvmetcli"
844
845        if "nvmet_bin" in target_config:
846            self.nvmet_bin = target_config["nvmet_bin"]
847
848    def __del__(self):
849        nvmet_command(self.nvmet_bin, "clear")
850
851    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):
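        # Build an nvmetcli JSON config: disks are split between the provided IPs and
        # each disk gets its own subsystem and port (trsvcid starting at 4420);
        # the result is written to kernel.conf for "nvmetcli restore".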
852
853        nvmet_cfg = {
854            "ports": [],
855            "hosts": [],
856            "subsystems": [],
857        }
858
859        # Split disks between NIC IP's
860        disks_per_ip = int(len(nvme_list) / len(address_list))
861        disk_chunks = [nvme_list[i * disks_per_ip:(i + 1) * disks_per_ip] for i in range(0, len(address_list))]
862
863        subsys_no = 1
864        port_no = 0
865        for ip, chunk in zip(address_list, disk_chunks):
866            for disk in chunk:
867                nqn = "nqn.2018-09.io.spdk:cnode%s" % subsys_no
868                nvmet_cfg["subsystems"].append({
869                    "allowed_hosts": [],
870                    "attr": {
871                        "allow_any_host": "1",
872                        "serial": "SPDK00%s" % subsys_no,
873                        "version": "1.3"
874                    },
875                    "namespaces": [
876                        {
877                            "device": {
878                                "path": disk,
879                                "uuid": "%s" % uuid.uuid4()
880                            },
881                            "enable": 1,
882                            "nsid": subsys_no
883                        }
884                    ],
885                    "nqn": nqn
886                })
887
888                nvmet_cfg["ports"].append({
889                    "addr": {
890                        "adrfam": "ipv4",
891                        "traddr": ip,
892                        "trsvcid": "%s" % (4420 + port_no),
893                        "trtype": "%s" % self.transport
894                    },
895                    "portid": subsys_no,
896                    "referrals": [],
897                    "subsystems": [nqn]
898                })
899                subsys_no += 1
900                port_no += 1
901                self.subsystem_info_list.append([port_no, nqn, ip])
902
903        with open("kernel.conf", "w") as fh:
904            fh.write(json.dumps(nvmet_cfg, indent=2))
906
907    def tgt_start(self):
908        self.log_print("Configuring kernel NVMeOF Target")
909
910        if self.null_block:
911            self.log_print("Configuring with null block device.")
912            null_blk_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
913            self.kernel_tgt_gen_subsystem_conf(null_blk_list, self.nic_ips)
914            self.subsys_no = len(null_blk_list)
915        else:
916            self.log_print("Configuring with NVMe drives.")
917            nvme_list = get_nvme_devices()
918            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
919            self.subsys_no = len(nvme_list)
920
921        nvmet_command(self.nvmet_bin, "clear")
922        nvmet_command(self.nvmet_bin, "restore kernel.conf")
923        self.log_print("Done configuring kernel NVMeOF Target")
924
925
926class SPDKTarget(Target):
927    def __init__(self, name, general_config, target_config):
928        super(SPDKTarget, self).__init__(name, general_config, target_config)
929
930        # Required fields
931        self.core_mask = target_config["core_mask"]
932        self.num_cores = self.get_num_cores(self.core_mask)
933
934        # Defaults
935        self.dif_insert_strip = False
936        self.null_block_dif_type = 0
937        self.num_shared_buffers = 4096
938
939        if "num_shared_buffers" in target_config:
940            self.num_shared_buffers = target_config["num_shared_buffers"]
941        if "null_block_dif_type" in target_config:
942            self.null_block_dif_type = target_config["null_block_dif_type"]
943        if "dif_insert_strip" in target_config:
944            self.dif_insert_strip = target_config["dif_insert_strip"]
945
946    def get_num_cores(self, core_mask):
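        # core_mask is either a hex mask (e.g. "0xFF") or a bracketed core list
        # (e.g. "[0-3,7]"); return the number of cores it selects.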
947        if "0x" in core_mask:
948            return bin(int(core_mask, 16)).count("1")
949        else:
950            num_cores = 0
951            core_mask = core_mask.replace("[", "")
952            core_mask = core_mask.replace("]", "")
953            for i in core_mask.split(","):
954                if "-" in i:
955                    x, y = i.split("-")
956                    num_cores += len(range(int(x), int(y))) + 1
957                else:
958                    num_cores += 1
959            return num_cores
960
961    def spdk_tgt_configure(self):
962        self.log_print("Configuring SPDK NVMeOF target via RPC")
963        numa_list = get_used_numa_nodes()
964
965        # Create the transport layer (trtype taken from config)
966        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
967                                       num_shared_buffers=self.num_shared_buffers,
968                                       dif_insert_or_strip=self.dif_insert_strip,
969                                       sock_priority=self.adq_priority)
970        self.log_print("SPDK NVMeOF transport layer:")
971        rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))
972
973        if self.null_block:
974            self.spdk_tgt_add_nullblock(self.null_block)
975            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
976        else:
977            self.spdk_tgt_add_nvme_conf()
978            self.spdk_tgt_add_subsystem_conf(self.nic_ips)
979        self.log_print("Done configuring SPDK NVMeOF Target")
980
981    def spdk_tgt_add_nullblock(self, null_block_count):
982        md_size = 0
983        block_size = 4096
984        if self.null_block_dif_type != 0:
985            md_size = 128
986
987        self.log_print("Adding null block bdevs to config via RPC")
988        for i in range(null_block_count):
989            self.log_print("Setting bdev protection to: %s" % self.null_block_dif_type)
990            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
991                                      dif_type=self.null_block_dif_type, md_size=md_size)
992        self.log_print("SPDK Bdevs configuration:")
993        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
994
995    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
996        self.log_print("Adding NVMe bdevs to config via RPC")
997
998        bdfs = get_nvme_devices_bdf()
999        bdfs = [b.replace(":", ".") for b in bdfs]
1000
1001        if req_num_disks:
1002            if req_num_disks > len(bdfs):
1003                self.log_print("ERROR: Requested number of disks is more than the %s available" % len(bdfs))
1004                sys.exit(1)
1005            else:
1006                bdfs = bdfs[0:req_num_disks]
1007
1008        for i, bdf in enumerate(bdfs):
1009            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)
1010
1011        self.log_print("SPDK Bdevs configuration:")
1012        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1013
1014    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
1015        self.log_print("Adding subsystems to config")
1016        port = "4420"
1017        if not req_num_disks:
1018            req_num_disks = get_nvme_devices_count()
1019
1020        # Distribute bdevs between provided NICs
1021        num_disks = range(0, req_num_disks)
1022        if len(num_disks) == 1:
1023            disks_per_ip = 1
1024        else:
1025            disks_per_ip = int(len(num_disks) / len(ips))
1026        disk_chunks = [num_disks[i * disks_per_ip:(i + 1) * disks_per_ip] for i in range(0, len(ips))]
1027
1028        # Create subsystems, add bdevs to namespaces, add listeners
1029        for ip, chunk in zip(ips, disk_chunks):
1030            for c in chunk:
1031                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
1032                serial = "SPDK00%s" % c
1033                bdev_name = "Nvme%sn1" % c
1034                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
1035                                               allow_any_host=True, max_namespaces=8)
1036                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
1037
1038                rpc.nvmf.nvmf_subsystem_add_listener(self.client, nqn,
1039                                                     trtype=self.transport,
1040                                                     traddr=ip,
1041                                                     trsvcid=port,
1042                                                     adrfam="ipv4")
1043
1044                self.subsystem_info_list.append([port, nqn, ip])
1045        self.log_print("SPDK NVMeOF subsystem configuration:")
1046        rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))
1047
1048    def tgt_start(self):
1049        if self.null_block:
1050            self.subsys_no = 1
1051        else:
1052            self.subsys_no = get_nvme_devices_count()
1053        self.log_print("Starting SPDK NVMeOF Target process")
1054        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
1055        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
1056        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")
1057
1058        with open(self.pid, "w") as fh:
1059            fh.write(str(proc.pid))
1060        self.nvmf_proc = proc
1061        self.log_print("SPDK NVMeOF Target PID=%s" % proc.pid)
1062        self.log_print("Waiting for spdk to initialize...")
1063        while True:
1064            if os.path.exists("/var/tmp/spdk.sock"):
1065                break
1066            time.sleep(1)
1067        self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock")
1068
1069        if self.enable_zcopy:
1070            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
1071                                           enable_zerocopy_send=True)
1072            self.log_print("Target socket options:")
1073            rpc.client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))
1074
1075        if self.enable_adq:
1076            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
1077            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
1078                                           nvme_adminq_poll_period_us=100000, retry_count=4)
1079            rpc.nvmf.nvmf_set_config(self.client, acceptor_poll_rate=10000)
1080
1081        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name)
1082
1083        rpc.framework_start_init(self.client)
1084        self.spdk_tgt_configure()
1085
1086    def __del__(self):
1087        if hasattr(self, "nvmf_proc"):
1088            try:
1089                self.nvmf_proc.terminate()
1090                self.nvmf_proc.wait()
1091            except Exception as e:
1092                self.log_print(e)
1093                self.nvmf_proc.kill()
1094                self.nvmf_proc.communicate()
1095
1096
1097class KernelInitiator(Initiator):
1098    def __init__(self, name, general_config, initiator_config):
1099        super(KernelInitiator, self).__init__(name, general_config, initiator_config)
1100
1101        # Defaults
1102        self.extra_params = ""
1103
1104        if "extra_params" in initiator_config:
1105            self.extra_params = initiator_config["extra_params"]
1106
1107    def __del__(self):
1108        self.ssh_connection.close()
1109
1110    def kernel_init_connect(self, address_list, subsys_no):
1111        self.log_print("The connection attempts below may result in error messages; this is expected!")
1112        for subsystem in self.subsystem_info_list:
1113            self.log_print("Trying to connect %s %s %s" % subsystem)
1114            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
1115                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
1116            time.sleep(2)
1117
1118    def kernel_init_disconnect(self, address_list, subsys_no):
1119        for subsystem in self.subsystem_info_list:
1120            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
1121            time.sleep(1)
1122
1123    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1):
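        # List the kernel-attached NVMe-oF block devices and spread them across the
        # fio job sections, one section per thread, scaling iodepth per section.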
1124        out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'",
1125                             "|", "awk", "'{print $1}'"])
1126        nvme_list = [x for x in out.split("\n") if "nvme" in x]
1127
1128        filename_section = ""
1129        nvme_per_split = int(len(nvme_list) / len(threads))
1130        remainder = len(nvme_list) % len(threads)
1131        iterator = iter(nvme_list)
1132        result = []
1133        for i in range(len(threads)):
1134            result.append([])
1135            for j in range(nvme_per_split):
1136                result[i].append(next(iterator))
1137            if remainder:
1138                result[i].append(next(iterator))
1139                remainder -= 1
1140        for i, r in enumerate(result):
1141            header = "[filename%s]" % i
1142            disks = "\n".join(["filename=%s" % x for x in r])
1143            job_section_qd = round((io_depth * len(r)) / num_jobs)
1144            if job_section_qd == 0:
1145                job_section_qd = 1
1146            iodepth = "iodepth=%s" % job_section_qd
1147            filename_section = "\n".join([filename_section, header, disks, iodepth])
1148
1149        return filename_section
1150
1151
1152class SPDKInitiator(Initiator):
1153    def __init__(self, name, general_config, initiator_config):
1154        super(SPDKInitiator, self).__init__(name, general_config, initiator_config)
1155
1156        # Required fields
1157        self.num_cores = initiator_config["num_cores"]
1158
1159    def install_spdk(self, local_spdk_zip):
1160        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
1161        self.log_print("Copied sources zip from target")
1162        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
1163
1164        self.log_print("Sources unpacked")
1165        self.log_print("Using fio binary %s" % self.fio_bin)
1166        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
1167        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
1168        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)])
1169        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
1170        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])
1171
1172        self.log_print("SPDK built")
1173        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])
1174
1175    def gen_spdk_bdev_conf(self, remote_subsystem_list):
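        # Build an SPDK JSON config containing one bdev_nvme_attach_controller entry
        # per discovered remote subsystem, with a priority parameter added when ADQ is enabled.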
        bdev_cfg_section = {
            "subsystems": [
                {
                    "subsystem": "bdev",
                    "config": []
                }
            ]
        }

        for i, subsys in enumerate(remote_subsystem_list):
            sub_port, sub_nqn, sub_addr = map(str, subsys)
            nvme_ctrl = {
                "method": "bdev_nvme_attach_controller",
                "params": {
                    "name": "Nvme{}".format(i),
                    "trtype": self.transport,
                    "traddr": sub_addr,
                    "trsvcid": sub_port,
                    "subnqn": sub_nqn,
                    "adrfam": "IPv4"
                }
            }

            if self.enable_adq:
                nvme_ctrl["params"].update({"priority": "1"})

            bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)

        return json.dumps(bdev_cfg_section, indent=2)

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1):
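        # Spread the Nvme bdev namespaces defined in gen_spdk_bdev_conf() across fio
        # "[filenameX]" job sections and scale iodepth per section.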
        filename_section = ""
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        nvme_per_split = len(subsystems) // len(threads)
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for j in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section


if __name__ == "__main__":
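    # Local paths for the zipped SPDK sources and the collected result files.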
    spdk_zip_path = "/tmp/spdk.zip"
    target_results_dir = "/tmp/results"

    if len(sys.argv) > 1:
        config_file_path = sys.argv[1]
    else:
        script_full_dir = os.path.dirname(os.path.realpath(__file__))
        config_file_path = os.path.join(script_full_dir, "config.json")

    print("Using config file: %s" % config_file_path)
    with open(config_file_path, "r") as config:
        data = json.load(config)

    initiators = []
    fio_cases = []

    general_config = data["general"]
    target_config = data["target"]
    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]

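    # Build target, initiator and fio workload objects from the parsed config.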
    for k, v in data.items():
        if "target" in k:
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(k, data["general"], v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(k, data["general"], v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None
            fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None
        else:
            continue

    # Copy and install SPDK on remote initiators
    if "skip_spdk_install" not in data["general"]:
        target_obj.zip_spdk_sources(target_obj.spdk_dir, spdk_zip_path)
        threads = []
        for i in initiators:
            if i.mode == "spdk":
                t = threading.Thread(target=i.install_spdk, args=(spdk_zip_path,))
                threads.append(t)
                t.start()
        for t in threads:
            t.join()

    target_obj.tgt_start()

    try:
        os.mkdir(target_results_dir)
    except FileExistsError:
        pass

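    # Discover remote NVMe-oF subsystems on each initiator before generating fio configs.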
    for i in initiators:
        i.discover_subsystems(i.remote_nic_ips, target_obj.subsys_no)

    # Poor man's threading
    # Run FIO tests
    for block_size, io_depth, rw in fio_workloads:
        threads = []
        configs = []
        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_connect(i.remote_nic_ips, target_obj.subsys_no)

            cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                   fio_num_jobs, fio_ramp_time, fio_run_time)
            configs.append(cfg)

        for i, cfg in zip(initiators, configs):
            t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
            threads.append(t)
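
        # Optionally start measurement threads on the target (sar, PCM, bandwidth,
        # DPDK memory usage) so they run alongside the fio workloads.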
        if target_obj.enable_sar:
            sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"])
            sar_file_name = ".".join([sar_file_name, "txt"])
            t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name))
            threads.append(t)

        if target_obj.enable_pcm:
            pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]

            pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(target_results_dir, pcm_fnames[0],))
            pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(target_results_dir, pcm_fnames[1],))
            pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(target_results_dir, pcm_fnames[2],))

            threads.append(pcm_cpu_t)
            threads.append(pcm_mem_t)
            threads.append(pcm_pow_t)

        if target_obj.enable_bandwidth:
            bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)])
            bandwidth_file_name = ".".join([bandwidth_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_bandwidth, args=(target_results_dir, bandwidth_file_name,))
            threads.append(t)

        if target_obj.enable_dpdk_memory:
            t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(target_results_dir,))
            threads.append(t)

        for t in threads:
            t.start()
        for t in threads:
            t.join()

        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_disconnect(i.remote_nic_ips, target_obj.subsys_no)
            i.copy_result_files(target_results_dir)

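    # Restore services, sysctl, tuned profile and CPU governor settings on the target
    # and on all initiators, then parse the collected results.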
    target_obj.restore_governor()
    target_obj.restore_tuned()
    target_obj.restore_services()
    target_obj.restore_sysctl()
    for i in initiators:
        i.restore_governor()
        i.restore_tuned()
        i.restore_services()
        i.restore_sysctl()
    target_obj.parse_results(target_results_dir)