xref: /spdk/scripts/perf/nvmf/run_nvmf.py (revision da2fd6651a9cd4732b0910d30291821e77f4d643)
1#!/usr/bin/env python3
2
3import os
4import re
5import sys
6import argparse
7import json
8import zipfile
9import threading
10import subprocess
11import itertools
12import configparser
13import time
14import uuid
15from collections import OrderedDict
16
17import paramiko
18import pandas as pd
19
20import rpc
21import rpc.client
22from common import *
23
24
class Server:
    """Base class for a machine taking part in an NVMe-oF perf test.

    Holds configuration common to target and initiator systems
    (credentials, transport, NIC IPs) and implements the system tuning
    steps (services, sysctl, tuned, CPU governor, IRQ affinity, optional
    ADQ) together with the bookkeeping needed to restore the original
    settings once the test is done.
    """

    def __init__(self, name, general_config, server_config):
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]

        # Default location of the Mellanox IRQ-affinity helper scripts;
        # can be overridden per server in the config file.
        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        self.local_nic_info = []
        self._nics_json_obj = {}
        # The *_restore_* members record the pre-test state so that the
        # restore_*() methods can undo the tuning applied for the test.
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        self.enable_adq = False
        self.adq_priority = None
        if "adq_enable" in server_config and server_config["adq_enable"]:
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1

        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]

        # The name ends up in log lines and result file names, so keep
        # it strictly alphanumeric.
        if not re.match("^[A-Za-z0-9]*$", name):
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        """Print *msg* prefixed with this server's name, unbuffered."""
        print("[%s] %s" % (self.name, msg), flush=True)

    def get_uncommented_lines(self, lines):
        """Return *lines* with empty lines and '#' comment lines removed."""
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        """Return the interface name holding address *ip*, or None.

        The interface list is fetched lazily via "ip -j address show"
        and cached for subsequent lookups.
        """
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                # Exact comparison: a substring test would make e.g.
                # "10.0.0.1" also match a NIC holding "10.0.0.11".
                if ip == addr["local"]:
                    return nic["ifname"]

    def set_local_nic_info_helper(self):
        """Return raw hardware info for NIC discovery; overridden in subclasses."""
        pass

    def set_local_nic_info(self, pci_info):
        """Populate self.local_nic_info with network devices found in *pci_info*.

        *pci_info* is expected to be lshw-style JSON: nested dicts with
        optional "children" lists and a "class" field identifying
        network devices.
        """
        def extract_network_elements(json_obj):
            # Recursively collect every dict whose "class" contains "network".
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Execute *cmd* and return its output; stub, overridden in subclasses."""
        return ""

    def configure_system(self):
        """Apply all system tuning steps required before a test run."""
        self.configure_services()
        self.configure_sysctl()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity()

    def configure_adq(self):
        """Load ADQ kernel modules and configure NIC ports for ADQ testing."""
        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        """Modprobe the Linux modules required for ADQ traffic classification."""
        self.log_print("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log_print("%s loaded!" % module)
            except subprocess.CalledProcessError as e:
                self.log_print("ERROR: failed to load module %s" % module)
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        """Set up ADQ traffic classes, ingress filters and XPS/RXQ mapping.

        Relies on self.num_cores, self.subsystem_info_list and
        self.spdk_dir being provided by the concrete subclass.
        """
        self.log_print("Configuring ADQ Traffic classes and filters...")

        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        # On the target we match the destination port of incoming traffic,
        # on initiators the source port.
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        ports = set([p[0] for p in self.subsystem_info_list])
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log_print(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log_print(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            for port in ports:
                tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                                 "protocol", "ip", "ingress", "prio", "1", "flower",
                                 "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                                 "skip_sw", "hw_tc", "1"]
                self.log_print(" ".join(tc_filter_cmd))
                self.exec_cmd(tc_filter_cmd)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log_print(" ".join(xps_cmd))
            self.exec_cmd(xps_cmd)

    def adq_configure_nic(self):
        """Reload the ice driver and apply ethtool settings needed for ADQ."""
        self.log_print("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        try:
            self.exec_cmd(["sudo", "rmmod", "ice"])
            self.exec_cmd(["sudo", "modprobe", "ice"])
        except subprocess.CalledProcessError as e:
            self.log_print("ERROR: failed to reload ice module!")
            self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log_print(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization
            except subprocess.CalledProcessError as e:
                self.log_print("ERROR: failed to configure NIC port using ethtool!")
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
                self.log_print("Please update your NIC driver and firmware versions and try again.")
            self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        """Stop services that disturb measurements, recording prior state."""
        self.log_print("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            # "systemctl show" output is key=value pairs; wrap it in a
            # section header so configparser can digest it.
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log_print("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log_print("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        """Apply network/VM sysctl tuning, recording prior values for restore."""
        self.log_print("Tuning sysctl settings...")

        # busy_read polling is only beneficial for SPDK mode with ADQ.
        busy_read = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1

        sysctl_opts = {
            "net.core.busy_poll": 0,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
        }

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        """Activate the configured tuned-adm profile, saving the current one."""
        if not self.tuned_profile:
            self.log_print("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        """Switch the CPU frequency governor to "performance"."""
        self.log_print("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def configure_irq_affinity(self):
        """Run set_irq_affinity.sh for each test NIC."""
        self.log_print("Setting NIC irq affinity for NICs...")

        irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh")
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            irq_cmd = ["sudo", irq_script_path, nic]
            self.log_print(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        """Bring services back to the state recorded by configure_services()."""
        self.log_print("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        """Re-apply the sysctl values recorded by configure_sysctl()."""
        self.log_print("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        """Revert tuned-adm to the profile recorded by configure_tuned()."""
        self.log_print("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log_print("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        """Revert the CPU governor recorded by configure_cpu_governor()."""
        self.log_print("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log_print("Reverted CPU governor to %s." % self.governor_restore)
321
322
class Target(Server):
    """The system running the NVMe-oF target.

    Extends Server with local command execution, SPDK source packaging,
    fio result parsing/aggregation and the various measurement helpers
    (SAR, PCM, bandwidth, DPDK memory) that run alongside a test.
    Constructing a Target has side effects: it probes local NICs,
    optionally zips the SPDK sources and applies system configuration.
    """

    def __init__(self, name, general_config, target_config):
        super(Target, self).__init__(name, general_config, target_config)

        # Defaults
        self.enable_sar = False
        self.sar_delay = 0
        self.sar_interval = 0
        self.sar_count = 0
        self.enable_pcm = False
        self.pcm_dir = ""
        self.pcm_delay = 0
        self.pcm_interval = 0
        self.pcm_count = 0
        self.enable_bandwidth = 0
        self.bandwidth_count = 0
        self.enable_dpdk_memory = False
        self.dpdk_wait_time = 0
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []

        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "sar_settings" in target_config:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"]
        if "pcm_settings" in target_config:
            self.enable_pcm = True
            self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"]
        if "enable_bandwidth" in target_config:
            self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"]
        if "enable_dpdk_memory" in target_config:
            self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"]
        if "scheduler_settings" in target_config:
            self.scheduler_name = target_config["scheduler_settings"]
        if "zcopy_settings" in target_config:
            self.enable_zcopy = target_config["zcopy_settings"]

        # SPDK tree root is derived from this script's location
        # (scripts/perf/nvmf/ -> three levels up).
        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        """Return local hardware info from lshw as a parsed JSON object."""
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Run *cmd* locally and return its decoded stdout.

        When stderr_redirect is True, stderr is folded into the captured
        output. When change_dir is given, the working directory is
        switched for the duration of the command and restored afterwards,
        even if the command fails.
        """
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log_print("Changing directory to %s" % change_dir)

        try:
            # check_output is expected to come from the star import of
            # "common"; it raises CalledProcessError on non-zero exit.
            out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")
        finally:
            # Restore cwd even on failure, so one failing command does
            # not leave subsequent commands running in the wrong place.
            if change_dir:
                os.chdir(old_cwd)
                self.log_print("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        """Zip the SPDK source tree at *spdk_dir* into *dest_file*.

        NOTE(review): archive member names use paths relative to the
        current working directory, not to spdk_dir — confirm callers
        always run from inside the SPDK tree.
        """
        self.log_print("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log_print("Done zipping")

    def read_json_stats(self, file):
        """Parse a fio JSON output *file* and return a flat stats list.

        Returns 18 floats: iops, bw, avg/min/max latency and p99/p99.9/
        p99.99/p99.999 completion latencies — first for reads, then for
        writes. All latencies are normalized to microseconds.
        """
        with open(file, "r") as json_data:
            data = json.load(json_data)
            job_pos = 0  # job_pos = 0 because we use aggregated results

            # Check if latency is in nano or microseconds to choose correct dict key
            def get_lat_unit(key_prefix, dict_section):
                # key prefix - lat, clat or slat.
                # dict section - portion of json containing latency bucket in question
                # Return dict key to access the bucket and unit as string
                for k, _ in dict_section.items():
                    if k.startswith(key_prefix):
                        return k, k.split("_")[1]

            def get_clat_percentiles(clat_dict_leaf):
                # Pull the four tracked percentiles out of a clat section.
                if "percentile" in clat_dict_leaf:
                    p99_lat = float(clat_dict_leaf["percentile"]["99.000000"])
                    p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"])
                    p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"])
                    p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"])

                    return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat]
                else:
                    # Latest fio versions do not provide "percentile" results if no
                    # measurements were done, so just return zeroes
                    return [0, 0, 0, 0]

            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
            read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["read"][clat_key])

            # Normalize nanoseconds to microseconds.
            if "ns" in lat_unit:
                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
            if "ns" in clat_unit:
                read_p99_lat = read_p99_lat / 1000
                read_p99_9_lat = read_p99_9_lat / 1000
                read_p99_99_lat = read_p99_99_lat / 1000
                read_p99_999_lat = read_p99_999_lat / 1000

            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
            write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["write"][clat_key])

            if "ns" in lat_unit:
                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
            if "ns" in clat_unit:
                write_p99_lat = write_p99_lat / 1000
                write_p99_9_lat = write_p99_9_lat / 1000
                write_p99_99_lat = write_p99_99_lat / 1000
                write_p99_999_lat = write_p99_999_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]

    def parse_results(self, results_dir, csv_file):
        """Aggregate fio JSON results in *results_dir* into CSV files.

        Produces one per-initiator CSV per job plus an aggregate
        *csv_file* where read and write stats are combined (latencies
        weighted by rwmixread).
        """
        files = os.listdir(results_dir)
        fio_files = filter(lambda x: ".fio" in x, files)
        json_files = [x for x in files if ".json" in x]

        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]

        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]

        header_line = ",".join(["Name", *headers])
        aggr_header_line = ",".join(["Name", *aggr_headers])

        # Create empty results file
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            fh.write(aggr_header_line + "\n")
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # Look in the filename for rwmixread value. Function arguments do
            # not have that information.
            # TODO: Improve this function by directly using workload params instead
            # of regexing through filenames.
            if "read" in job_name:
                rw_mixread = 1
            elif "write" in job_name:
                rw_mixread = 0
            else:
                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100

            # If "_CPU" exists in name - ignore it
            # Initiators for the same job could have different num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if job_name in x]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # There may have been more than 1 initiator used in test, need to check that
            # Result files are created so that string after last "_" separator is server name
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than 1 test run for this job, calculate average results for initiator
                i_results = [x for x in job_result_files if i in x]
                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))

                separate_stats = []
                for r in i_results:
                    stats = self.read_json_stats(os.path.join(results_dir, r))
                    separate_stats.append(stats)
                    self.log_print(stats)

                init_results = [sum(x) for x in zip(*separate_stats)]
                init_results = [x / len(separate_stats) for x in init_results]
                inits_avg_results.append(init_results)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(init_results)
                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
                    fh.write(header_line + "\n")
                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")

            # Sum results of all initiators running this FIO job.
            # Latency results are an average of latencies from across all initiators.
            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
            for key in inits_avg_results:
                if "lat" in key:
                    inits_avg_results[key] /= len(inits_names)

            # Aggregate separate read/write values into common labels
            # Take rw_mixread into consideration for mixed read/write workloads.
            aggregate_results = OrderedDict()
            for h in aggr_headers:
                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
                if "lat" in h:
                    aggr_value = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
                else:
                    aggr_value = read_stat + write_stat
                aggregate_results[h] = "{0:.3f}".format(aggr_value)

            rows.add(",".join([job_name, *aggregate_results.values()]))

        # Save results to file; open once instead of once per row.
        with open(os.path.join(results_dir, csv_file), "a") as fh:
            for row in rows:
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))

    def measure_sar(self, results_dir, sar_file_name):
        """Collect CPU utilization with sar and save it to *sar_file_name*."""
        self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay)
        time.sleep(self.sar_delay)
        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count])
        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
            # Echo only the summary ("Average") lines to the console.
            for line in out.split("\n"):
                if "Average" in line and "CPU" in line:
                    self.log_print("Summary CPU utilization from SAR:")
                    self.log_print(line)
                if "Average" in line and "all" in line:
                    self.log_print(line)
            fh.write(out)

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        """Run pcm-memory for pcm_count seconds, writing CSV output."""
        time.sleep(self.pcm_delay)
        cmd = ["%s/pcm-memory.x" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)]
        # pcm-memory has no sample-count option, so run it in the
        # background and terminate after the measurement window.
        pcm_memory = subprocess.Popen(cmd)
        time.sleep(self.pcm_count)
        pcm_memory.terminate()

    def measure_pcm(self, results_dir, pcm_file_name):
        """Run pcm, then extract socket/UPI columns into a separate CSV."""
        time.sleep(self.pcm_delay)
        cmd = ["%s/pcm.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_pcm_power(self, results_dir, pcm_power_file_name):
        """Run pcm-power and save its output to *pcm_power_file_name*."""
        time.sleep(self.pcm_delay)
        out = self.exec_cmd(["%s/pcm-power.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count])
        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
            fh.write(out)

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name):
        """Sample network bandwidth with bwm-ng into a CSV file."""
        self.log_print("INFO: starting network bandwidth measure")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)])

    def measure_dpdk_memory(self, results_dir):
        """Trigger a DPDK memory stats dump and move it to *results_dir*."""
        self.log_print("INFO: waiting to generate DPDK memory usage")
        time.sleep(self.dpdk_wait_time)
        self.log_print("INFO: generating DPDK memory usage")
        # NOTE(review): this is a bare attribute access, not a call — it
        # does not invoke the RPC. It likely should be
        # rpc.env.env_dpdk_get_mem_stats(<rpc client>); confirm against
        # the subclass that owns the RPC client before changing.
        rpc.env.env_dpdk_get_mem_stats
        os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir))

    def sys_config(self):
        """Log a summary of kernel, sysctl, CPU and SPDK-related settings."""
        self.log_print("====Kernel release:====")
        self.log_print(os.uname().release)
        self.log_print("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log_print('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log_print("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log_print('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log_print("====Cpu power info:====")
        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log_print("====zcopy settings:====")
        self.log_print("zcopy enabled: %s" % (self.enable_zcopy))
        self.log_print("====Scheduler settings:====")
        self.log_print("SPDK scheduler: %s" % (self.scheduler_name))
633
634
635class Initiator(Server):
    def __init__(self, name, general_config, initiator_config):
        """Set up an initiator host.

        Reads per-initiator settings, opens an SSH connection to the
        host, prepares the remote SPDK work directory and applies the
        common system configuration. Statement order matters: the SSH
        connection must exist before any exec_cmd() call below.
        """
        super(Initiator, self).__init__(name, general_config, initiator_config)

        # Required fields
        self.ip = initiator_config["ip"]
        self.target_nic_ips = initiator_config["target_nic_ips"]

        # Defaults
        self.cpus_allowed = None
        self.cpus_allowed_policy = "shared"
        self.spdk_dir = "/tmp/spdk"
        self.fio_bin = "/usr/src/fio/fio"
        self.nvmecli_bin = "nvme"
        self.cpu_frequency = None
        self.subsystem_info_list = []

        if "spdk_dir" in initiator_config:
            self.spdk_dir = initiator_config["spdk_dir"]
        if "fio_bin" in initiator_config:
            self.fio_bin = initiator_config["fio_bin"]
        if "nvmecli_bin" in initiator_config:
            self.nvmecli_bin = initiator_config["nvmecli_bin"]
        if "cpus_allowed" in initiator_config:
            self.cpus_allowed = initiator_config["cpus_allowed"]
        if "cpus_allowed_policy" in initiator_config:
            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
        if "cpu_frequency" in initiator_config:
            self.cpu_frequency = initiator_config["cpu_frequency"]
        # Environment variable overrides any spdk_dir from the config file.
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        # Remote access: exec_cmd() in this class presumably runs over
        # this SSH connection, so connect before the cleanup commands.
        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        # Clean leftovers from a previous run and re-create the workspace.
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        # Ship the SPDK sources zipped earlier by the Target, unless the
        # config explicitly skips SPDK installation.
        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.copy_spdk("/tmp/spdk.zip")
        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()
682
683    def set_local_nic_info_helper(self):
684        return json.loads(self.exec_cmd(["lshw", "-json"]))
685
686    def __del__(self):
687        self.ssh_connection.close()
688
689    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
690        if change_dir:
691            cmd = ["cd", change_dir, ";", *cmd]
692
693        # In case one of the command elements contains whitespace and is not
694        # already quoted, # (e.g. when calling sysctl) quote it again to prevent expansion
695        # when sending to remote system.
696        for i, c in enumerate(cmd):
697            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
698                cmd[i] = '"%s"' % c
699        cmd = " ".join(cmd)
700
701        # Redirect stderr to stdout thanks using get_pty option if needed
702        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
703        out = stdout.read().decode(encoding="utf-8")
704
705        # Check the return code
706        rc = stdout.channel.recv_exit_status()
707        if rc:
708            raise CalledProcessError(int(rc), cmd, out)
709
710        return out
711
712    def put_file(self, local, remote_dest):
713        ftp = self.ssh_connection.open_sftp()
714        ftp.put(local, remote_dest)
715        ftp.close()
716
717    def get_file(self, remote, local_dest):
718        ftp = self.ssh_connection.open_sftp()
719        ftp.get(remote, local_dest)
720        ftp.close()
721
722    def copy_spdk(self, local_spdk_zip):
723        self.log_print("Copying SPDK sources to initiator %s" % self.name)
724        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
725        self.log_print("Copied sources zip from target")
726        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
727        self.log_print("Sources unpacked")
728
729    def copy_result_files(self, dest_dir):
730        self.log_print("Copying results")
731
732        if not os.path.exists(dest_dir):
733            os.mkdir(dest_dir)
734
735        # Get list of result files from initiator and copy them back to target
736        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")
737
738        for file in file_list:
739            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
740                          os.path.join(dest_dir, file))
741        self.log_print("Done copying results")
742
743    def discover_subsystems(self, address_list, subsys_no):
744        num_nvmes = range(0, subsys_no)
745        nvme_discover_output = ""
746        for ip, subsys_no in itertools.product(address_list, num_nvmes):
747            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no))
748            nvme_discover_cmd = ["sudo",
749                                 "%s" % self.nvmecli_bin,
750                                 "discover", "-t", "%s" % self.transport,
751                                 "-s", "%s" % (4420 + subsys_no),
752                                 "-a", "%s" % ip]
753
754            try:
755                stdout = self.exec_cmd(nvme_discover_cmd)
756                if stdout:
757                    nvme_discover_output = nvme_discover_output + stdout
758            except CalledProcessError:
759                # Do nothing. In case of discovering remote subsystems of kernel target
760                # we expect "nvme discover" to fail a bunch of times because we basically
761                # scan ports.
762                pass
763
764        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
765                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
766                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
767                                nvme_discover_output)  # from nvme discovery output
768        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
769        subsystems = list(set(subsystems))
770        subsystems.sort(key=lambda x: x[1])
771        self.log_print("Found matching subsystems on target side:")
772        for s in subsystems:
773            self.log_print(s)
774        self.subsystem_info_list = subsystems
775
776    def gen_fio_filename_conf(self, *args, **kwargs):
777        # Logic implemented in SPDKInitiator and KernelInitiator classes
778        pass
779
780    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10, rate_iops=0):
781        fio_conf_template = """
782[global]
783ioengine={ioengine}
784{spdk_conf}
785thread=1
786group_reporting=1
787direct=1
788percentile_list=50:90:99:99.5:99.9:99.99:99.999
789
790norandommap=1
791rw={rw}
792rwmixread={rwmixread}
793bs={block_size}
794time_based=1
795ramp_time={ramp_time}
796runtime={run_time}
797rate_iops={rate_iops}
798"""
799        if "spdk" in self.mode:
800            bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
801            self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
802            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
803            spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
804        else:
805            ioengine = "libaio"
806            spdk_conf = ""
807            out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'",
808                                 "|", "awk", "'{print $1}'"])
809            subsystems = [x for x in out.split("\n") if "nvme" in x]
810
811        if self.cpus_allowed is not None:
812            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
813            cpus_num = 0
814            cpus = self.cpus_allowed.split(",")
815            for cpu in cpus:
816                if "-" in cpu:
817                    a, b = cpu.split("-")
818                    a = int(a)
819                    b = int(b)
820                    cpus_num += len(range(a, b))
821                else:
822                    cpus_num += 1
823            self.num_cores = cpus_num
824            threads = range(0, self.num_cores)
825        elif hasattr(self, 'num_cores'):
826            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
827            threads = range(0, int(self.num_cores))
828        else:
829            self.num_cores = len(subsystems)
830            threads = range(0, len(subsystems))
831
832        if "spdk" in self.mode:
833            filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs)
834        else:
835            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs)
836
837        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
838                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
839                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)
840        if num_jobs:
841            fio_config = fio_config + "numjobs=%s \n" % num_jobs
842        if self.cpus_allowed is not None:
843            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
844            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
845        fio_config = fio_config + filename_section
846
847        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
848        if hasattr(self, "num_cores"):
849            fio_config_filename += "_%sCPU" % self.num_cores
850        fio_config_filename += ".fio"
851
852        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
853        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
854        self.log_print("Created FIO Config:")
855        self.log_print(fio_config)
856
857        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)
858
859    def set_cpu_frequency(self):
860        if self.cpu_frequency is not None:
861            try:
862                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
863                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
864                self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
865            except Exception:
866                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
867                sys.exit()
868        else:
869            self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.")
870
871    def run_fio(self, fio_config_file, run_num=None):
872        job_name, _ = os.path.splitext(fio_config_file)
873        self.log_print("Starting FIO run for job: %s" % job_name)
874        self.log_print("Using FIO: %s" % self.fio_bin)
875
876        if run_num:
877            for i in range(1, run_num + 1):
878                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
879                output = self.exec_cmd(["sudo", self.fio_bin,
880                                        fio_config_file, "--output-format=json",
881                                        "--output=%s" % output_filename], True)
882                self.log_print(output)
883        else:
884            output_filename = job_name + "_" + self.name + ".json"
885            output = self.exec_cmd(["sudo", self.fio_bin,
886                                    fio_config_file, "--output-format=json",
887                                    "--output" % output_filename], True)
888            self.log_print(output)
889        self.log_print("FIO run finished. Results in: %s" % output_filename)
890
891    def sys_config(self):
892        self.log_print("====Kernel release:====")
893        self.log_print(self.exec_cmd(["uname", "-r"]))
894        self.log_print("====Kernel command line:====")
895        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
896        self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
897        self.log_print("====sysctl conf:====")
898        sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"])
899        self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
900        self.log_print("====Cpu power info:====")
901        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
902
903
class KernelTarget(Target):
    """NVMe-oF target implemented with the Linux kernel nvmet driver,
    configured through nvmetcli."""

    def __init__(self, name, general_config, target_config):
        super(KernelTarget, self).__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = "nvmetcli"

        if "nvmet_bin" in target_config:
            self.nvmet_bin = target_config["nvmet_bin"]

    def __del__(self):
        # Best-effort teardown of the kernel target configuration
        nvmet_command(self.nvmet_bin, "clear")

    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):
        """Write "kernel.conf" exposing each device as its own subsystem.

        Devices are split evenly between the addresses in address_list and
        each subsystem gets a unique port starting at 4420. Entries of the
        form [trsvcid, nqn, ip] are appended to self.subsystem_info_list.
        """
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        # Split disks between NIC IP's
        disks_per_ip = int(len(nvme_list) / len(address_list))
        disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))]

        subsys_no = 1
        port_no = 0
        for ip, chunk in zip(address_list, disk_chunks):
            for disk in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % subsys_no
                nvmet_cfg["subsystems"].append({
                    "allowed_hosts": [],
                    "attr": {
                        "allow_any_host": "1",
                        "serial": "SPDK00%s" % subsys_no,
                        "version": "1.3"
                    },
                    "namespaces": [
                        {
                            "device": {
                                "path": disk,
                                "uuid": "%s" % uuid.uuid4()
                            },
                            "enable": 1,
                            "nsid": subsys_no
                        }
                    ],
                    "nqn": nqn
                })

                nvmet_cfg["ports"].append({
                    "addr": {
                        "adrfam": "ipv4",
                        "traddr": ip,
                        "trsvcid": "%s" % (4420 + port_no),
                        "trtype": "%s" % self.transport
                    },
                    "portid": subsys_no,
                    "referrals": [],
                    "subsystems": [nqn]
                })
                # Fix: record the trsvcid actually configured on this port.
                # The original appended the bare port index AFTER incrementing
                # it, so the stored value matched no configured port and could
                # not be fed to "nvme connect -s" by the kernel initiator.
                self.subsystem_info_list.append(["%s" % (4420 + port_no), nqn, ip])
                subsys_no += 1
                port_no += 1

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        """Generate the nvmet config and (re)load it into the kernel."""
        self.log_print("Configuring kernel NVMeOF Target")

        if self.null_block:
            print("Configuring with null block device.")
            null_blk_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
            self.kernel_tgt_gen_subsystem_conf(null_blk_list, self.nic_ips)
            self.subsys_no = len(null_blk_list)
        else:
            print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()
            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
            self.subsys_no = len(nvme_list)

        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log_print("Done configuring kernel NVMeOF Target")
993
994
class SPDKTarget(Target):
    """NVMe-oF target backed by the SPDK nvmf_tgt application, configured
    through its JSON-RPC socket."""

    def __init__(self, name, general_config, target_config):
        super(SPDKTarget, self).__init__(name, general_config, target_config)

        # Required fields
        self.core_mask = target_config["core_mask"]
        self.num_cores = self.get_num_cores(self.core_mask)

        # Defaults
        self.dif_insert_strip = False
        self.null_block_dif_type = 0
        self.num_shared_buffers = 4096

        if "num_shared_buffers" in target_config:
            self.num_shared_buffers = target_config["num_shared_buffers"]
        if "null_block_dif_type" in target_config:
            self.null_block_dif_type = target_config["null_block_dif_type"]
        if "dif_insert_strip" in target_config:
            self.dif_insert_strip = target_config["dif_insert_strip"]

    def get_num_cores(self, core_mask):
        """Return the number of CPU cores described by core_mask.

        Accepts either a hex mask (e.g. "0xF" -> 4) or a bracketed core list
        with inclusive ranges (e.g. "[0,3-5]" -> 4).
        """
        if "0x" in core_mask:
            return bin(int(core_mask, 16)).count("1")
        else:
            num_cores = 0
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")
            for i in core_mask.split(","):
                if "-" in i:
                    x, y = i.split("-")
                    # "x-y" is inclusive, hence the + 1
                    num_cores += len(range(int(x), int(y))) + 1
                else:
                    num_cores += 1
            return num_cores

    def spdk_tgt_configure(self):
        """Create the transport layer, bdevs and subsystems over RPC."""
        self.log_print("Configuring SPDK NVMeOF target via RPC")

        # Create RDMA transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
                                       num_shared_buffers=self.num_shared_buffers,
                                       dif_insert_or_strip=self.dif_insert_strip,
                                       sock_priority=self.adq_priority)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

        if self.enable_adq:
            self.adq_configure_tc()
        self.log_print("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self, null_block_count):
        """Create null bdevs, with 128B metadata when DIF protection is on."""
        md_size = 0
        block_size = 4096
        if self.null_block_dif_type != 0:
            md_size = 128

        self.log_print("Adding null block bdevices to config via RPC")
        for i in range(null_block_count):
            self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type)
            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
                                      dif_type=self.null_block_dif_type, md_size=md_size)
        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        """Attach local PCIe NVMe drives as bdevs (all, or req_num_disks)."""
        self.log_print("Adding NVMe bdevs to config via RPC")

        bdfs = get_nvme_devices_bdf()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create one subsystem + listener per bdev, spread across ips.

        All subsystems listen on port 4420; [port, nqn, ip] entries are
        appended to self.subsystem_info_list.
        """
        self.log_print("Adding subsystems to config")
        port = "4420"
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        # Distribute bdevs between provided NICs
        num_disks = range(0, req_num_disks)
        if len(num_disks) == 1:
            disks_per_ip = 1
        else:
            disks_per_ip = int(len(num_disks) / len(ips))
        disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))]

        # Create subsystems, add bdevs to namespaces, add listeners
        for ip, chunk in zip(ips, disk_chunks):
            for c in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
                serial = "SPDK00%s" % c
                bdev_name = "Nvme%sn1" % c
                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                               allow_any_host=True, max_namespaces=8)
                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)

                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
                                                     nqn=nqn,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid=port,
                                                     adrfam="ipv4")

                self.subsystem_info_list.append([port, nqn, ip])
        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def tgt_start(self):
        """Start nvmf_tgt, wait for its RPC socket, then configure it."""
        if self.null_block:
            self.subsys_no = 1
        else:
            self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
        # self.pid holds the path of the pidfile, not the pid itself
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        # Fix: log the actual process id; the original printed the pidfile
        # path (self.pid) labelled as "PID". Also fixed "initilize" typo.
        self.log_print("SPDK NVMeOF Target PID=%s" % proc.pid)
        self.log_print("Waiting for spdk to initialize...")
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock")

        if self.enable_zcopy:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
                                           enable_zerocopy_send=True)
            self.log_print("Target socket options:")
            rpc.client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))

        if self.enable_adq:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
                                           nvme_adminq_poll_period_us=100000, retry_count=4)
            rpc.nvmf.nvmf_set_config(self.client, acceptor_poll_rate=10000)

        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name)

        rpc.framework_start_init(self.client)
        self.spdk_tgt_configure()

    def __del__(self):
        # Terminate nvmf_tgt on teardown; escalate to SIGKILL if graceful
        # termination fails.
        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait()
            except Exception as e:
                self.log_print(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
1167
1168
class KernelInitiator(Initiator):
    """Initiator that connects to the target with the kernel nvme driver and
    runs FIO with the libaio ioengine against /dev/nvmeX devices."""

    def __init__(self, name, general_config, initiator_config):
        super(KernelInitiator, self).__init__(name, general_config, initiator_config)

        # Defaults
        self.extra_params = ""

        if "extra_params" in initiator_config:
            self.extra_params = initiator_config["extra_params"]

    def __del__(self):
        self.ssh_connection.close()

    def kernel_init_connect(self):
        """Connect to every discovered remote subsystem with nvme-cli."""
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for subsystem in self.subsystem_info_list:
            self.log_print("Trying to connect %s %s %s" % subsystem)
            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
            time.sleep(2)

    def kernel_init_disconnect(self):
        """Disconnect from every previously connected subsystem."""
        for subsystem in self.subsystem_info_list:
            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
            time.sleep(1)

    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1):
        """Generate FIO [filename] sections mapping /dev/nvmeX devices to jobs."""
        # Guard: gen_fio_config() passes its num_jobs straight through and it
        # is None when the config file has no "num_jobs" key, which would make
        # the queue-depth division below raise TypeError.
        if not num_jobs:
            num_jobs = 1
        out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'",
                             "|", "awk", "'{print $1}'"])
        nvme_list = [x for x in out.split("\n") if "nvme" in x]

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            # Fix: the remainder block was indented inside the inner loop,
            # piling all leftover devices onto the earliest threads. Moved out
            # so leftovers are spread one per thread, matching SPDKInitiator.
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            # Scale per-section iodepth so total queue depth is independent of
            # the number of jobs; floor of 1.
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section
1222
1223
class SPDKInitiator(Initiator):
    """Initiator that runs FIO with the SPDK bdev ioengine plugin, attaching
    to remote subsystems through a generated bdev JSON config."""

    def __init__(self, name, general_config, initiator_config):
        super(SPDKInitiator, self).__init__(name, general_config, initiator_config)

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.install_spdk()

        # Required fields
        self.num_cores = initiator_config["num_cores"]

    def install_spdk(self):
        """Build SPDK with the FIO plugin on the initiator and run setup.sh."""
        self.log_print("Using fio binary %s" % self.fio_bin)
        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)])
        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])

        self.log_print("SPDK built")
        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        """Return a JSON bdev config string attaching one NVMe-oF controller
        per (port, nqn, address) entry in remote_subsystem_list."""
        bdev_cfg_section = {
            "subsystems": [
                {
                    "subsystem": "bdev",
                    "config": []
                }
            ]
        }

        for i, subsys in enumerate(remote_subsystem_list):
            sub_port, sub_nqn, sub_addr = map(str, subsys)
            nvme_ctrl = {
                "method": "bdev_nvme_attach_controller",
                "params": {
                    "name": "Nvme{}".format(i),
                    "trtype": self.transport,
                    "traddr": sub_addr,
                    "trsvcid": sub_port,
                    "subnqn": sub_nqn,
                    "adrfam": "IPv4"
                }
            }

            if self.enable_adq:
                nvme_ctrl["params"].update({"priority": "1"})

            bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)

        return json.dumps(bdev_cfg_section, indent=2)

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1):
        """Generate FIO [filename] sections mapping NvmeXn1 bdevs to jobs."""
        # Guard: gen_fio_config() passes its num_jobs straight through and it
        # is None when the config file has no "num_jobs" key, which would make
        # the queue-depth division below raise TypeError.
        if not num_jobs:
            num_jobs = 1
        filename_section = ""
        # Never create more job sections than there are subsystems
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        nvme_per_split = int(len(subsystems) / len(threads))
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            # Scale per-section iodepth so total queue depth is independent of
            # the number of jobs; floor of 1.
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section
1302
1303
if __name__ == "__main__":
    script_full_dir = os.path.dirname(os.path.realpath(__file__))
    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
                        help='Configuration file.')
    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
                        help='Results directory.')
    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
                        help='CSV results filename.')

    args = parser.parse_args()

    print("Using config file: %s" % args.config)
    with open(args.config, "r") as config:
        data = json.load(config)

    initiators = []

    # Build the target object, the initiator objects and the fio workload
    # matrix from the json config sections. Section names are matched by
    # substring ("target", "initiator", "fio"); unknown sections are ignored.
    for k, v in data.items():
        if "target" in k:
            if v["mode"] == "spdk":
                target_obj = SPDKTarget(k, data["general"], v)
            elif v["mode"] == "kernel":
                target_obj = KernelTarget(k, data["general"], v)
        elif "initiator" in k:
            if v["mode"] == "spdk":
                init_obj = SPDKInitiator(k, data["general"], v)
            elif v["mode"] == "kernel":
                init_obj = KernelInitiator(k, data["general"], v)
            initiators.append(init_obj)
        elif "fio" in k:
            # Cartesian product of block size x queue depth x rw pattern;
            # each combination is run once (times fio_run_num) below.
            fio_workloads = itertools.product(v["bs"], v["qd"], v["rw"])

            fio_run_time = v["run_time"]
            fio_ramp_time = v["ramp_time"]
            fio_rw_mix_read = v["rwmixread"]
            fio_run_num = v.get("run_num")
            fio_num_jobs = v.get("num_jobs")
            fio_rate_iops = v.get("rate_iops", 0)
        else:
            continue

    target_obj.tgt_start()

    os.makedirs(args.results, exist_ok=True)

    for i in initiators:
        i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no)
        if i.enable_adq:
            i.adq_configure_tc()

    # Poor man's threading: for each workload, run fio on every initiator and
    # all enabled target-side measurement tools concurrently, then join.
    for block_size, io_depth, rw in fio_workloads:
        threads = []
        configs = []
        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_connect()

            cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                   fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops)
            configs.append(cfg)

        for i, cfg in zip(initiators, configs):
            threads.append(threading.Thread(target=i.run_fio, args=(cfg, fio_run_num)))

        if target_obj.enable_sar:
            sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"])
            sar_file_name = ".".join([sar_file_name, "txt"])
            threads.append(threading.Thread(target=target_obj.measure_sar,
                                            args=(args.results, sar_file_name)))

        if target_obj.enable_pcm:
            pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x)
                          for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]

            threads.append(threading.Thread(target=target_obj.measure_pcm,
                                            args=(args.results, pcm_fnames[0],)))
            threads.append(threading.Thread(target=target_obj.measure_pcm_memory,
                                            args=(args.results, pcm_fnames[1],)))
            threads.append(threading.Thread(target=target_obj.measure_pcm_power,
                                            args=(args.results, pcm_fnames[2],)))

        if target_obj.enable_bandwidth:
            bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)])
            bandwidth_file_name = ".".join([bandwidth_file_name, "csv"])
            threads.append(threading.Thread(target=target_obj.measure_network_bandwidth,
                                            args=(args.results, bandwidth_file_name,)))

        if target_obj.enable_dpdk_memory:
            # BUGFIX: args must be a tuple. The previous "(args.results)" was a
            # bare string, which Thread would unpack character-by-character.
            threads.append(threading.Thread(target=target_obj.measure_dpdk_memory,
                                            args=(args.results,)))

        for t in threads:
            t.start()
        for t in threads:
            t.join()

        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_disconnect()
            i.copy_result_files(args.results)

    # Restore the original system configuration on target and initiators.
    target_obj.restore_governor()
    target_obj.restore_tuned()
    target_obj.restore_services()
    target_obj.restore_sysctl()
    for i in initiators:
        i.restore_governor()
        i.restore_tuned()
        i.restore_services()
        i.restore_sysctl()
    target_obj.parse_results(args.results, args.csv_filename)
1434