xref: /spdk/scripts/perf/nvmf/run_nvmf.py (revision 0ecbe09bc18245c46ebf6a3aae64ce64ea26c067)
1#!/usr/bin/env python3
2
3from json.decoder import JSONDecodeError
4import os
5import re
6import sys
7import argparse
8import json
9import zipfile
10import threading
11import subprocess
12import itertools
13import configparser
14import time
15import uuid
16from collections import OrderedDict
17
18import paramiko
19import pandas as pd
20
21import rpc
22import rpc.client
23from common import *
24
25
26class Server:
27    def __init__(self, name, general_config, server_config):
28        self.name = name
29        self.username = general_config["username"]
30        self.password = general_config["password"]
31        self.transport = general_config["transport"].lower()
32        self.nic_ips = server_config["nic_ips"]
33        self.mode = server_config["mode"]
34
35        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
36        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
37            self.irq_scripts_dir = server_config["irq_scripts_dir"]
38
39        self.local_nic_info = []
40        self._nics_json_obj = {}
41        self.svc_restore_dict = {}
42        self.sysctl_restore_dict = {}
43        self.tuned_restore_dict = {}
44        self.governor_restore = ""
45        self.tuned_profile = ""
46
47        self.enable_adq = False
48        self.adq_priority = None
49        if "adq_enable" in server_config and server_config["adq_enable"]:
50            self.enable_adq = server_config["adq_enable"]
51            self.adq_priority = 1
52
53        if "tuned_profile" in server_config:
54            self.tuned_profile = server_config["tuned_profile"]
55
56        if not re.match("^[A-Za-z0-9]*$", name):
57            self.log_print("Please use a name which contains only letters or numbers")
58            sys.exit(1)
59
60    def log_print(self, msg):
61        print("[%s] %s" % (self.name, msg), flush=True)
62
63    def get_uncommented_lines(self, lines):
64        return [line for line in lines if line and not line.startswith('#')]
65
66    def get_nic_name_by_ip(self, ip):
67        if not self._nics_json_obj:
68            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
69            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
70        for nic in self._nics_json_obj:
71            for addr in nic["addr_info"]:
72                if ip in addr["local"]:
73                    return nic["ifname"]
74
75    def set_local_nic_info_helper(self):
76        pass
77
78    def set_local_nic_info(self, pci_info):
79        def extract_network_elements(json_obj):
80            nic_list = []
81            if isinstance(json_obj, list):
82                for x in json_obj:
83                    nic_list.extend(extract_network_elements(x))
84            elif isinstance(json_obj, dict):
85                if "children" in json_obj:
86                    nic_list.extend(extract_network_elements(json_obj["children"]))
87                if "class" in json_obj.keys() and "network" in json_obj["class"]:
88                    nic_list.append(json_obj)
89            return nic_list
90
91        self.local_nic_info = extract_network_elements(pci_info)
92
93    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
94        return ""
95
96    def configure_system(self):
97        self.configure_services()
98        self.configure_sysctl()
99        self.configure_tuned()
100        self.configure_cpu_governor()
101        self.configure_irq_affinity()
102
103    def configure_adq(self):
104        if self.mode == "kernel":
105            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
106            return
107        self.adq_load_modules()
108        self.adq_configure_nic()
109
110    def adq_load_modules(self):
111        self.log_print("Modprobing ADQ-related Linux modules...")
112        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
113        for module in adq_module_deps:
114            try:
115                self.exec_cmd(["sudo", "modprobe", module])
116                self.log_print("%s loaded!" % module)
117            except CalledProcessError as e:
118                self.log_print("ERROR: failed to load module %s" % module)
119                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
120
121    def adq_configure_tc(self):
122        self.log_print("Configuring ADQ Traffic classess and filters...")
123
124        if self.mode == "kernel":
125            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
126            return
127
128        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
129        num_queues_tc1 = self.num_cores
130        port_param = "dst_port" if isinstance(self, Target) else "src_port"
131        port = "4420"
132        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")
133
134        for nic_ip in self.nic_ips:
135            nic_name = self.get_nic_name_by_ip(nic_ip)
136            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
137                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
138                                "queues", "%s@0" % num_queues_tc0,
139                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
140                                "hw", "1", "mode", "channel"]
141            self.log_print(" ".join(tc_qdisc_map_cmd))
142            self.exec_cmd(tc_qdisc_map_cmd)
143
144            time.sleep(5)
145            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
146            self.log_print(" ".join(tc_qdisc_ingress_cmd))
147            self.exec_cmd(tc_qdisc_ingress_cmd)
148
149            tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
150                             "protocol", "ip", "ingress", "prio", "1", "flower",
151                             "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
152                             "skip_sw", "hw_tc", "1"]
153            self.log_print(" ".join(tc_filter_cmd))
154            self.exec_cmd(tc_filter_cmd)
155
156            # show tc configuration
157            self.log_print("Show tc configuration for %s NIC..." % nic_name)
158            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
159            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
160            self.log_print("%s" % tc_disk_out)
161            self.log_print("%s" % tc_filter_out)
162
163            # Ethtool coalese settings must be applied after configuring traffic classes
164            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
165            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])
166
167            self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name)
168            xps_cmd = ["sudo", xps_script_path, nic_name]
169            self.log_print(xps_cmd)
170            self.exec_cmd(xps_cmd)
171
172    def adq_configure_nic(self):
173        self.log_print("Configuring NIC port settings for ADQ testing...")
174
175        # Reload the driver first, to make sure any previous settings are re-set.
176        try:
177            self.exec_cmd(["sudo", "rmmod", "ice"])
178            self.exec_cmd(["sudo", "modprobe", "ice"])
179        except CalledProcessError as e:
180            self.log_print("ERROR: failed to reload ice module!")
181            self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
182
183        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
184        for nic in nic_names:
185            self.log_print(nic)
186            try:
187                self.exec_cmd(["sudo", "ethtool", "-K", nic,
188                               "hw-tc-offload", "on"])  # Enable hardware TC offload
189                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
190                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
191                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
192                # As temporary workaround for ADQ, channel packet inspection optimization is turned on during connection establishment.
193                # Then turned off before fio ramp_up expires in ethtool_after_fio_ramp().
194                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
195                               "channel-pkt-inspect-optimize", "on"])
196            except CalledProcessError as e:
197                self.log_print("ERROR: failed to configure NIC port using ethtool!")
198                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
199                self.log_print("Please update your NIC driver and firmware versions and try again.")
200            self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
201            self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))
202
203    def configure_services(self):
204        self.log_print("Configuring active services...")
205        svc_config = configparser.ConfigParser(strict=False)
206
207        # Below list is valid only for RHEL / Fedora systems and might not
208        # contain valid names for other distributions.
209        svc_target_state = {
210            "firewalld": "inactive",
211            "irqbalance": "inactive",
212            "lldpad.service": "inactive",
213            "lldpad.socket": "inactive"
214        }
215
216        for service in svc_target_state:
217            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
218            out = "\n".join(["[%s]" % service, out])
219            svc_config.read_string(out)
220
221            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
222                continue
223
224            service_state = svc_config[service]["ActiveState"]
225            self.log_print("Current state of %s service is %s" % (service, service_state))
226            self.svc_restore_dict.update({service: service_state})
227            if service_state != "inactive":
228                self.log_print("Disabling %s. It will be restored after the test has finished." % service)
229                self.exec_cmd(["sudo", "systemctl", "stop", service])
230
231    def configure_sysctl(self):
232        self.log_print("Tuning sysctl settings...")
233
234        busy_read = 0
235        if self.enable_adq and self.mode == "spdk":
236            busy_read = 1
237
238        sysctl_opts = {
239            "net.core.busy_poll": 0,
240            "net.core.busy_read": busy_read,
241            "net.core.somaxconn": 4096,
242            "net.core.netdev_max_backlog": 8192,
243            "net.ipv4.tcp_max_syn_backlog": 16384,
244            "net.core.rmem_max": 268435456,
245            "net.core.wmem_max": 268435456,
246            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
247            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
248            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
249            "net.ipv4.route.flush": 1,
250            "vm.overcommit_memory": 1,
251        }
252
253        for opt, value in sysctl_opts.items():
254            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
255            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())
256
257    def configure_tuned(self):
258        if not self.tuned_profile:
259            self.log_print("WARNING: Tuned profile not set in configration file. Skipping configuration.")
260            return
261
262        self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile)
263        service = "tuned"
264        tuned_config = configparser.ConfigParser(strict=False)
265
266        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
267        out = "\n".join(["[%s]" % service, out])
268        tuned_config.read_string(out)
269        tuned_state = tuned_config[service]["ActiveState"]
270        self.svc_restore_dict.update({service: tuned_state})
271
272        if tuned_state != "inactive":
273            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
274            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()
275
276            self.tuned_restore_dict = {
277                "profile": profile,
278                "mode": profile_mode
279            }
280
281        self.exec_cmd(["sudo", "systemctl", "start", service])
282        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
283        self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))
284
285    def configure_cpu_governor(self):
286        self.log_print("Setting CPU governor to performance...")
287
288        # This assumes that there is the same CPU scaling governor on each CPU
289        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
290        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])
291
292    def configure_irq_affinity(self):
293        self.log_print("Setting NIC irq affinity for NICs...")
294
295        irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh")
296        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
297        for nic in nic_names:
298            irq_cmd = ["sudo", irq_script_path, nic]
299            self.log_print(irq_cmd)
300            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)
301
302    def restore_services(self):
303        self.log_print("Restoring services...")
304        for service, state in self.svc_restore_dict.items():
305            cmd = "stop" if state == "inactive" else "start"
306            self.exec_cmd(["sudo", "systemctl", cmd, service])
307
308    def restore_sysctl(self):
309        self.log_print("Restoring sysctl settings...")
310        for opt, value in self.sysctl_restore_dict.items():
311            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())
312
313    def restore_tuned(self):
314        self.log_print("Restoring tuned-adm settings...")
315
316        if not self.tuned_restore_dict:
317            return
318
319        if self.tuned_restore_dict["mode"] == "auto":
320            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
321            self.log_print("Reverted tuned-adm to auto_profile.")
322        else:
323            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
324            self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])
325
326    def restore_governor(self):
327        self.log_print("Restoring CPU governor setting...")
328        if self.governor_restore:
329            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
330            self.log_print("Reverted CPU governor to %s." % self.governor_restore)
331
332
333class Target(Server):
334    def __init__(self, name, general_config, target_config):
335        super(Target, self).__init__(name, general_config, target_config)
336
337        # Defaults
338        self.enable_sar = False
339        self.sar_delay = 0
340        self.sar_interval = 0
341        self.sar_count = 0
342        self.enable_pcm = False
343        self.pcm_dir = ""
344        self.pcm_delay = 0
345        self.pcm_interval = 0
346        self.pcm_count = 0
347        self.enable_bandwidth = 0
348        self.bandwidth_count = 0
349        self.enable_dpdk_memory = False
350        self.dpdk_wait_time = 0
351        self.enable_zcopy = False
352        self.scheduler_name = "static"
353        self.null_block = 0
354        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
355        self.subsystem_info_list = []
356
357        if "null_block_devices" in target_config:
358            self.null_block = target_config["null_block_devices"]
359        if "sar_settings" in target_config:
360            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"]
361        if "pcm_settings" in target_config:
362            self.enable_pcm = True
363            self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"]
364        if "enable_bandwidth" in target_config:
365            self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"]
366        if "enable_dpdk_memory" in target_config:
367            self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"]
368        if "scheduler_settings" in target_config:
369            self.scheduler_name = target_config["scheduler_settings"]
370        if "zcopy_settings" in target_config:
371            self.enable_zcopy = target_config["zcopy_settings"]
372        if "results_dir" in target_config:
373            self.results_dir = target_config["results_dir"]
374
375        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
376        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
377        self.set_local_nic_info(self.set_local_nic_info_helper())
378
379        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
380            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")
381
382        self.configure_system()
383        if self.enable_adq:
384            self.configure_adq()
385        self.sys_config()
386
387    def set_local_nic_info_helper(self):
388        return json.loads(self.exec_cmd(["lshw", "-json"]))
389
390    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
391        stderr_opt = None
392        if stderr_redirect:
393            stderr_opt = subprocess.STDOUT
394        if change_dir:
395            old_cwd = os.getcwd()
396            os.chdir(change_dir)
397            self.log_print("Changing directory to %s" % change_dir)
398
399        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")
400
401        if change_dir:
402            os.chdir(old_cwd)
403            self.log_print("Changing directory to %s" % old_cwd)
404        return out
405
406    def zip_spdk_sources(self, spdk_dir, dest_file):
407        self.log_print("Zipping SPDK source directory")
408        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
409        for root, directories, files in os.walk(spdk_dir, followlinks=True):
410            for file in files:
411                fh.write(os.path.relpath(os.path.join(root, file)))
412        fh.close()
413        self.log_print("Done zipping")
414
415    def read_json_stats(self, file):
416        with open(file, "r") as json_data:
417            data = json.load(json_data)
418            job_pos = 0  # job_post = 0 because using aggregated results
419
420            # Check if latency is in nano or microseconds to choose correct dict key
421            def get_lat_unit(key_prefix, dict_section):
422                # key prefix - lat, clat or slat.
423                # dict section - portion of json containing latency bucket in question
424                # Return dict key to access the bucket and unit as string
425                for k, _ in dict_section.items():
426                    if k.startswith(key_prefix):
427                        return k, k.split("_")[1]
428
429            def get_clat_percentiles(clat_dict_leaf):
430                if "percentile" in clat_dict_leaf:
431                    p99_lat = float(clat_dict_leaf["percentile"]["99.000000"])
432                    p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"])
433                    p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"])
434                    p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"])
435
436                    return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat]
437                else:
438                    # Latest fio versions do not provide "percentile" results if no
439                    # measurements were done, so just return zeroes
440                    return [0, 0, 0, 0]
441
442            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
443            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
444            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
445            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
446            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
447            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
448            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
449            read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles(
450                data["jobs"][job_pos]["read"][clat_key])
451
452            if "ns" in lat_unit:
453                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
454            if "ns" in clat_unit:
455                read_p99_lat = read_p99_lat / 1000
456                read_p99_9_lat = read_p99_9_lat / 1000
457                read_p99_99_lat = read_p99_99_lat / 1000
458                read_p99_999_lat = read_p99_999_lat / 1000
459
460            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
461            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
462            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
463            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
464            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
465            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
466            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
467            write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles(
468                data["jobs"][job_pos]["write"][clat_key])
469
470            if "ns" in lat_unit:
471                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
472            if "ns" in clat_unit:
473                write_p99_lat = write_p99_lat / 1000
474                write_p99_9_lat = write_p99_9_lat / 1000
475                write_p99_99_lat = write_p99_99_lat / 1000
476                write_p99_999_lat = write_p99_999_lat / 1000
477
478        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
479                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
480                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
481                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]
482
483    def parse_results(self, results_dir, csv_file):
484        files = os.listdir(results_dir)
485        fio_files = filter(lambda x: ".fio" in x, files)
486        json_files = [x for x in files if ".json" in x]
487
488        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
489                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
490                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
491                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]
492
493        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
494                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]
495
496        header_line = ",".join(["Name", *headers])
497        aggr_header_line = ",".join(["Name", *aggr_headers])
498
499        # Create empty results file
500        with open(os.path.join(results_dir, csv_file), "w") as fh:
501            fh.write(aggr_header_line + "\n")
502        rows = set()
503
504        for fio_config in fio_files:
505            self.log_print("Getting FIO stats for %s" % fio_config)
506            job_name, _ = os.path.splitext(fio_config)
507
508            # Look in the filename for rwmixread value. Function arguments do
509            # not have that information.
510            # TODO: Improve this function by directly using workload params instead
511            # of regexing through filenames.
512            if "read" in job_name:
513                rw_mixread = 1
514            elif "write" in job_name:
515                rw_mixread = 0
516            else:
517                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100
518
519            # If "_CPU" exists in name - ignore it
520            # Initiators for the same job could have diffrent num_cores parameter
521            job_name = re.sub(r"_\d+CPU", "", job_name)
522            job_result_files = [x for x in json_files if job_name in x]
523            self.log_print("Matching result files for current fio config:")
524            for j in job_result_files:
525                self.log_print("\t %s" % j)
526
527            # There may have been more than 1 initiator used in test, need to check that
528            # Result files are created so that string after last "_" separator is server name
529            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
530            inits_avg_results = []
531            for i in inits_names:
532                self.log_print("\tGetting stats for initiator %s" % i)
533                # There may have been more than 1 test run for this job, calculate average results for initiator
534                i_results = [x for x in job_result_files if i in x]
535                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))
536
537                separate_stats = []
538                for r in i_results:
539                    try:
540                        stats = self.read_json_stats(os.path.join(results_dir, r))
541                        separate_stats.append(stats)
542                        self.log_print(stats)
543                    except JSONDecodeError as e:
544                        self.log_print("ERROR: Failed to parse %s results! Results might be incomplete!")
545
546                init_results = [sum(x) for x in zip(*separate_stats)]
547                init_results = [x / len(separate_stats) for x in init_results]
548                inits_avg_results.append(init_results)
549
550                self.log_print("\tAverage results for initiator %s" % i)
551                self.log_print(init_results)
552                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
553                    fh.write(header_line + "\n")
554                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")
555
556            # Sum results of all initiators running this FIO job.
557            # Latency results are an average of latencies from accros all initiators.
558            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
559            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
560            for key in inits_avg_results:
561                if "lat" in key:
562                    inits_avg_results[key] /= len(inits_names)
563
564            # Aggregate separate read/write values into common labels
565            # Take rw_mixread into consideration for mixed read/write workloads.
566            aggregate_results = OrderedDict()
567            for h in aggr_headers:
568                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
569                if "lat" in h:
570                    _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
571                else:
572                    _ = read_stat + write_stat
573                aggregate_results[h] = "{0:.3f}".format(_)
574
575            rows.add(",".join([job_name, *aggregate_results.values()]))
576
577        # Save results to file
578        for row in rows:
579            with open(os.path.join(results_dir, csv_file), "a") as fh:
580                fh.write(row + "\n")
581        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))
582
583    def measure_sar(self, results_dir, sar_file_name):
584        self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay)
585        cpu_number = os.cpu_count()
586        sar_idle_sum = 0
587        time.sleep(self.sar_delay)
588        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count])
589        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
590            for line in out.split("\n"):
591                if "Average" in line:
592                    if "CPU" in line:
593                        self.log_print("Summary CPU utilization from SAR:")
594                        self.log_print(line)
595                    elif "all" in line:
596                        self.log_print(line)
597                    else:
598                        sar_idle_sum += float(line.split()[7])
599            fh.write(out)
600        sar_cpu_usage = cpu_number * 100 - sar_idle_sum
601        with open(os.path.join(results_dir, sar_file_name), "a") as f:
602            f.write("Total CPU used: " + str(sar_cpu_usage))
603
604    def ethtool_after_fio_ramp(self, fio_ramp_time):
605        time.sleep(fio_ramp_time//2)
606        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
607        for nic in nic_names:
608            self.log_print(nic)
609            self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
610                           "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization
611
612    def measure_pcm_memory(self, results_dir, pcm_file_name):
613        time.sleep(self.pcm_delay)
614        cmd = ["%s/pcm-memory.x" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)]
615        pcm_memory = subprocess.Popen(cmd)
616        time.sleep(self.pcm_count)
617        pcm_memory.terminate()
618
619    def measure_pcm(self, results_dir, pcm_file_name):
620        time.sleep(self.pcm_delay)
621        cmd = ["%s/pcm.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, "-csv=%s/%s" % (results_dir, pcm_file_name)]
622        subprocess.run(cmd)
623        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
624        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
625        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
626        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
627        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)
628
629    def measure_pcm_power(self, results_dir, pcm_power_file_name):
630        time.sleep(self.pcm_delay)
631        out = self.exec_cmd(["%s/pcm-power.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count])
632        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
633            fh.write(out)
634
635    def measure_network_bandwidth(self, results_dir, bandwidth_file_name):
636        self.log_print("INFO: starting network bandwidth measure")
637        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
638                       "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)])
639
640    def measure_dpdk_memory(self, results_dir):
641        self.log_print("INFO: waiting to generate DPDK memory usage")
642        time.sleep(self.dpdk_wait_time)
643        self.log_print("INFO: generating DPDK memory usage")
644        rpc.env.env_dpdk_get_mem_stats
645        os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir))
646
647    def sys_config(self):
648        self.log_print("====Kernel release:====")
649        self.log_print(os.uname().release)
650        self.log_print("====Kernel command line:====")
651        with open('/proc/cmdline') as f:
652            cmdline = f.readlines()
653            self.log_print('\n'.join(self.get_uncommented_lines(cmdline)))
654        self.log_print("====sysctl conf:====")
655        with open('/etc/sysctl.conf') as f:
656            sysctl = f.readlines()
657            self.log_print('\n'.join(self.get_uncommented_lines(sysctl)))
658        self.log_print("====Cpu power info:====")
659        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
660        self.log_print("====zcopy settings:====")
661        self.log_print("zcopy enabled: %s" % (self.enable_zcopy))
662        self.log_print("====Scheduler settings:====")
663        self.log_print("SPDK scheduler: %s" % (self.scheduler_name))
664
665
666class Initiator(Server):
667    def __init__(self, name, general_config, initiator_config):
668        super(Initiator, self).__init__(name, general_config, initiator_config)
669
670        # Required fields
671        self.ip = initiator_config["ip"]
672        self.target_nic_ips = initiator_config["target_nic_ips"]
673
674        # Defaults
675        self.cpus_allowed = None
676        self.cpus_allowed_policy = "shared"
677        self.spdk_dir = "/tmp/spdk"
678        self.fio_bin = "/usr/src/fio/fio"
679        self.nvmecli_bin = "nvme"
680        self.cpu_frequency = None
681        self.subsystem_info_list = []
682
683        if "spdk_dir" in initiator_config:
684            self.spdk_dir = initiator_config["spdk_dir"]
685        if "fio_bin" in initiator_config:
686            self.fio_bin = initiator_config["fio_bin"]
687        if "nvmecli_bin" in initiator_config:
688            self.nvmecli_bin = initiator_config["nvmecli_bin"]
689        if "cpus_allowed" in initiator_config:
690            self.cpus_allowed = initiator_config["cpus_allowed"]
691        if "cpus_allowed_policy" in initiator_config:
692            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
693        if "cpu_frequency" in initiator_config:
694            self.cpu_frequency = initiator_config["cpu_frequency"]
695        if os.getenv('SPDK_WORKSPACE'):
696            self.spdk_dir = os.getenv('SPDK_WORKSPACE')
697
698        self.ssh_connection = paramiko.SSHClient()
699        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
700        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
701        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
702        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
703        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
704
705        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
706            self.copy_spdk("/tmp/spdk.zip")
707        self.set_local_nic_info(self.set_local_nic_info_helper())
708        self.set_cpu_frequency()
709        self.configure_system()
710        if self.enable_adq:
711            self.configure_adq()
712        self.sys_config()
713
714    def set_local_nic_info_helper(self):
715        return json.loads(self.exec_cmd(["lshw", "-json"]))
716
717    def __del__(self):
718        self.ssh_connection.close()
719
720    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
721        if change_dir:
722            cmd = ["cd", change_dir, ";", *cmd]
723
724        # In case one of the command elements contains whitespace and is not
725        # already quoted, # (e.g. when calling sysctl) quote it again to prevent expansion
726        # when sending to remote system.
727        for i, c in enumerate(cmd):
728            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
729                cmd[i] = '"%s"' % c
730        cmd = " ".join(cmd)
731
732        # Redirect stderr to stdout thanks using get_pty option if needed
733        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
734        out = stdout.read().decode(encoding="utf-8")
735
736        # Check the return code
737        rc = stdout.channel.recv_exit_status()
738        if rc:
739            raise CalledProcessError(int(rc), cmd, out)
740
741        return out
742
743    def put_file(self, local, remote_dest):
744        ftp = self.ssh_connection.open_sftp()
745        ftp.put(local, remote_dest)
746        ftp.close()
747
748    def get_file(self, remote, local_dest):
749        ftp = self.ssh_connection.open_sftp()
750        ftp.get(remote, local_dest)
751        ftp.close()
752
753    def copy_spdk(self, local_spdk_zip):
754        self.log_print("Copying SPDK sources to initiator %s" % self.name)
755        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
756        self.log_print("Copied sources zip from target")
757        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
758        self.log_print("Sources unpacked")
759
760    def copy_result_files(self, dest_dir):
761        self.log_print("Copying results")
762
763        if not os.path.exists(dest_dir):
764            os.mkdir(dest_dir)
765
766        # Get list of result files from initiator and copy them back to target
767        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")
768
769        for file in file_list:
770            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
771                          os.path.join(dest_dir, file))
772        self.log_print("Done copying results")
773
774    def discover_subsystems(self, address_list, subsys_no):
775        num_nvmes = range(0, subsys_no)
776        nvme_discover_output = ""
777        for ip, subsys_no in itertools.product(address_list, num_nvmes):
778            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no))
779            nvme_discover_cmd = ["sudo",
780                                 "%s" % self.nvmecli_bin,
781                                 "discover", "-t", "%s" % self.transport,
782                                 "-s", "%s" % (4420 + subsys_no),
783                                 "-a", "%s" % ip]
784
785            try:
786                stdout = self.exec_cmd(nvme_discover_cmd)
787                if stdout:
788                    nvme_discover_output = nvme_discover_output + stdout
789            except CalledProcessError:
790                # Do nothing. In case of discovering remote subsystems of kernel target
791                # we expect "nvme discover" to fail a bunch of times because we basically
792                # scan ports.
793                pass
794
795        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
796                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
797                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
798                                nvme_discover_output)  # from nvme discovery output
799        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
800        subsystems = list(set(subsystems))
801        subsystems.sort(key=lambda x: x[1])
802        self.log_print("Found matching subsystems on target side:")
803        for s in subsystems:
804            self.log_print(s)
805        self.subsystem_info_list = subsystems
806
807    def gen_fio_filename_conf(self, *args, **kwargs):
808        # Logic implemented in SPDKInitiator and KernelInitiator classes
809        pass
810
811    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10, rate_iops=0):
812        fio_conf_template = """
813[global]
814ioengine={ioengine}
815{spdk_conf}
816thread=1
817group_reporting=1
818direct=1
819percentile_list=50:90:99:99.5:99.9:99.99:99.999
820
821norandommap=1
822rw={rw}
823rwmixread={rwmixread}
824bs={block_size}
825time_based=1
826ramp_time={ramp_time}
827runtime={run_time}
828rate_iops={rate_iops}
829"""
830        if "spdk" in self.mode:
831            bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
832            self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
833            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
834            spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
835        else:
836            ioengine = self.ioengine
837            spdk_conf = ""
838            out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'",
839                                 "|", "awk", "'{print $1}'"])
840            subsystems = [x for x in out.split("\n") if "nvme" in x]
841
842        if self.cpus_allowed is not None:
843            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
844            cpus_num = 0
845            cpus = self.cpus_allowed.split(",")
846            for cpu in cpus:
847                if "-" in cpu:
848                    a, b = cpu.split("-")
849                    a = int(a)
850                    b = int(b)
851                    cpus_num += len(range(a, b))
852                else:
853                    cpus_num += 1
854            self.num_cores = cpus_num
855            threads = range(0, self.num_cores)
856        elif hasattr(self, 'num_cores'):
857            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
858            threads = range(0, int(self.num_cores))
859        else:
860            self.num_cores = len(subsystems)
861            threads = range(0, len(subsystems))
862
863        if "spdk" in self.mode:
864            filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs)
865        else:
866            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs)
867
868        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
869                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
870                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)
871
872        # TODO: hipri disabled for now, as it causes fio errors:
873        # io_u error on file /dev/nvme2n1: Operation not supported
874        # See comment in KernelInitiator class, kernel_init_connect() function
875        if hasattr(self, "ioengine") and "io_uring" in self.ioengine:
876            fio_config = fio_config + """
877fixedbufs=1
878registerfiles=1
879#hipri=1
880"""
881        if num_jobs:
882            fio_config = fio_config + "numjobs=%s \n" % num_jobs
883        if self.cpus_allowed is not None:
884            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
885            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
886        fio_config = fio_config + filename_section
887
888        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
889        if hasattr(self, "num_cores"):
890            fio_config_filename += "_%sCPU" % self.num_cores
891        fio_config_filename += ".fio"
892
893        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
894        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
895        self.log_print("Created FIO Config:")
896        self.log_print(fio_config)
897
898        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)
899
900    def set_cpu_frequency(self):
901        if self.cpu_frequency is not None:
902            try:
903                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
904                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
905                self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
906            except Exception:
907                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
908                sys.exit()
909        else:
910            self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.")
911
912    def run_fio(self, fio_config_file, run_num=None):
913        job_name, _ = os.path.splitext(fio_config_file)
914        self.log_print("Starting FIO run for job: %s" % job_name)
915        self.log_print("Using FIO: %s" % self.fio_bin)
916
917        if run_num:
918            for i in range(1, run_num + 1):
919                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
920                try:
921                    output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
922                                            "--output=%s" % output_filename, "--eta=never"], True)
923                    self.log_print(output)
924                except subprocess.CalledProcessError as e:
925                    self.log_print("ERROR: Fio process failed!")
926                    self.log_print(e.stdout)
927        else:
928            output_filename = job_name + "_" + self.name + ".json"
929            output = self.exec_cmd(["sudo", self.fio_bin,
930                                    fio_config_file, "--output-format=json",
931                                    "--output" % output_filename], True)
932            self.log_print(output)
933        self.log_print("FIO run finished. Results in: %s" % output_filename)
934
935    def sys_config(self):
936        self.log_print("====Kernel release:====")
937        self.log_print(self.exec_cmd(["uname", "-r"]))
938        self.log_print("====Kernel command line:====")
939        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
940        self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
941        self.log_print("====sysctl conf:====")
942        sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"])
943        self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
944        self.log_print("====Cpu power info:====")
945        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
946
947
948class KernelTarget(Target):
949    def __init__(self, name, general_config, target_config):
950        super(KernelTarget, self).__init__(name, general_config, target_config)
951        # Defaults
952        self.nvmet_bin = "nvmetcli"
953
954        if "nvmet_bin" in target_config:
955            self.nvmet_bin = target_config["nvmet_bin"]
956
957    def __del__(self):
958        nvmet_command(self.nvmet_bin, "clear")
959
960    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):
961
962        nvmet_cfg = {
963            "ports": [],
964            "hosts": [],
965            "subsystems": [],
966        }
967
968        # Split disks between NIC IP's
969        disks_per_ip = int(len(nvme_list) / len(address_list))
970        disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))]
971
972        subsys_no = 1
973        port_no = 0
974        for ip, chunk in zip(address_list, disk_chunks):
975            for disk in chunk:
976                nqn = "nqn.2018-09.io.spdk:cnode%s" % subsys_no
977                nvmet_cfg["subsystems"].append({
978                    "allowed_hosts": [],
979                    "attr": {
980                        "allow_any_host": "1",
981                        "serial": "SPDK00%s" % subsys_no,
982                        "version": "1.3"
983                    },
984                    "namespaces": [
985                        {
986                            "device": {
987                                "path": disk,
988                                "uuid": "%s" % uuid.uuid4()
989                            },
990                            "enable": 1,
991                            "nsid": subsys_no
992                        }
993                    ],
994                    "nqn": nqn
995                })
996
997                nvmet_cfg["ports"].append({
998                    "addr": {
999                        "adrfam": "ipv4",
1000                        "traddr": ip,
1001                        "trsvcid": "%s" % (4420 + port_no),
1002                        "trtype": "%s" % self.transport
1003                    },
1004                    "portid": subsys_no,
1005                    "referrals": [],
1006                    "subsystems": [nqn]
1007                })
1008                subsys_no += 1
1009                port_no += 1
1010                self.subsystem_info_list.append([port_no, nqn, ip])
1011
1012        with open("kernel.conf", "w") as fh:
1013            fh.write(json.dumps(nvmet_cfg, indent=2))
1014        pass
1015
1016    def tgt_start(self):
1017        self.log_print("Configuring kernel NVMeOF Target")
1018
1019        if self.null_block:
1020            print("Configuring with null block device.")
1021            null_blk_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
1022            self.kernel_tgt_gen_subsystem_conf(null_blk_list, self.nic_ips)
1023            self.subsys_no = len(null_blk_list)
1024        else:
1025            print("Configuring with NVMe drives.")
1026            nvme_list = get_nvme_devices()
1027            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
1028            self.subsys_no = len(nvme_list)
1029
1030        nvmet_command(self.nvmet_bin, "clear")
1031        nvmet_command(self.nvmet_bin, "restore kernel.conf")
1032
1033        if self.enable_adq:
1034            self.adq_configure_tc()
1035
1036        self.log_print("Done configuring kernel NVMeOF Target")
1037
1038
1039class SPDKTarget(Target):
1040    def __init__(self, name, general_config, target_config):
1041        super(SPDKTarget, self).__init__(name, general_config, target_config)
1042
1043        # Required fields
1044        self.core_mask = target_config["core_mask"]
1045        self.num_cores = self.get_num_cores(self.core_mask)
1046
1047        # Defaults
1048        self.dif_insert_strip = False
1049        self.null_block_dif_type = 0
1050        self.num_shared_buffers = 4096
1051        self.bpf_proc = None
1052        self.bpf_scripts = []
1053
1054        if "num_shared_buffers" in target_config:
1055            self.num_shared_buffers = target_config["num_shared_buffers"]
1056        if "null_block_dif_type" in target_config:
1057            self.null_block_dif_type = target_config["null_block_dif_type"]
1058        if "dif_insert_strip" in target_config:
1059            self.dif_insert_strip = target_config["dif_insert_strip"]
1060        if "bpf_scripts" in target_config:
1061            self.bpf_scripts = target_config["bpf_scripts"]
1062
1063    def get_num_cores(self, core_mask):
1064        if "0x" in core_mask:
1065            return bin(int(core_mask, 16)).count("1")
1066        else:
1067            num_cores = 0
1068            core_mask = core_mask.replace("[", "")
1069            core_mask = core_mask.replace("]", "")
1070            for i in core_mask.split(","):
1071                if "-" in i:
1072                    x, y = i.split("-")
1073                    num_cores += len(range(int(x), int(y))) + 1
1074                else:
1075                    num_cores += 1
1076            return num_cores
1077
1078    def spdk_tgt_configure(self):
1079        self.log_print("Configuring SPDK NVMeOF target via RPC")
1080
1081        if self.enable_adq:
1082            self.adq_configure_tc()
1083
1084        # Create RDMA transport layer
1085        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
1086                                       num_shared_buffers=self.num_shared_buffers,
1087                                       dif_insert_or_strip=self.dif_insert_strip,
1088                                       sock_priority=self.adq_priority)
1089        self.log_print("SPDK NVMeOF transport layer:")
1090        rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))
1091
1092        if self.null_block:
1093            self.spdk_tgt_add_nullblock(self.null_block)
1094            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
1095        else:
1096            self.spdk_tgt_add_nvme_conf()
1097            self.spdk_tgt_add_subsystem_conf(self.nic_ips)
1098
1099        self.log_print("Done configuring SPDK NVMeOF Target")
1100
1101    def spdk_tgt_add_nullblock(self, null_block_count):
1102        md_size = 0
1103        block_size = 4096
1104        if self.null_block_dif_type != 0:
1105            md_size = 128
1106
1107        self.log_print("Adding null block bdevices to config via RPC")
1108        for i in range(null_block_count):
1109            self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type)
1110            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
1111                                      dif_type=self.null_block_dif_type, md_size=md_size)
1112        self.log_print("SPDK Bdevs configuration:")
1113        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1114
1115    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
1116        self.log_print("Adding NVMe bdevs to config via RPC")
1117
1118        bdfs = get_nvme_devices_bdf()
1119        bdfs = [b.replace(":", ".") for b in bdfs]
1120
1121        if req_num_disks:
1122            if req_num_disks > len(bdfs):
1123                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
1124                sys.exit(1)
1125            else:
1126                bdfs = bdfs[0:req_num_disks]
1127
1128        for i, bdf in enumerate(bdfs):
1129            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)
1130
1131        self.log_print("SPDK Bdevs configuration:")
1132        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1133
1134    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
1135        self.log_print("Adding subsystems to config")
1136        port = "4420"
1137        if not req_num_disks:
1138            req_num_disks = get_nvme_devices_count()
1139
1140        # Distribute bdevs between provided NICs
1141        num_disks = range(0, req_num_disks)
1142        if len(num_disks) == 1:
1143            disks_per_ip = 1
1144        else:
1145            disks_per_ip = int(len(num_disks) / len(ips))
1146        disk_chunks = [num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(ips))]
1147
1148        # Create subsystems, add bdevs to namespaces, add listeners
1149        for ip, chunk in zip(ips, disk_chunks):
1150            for c in chunk:
1151                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
1152                serial = "SPDK00%s" % c
1153                bdev_name = "Nvme%sn1" % c
1154                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
1155                                               allow_any_host=True, max_namespaces=8)
1156                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
1157
1158                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
1159                                                     nqn=nqn,
1160                                                     trtype=self.transport,
1161                                                     traddr=ip,
1162                                                     trsvcid=port,
1163                                                     adrfam="ipv4")
1164
1165                self.subsystem_info_list.append([port, nqn, ip])
1166        self.log_print("SPDK NVMeOF subsystem configuration:")
1167        rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))
1168
1169    def bpf_start(self):
1170        self.log_print("Starting BPF Trace scripts: %s" % self.bpf_scripts)
1171        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
1172        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
1173        results_path = os.path.join(self.results_dir, "bpf_traces.txt")
1174
1175        with open(self.pid, "r") as fh:
1176            nvmf_pid = str(fh.readline())
1177
1178        cmd = [bpf_script, nvmf_pid, *bpf_traces]
1179        self.log_print(cmd)
1180        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})
1181
1182    def tgt_start(self):
1183        if self.null_block:
1184            self.subsys_no = 1
1185        else:
1186            self.subsys_no = get_nvme_devices_count()
1187        self.log_print("Starting SPDK NVMeOF Target process")
1188        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
1189        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
1190        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")
1191
1192        with open(self.pid, "w") as fh:
1193            fh.write(str(proc.pid))
1194        self.nvmf_proc = proc
1195        self.log_print("SPDK NVMeOF Target PID=%s" % self.pid)
1196        self.log_print("Waiting for spdk to initilize...")
1197        while True:
1198            if os.path.exists("/var/tmp/spdk.sock"):
1199                break
1200            time.sleep(1)
1201        self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock")
1202
1203        if self.enable_zcopy:
1204            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
1205                                           enable_zerocopy_send_server=True)
1206            self.log_print("Target socket options:")
1207            rpc.client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))
1208
1209        if self.enable_adq:
1210            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
1211            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
1212                                           nvme_adminq_poll_period_us=100000, retry_count=4)
1213            rpc.nvmf.nvmf_set_config(self.client, acceptor_poll_rate=10000)
1214
1215        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name)
1216
1217        rpc.framework_start_init(self.client)
1218
1219        if self.bpf_scripts:
1220            self.bpf_start()
1221
1222        self.spdk_tgt_configure()
1223
1224    def __del__(self):
1225        if self.bpf_proc:
1226            self.log_print("Stopping BPF Trace script")
1227            self.bpf_proc.terminate()
1228            self.bpf_proc.wait()
1229
1230        if hasattr(self, "nvmf_proc"):
1231            try:
1232                self.nvmf_proc.terminate()
1233                self.nvmf_proc.wait()
1234            except Exception as e:
1235                self.log_print(e)
1236                self.nvmf_proc.kill()
1237                self.nvmf_proc.communicate()
1238
1239
1240class KernelInitiator(Initiator):
1241    def __init__(self, name, general_config, initiator_config):
1242        super(KernelInitiator, self).__init__(name, general_config, initiator_config)
1243
1244        # Defaults
1245        self.extra_params = ""
1246        self.ioengine = "libaio"
1247
1248        if "extra_params" in initiator_config:
1249            self.extra_params = initiator_config["extra_params"]
1250
1251        if "kernel_engine" in initiator_config:
1252            self.ioengine = initiator_config["kernel_engine"]
1253            if "io_uring" in self.ioengine:
1254                self.extra_params = "--nr-poll-queues=8"
1255
1256    def __del__(self):
1257        self.ssh_connection.close()
1258
1259    def get_connected_nvme_list(self):
1260        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
1261        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
1262                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
1263        return nvme_list
1264
1265    def kernel_init_connect(self):
1266        self.log_print("Below connection attempts may result in error messages, this is expected!")
1267        for subsystem in self.subsystem_info_list:
1268            self.log_print("Trying to connect %s %s %s" % subsystem)
1269            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
1270                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
1271            time.sleep(2)
1272
1273        if "io_uring" in self.ioengine:
1274            self.log_print("Setting block layer settings for io_uring.")
1275
1276            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
1277            #       apparently it's not possible for connected subsystems.
1278            #       Results in "error: Invalid argument"
1279            block_sysfs_settings = {
1280                "iostats": "0",
1281                "rq_affinity": "0",
1282                "nomerges": "2"
1283            }
1284
1285            for disk in self.get_connected_nvme_list():
1286                sysfs = os.path.join("/sys/block", disk, "queue")
1287                for k, v in block_sysfs_settings.items():
1288                    sysfs_opt_path = os.path.join(sysfs, k)
1289                    try:
1290                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
1291                    except subprocess.CalledProcessError as e:
1292                        self.log_print("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
1293                    finally:
1294                        _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)])
1295                        self.log_print("%s=%s" % (sysfs_opt_path, _))
1296
1297    def kernel_init_disconnect(self):
1298        for subsystem in self.subsystem_info_list:
1299            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
1300            time.sleep(1)
1301
1302    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1):
1303        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
1304
1305        filename_section = ""
1306        nvme_per_split = int(len(nvme_list) / len(threads))
1307        remainder = len(nvme_list) % len(threads)
1308        iterator = iter(nvme_list)
1309        result = []
1310        for i in range(len(threads)):
1311            result.append([])
1312            for _ in range(nvme_per_split):
1313                result[i].append(next(iterator))
1314                if remainder:
1315                    result[i].append(next(iterator))
1316                    remainder -= 1
1317        for i, r in enumerate(result):
1318            header = "[filename%s]" % i
1319            disks = "\n".join(["filename=%s" % x for x in r])
1320            job_section_qd = round((io_depth * len(r)) / num_jobs)
1321            if job_section_qd == 0:
1322                job_section_qd = 1
1323            iodepth = "iodepth=%s" % job_section_qd
1324            filename_section = "\n".join([filename_section, header, disks, iodepth])
1325
1326        return filename_section
1327
1328
1329class SPDKInitiator(Initiator):
1330    def __init__(self, name, general_config, initiator_config):
1331        super(SPDKInitiator, self).__init__(name, general_config, initiator_config)
1332
1333        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
1334            self.install_spdk()
1335
1336        # Required fields
1337        self.num_cores = initiator_config["num_cores"]
1338
1339    def install_spdk(self):
1340        self.log_print("Using fio binary %s" % self.fio_bin)
1341        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
1342        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
1343        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)])
1344        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
1345        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])
1346
1347        self.log_print("SPDK built")
1348        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])
1349
1350    def gen_spdk_bdev_conf(self, remote_subsystem_list):
1351        bdev_cfg_section = {
1352            "subsystems": [
1353                {
1354                    "subsystem": "bdev",
1355                    "config": []
1356                }
1357            ]
1358        }
1359
1360        for i, subsys in enumerate(remote_subsystem_list):
1361            sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys)
1362            nvme_ctrl = {
1363                "method": "bdev_nvme_attach_controller",
1364                "params": {
1365                    "name": "Nvme{}".format(i),
1366                    "trtype": self.transport,
1367                    "traddr": sub_addr,
1368                    "trsvcid": sub_port,
1369                    "subnqn": sub_nqn,
1370                    "adrfam": "IPv4"
1371                }
1372            }
1373
1374            if self.enable_adq:
1375                nvme_ctrl["params"].update({"priority": "1"})
1376
1377            bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)
1378
1379        return json.dumps(bdev_cfg_section, indent=2)
1380
1381    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1):
1382        filename_section = ""
1383        if len(threads) >= len(subsystems):
1384            threads = range(0, len(subsystems))
1385        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
1386        nvme_per_split = int(len(subsystems) / len(threads))
1387        remainder = len(subsystems) % len(threads)
1388        iterator = iter(filenames)
1389        result = []
1390        for i in range(len(threads)):
1391            result.append([])
1392            for _ in range(nvme_per_split):
1393                result[i].append(next(iterator))
1394            if remainder:
1395                result[i].append(next(iterator))
1396                remainder -= 1
1397        for i, r in enumerate(result):
1398            header = "[filename%s]" % i
1399            disks = "\n".join(["filename=%s" % x for x in r])
1400            job_section_qd = round((io_depth * len(r)) / num_jobs)
1401            if job_section_qd == 0:
1402                job_section_qd = 1
1403            iodepth = "iodepth=%s" % job_section_qd
1404            filename_section = "\n".join([filename_section, header, disks, iodepth])
1405
1406        return filename_section
1407
1408
1409if __name__ == "__main__":
1410    script_full_dir = os.path.dirname(os.path.realpath(__file__))
1411    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))
1412
1413    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
1414    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
1415                        help='Configuration file.')
1416    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
1417                        help='Results directory.')
1418    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
1419                        help='CSV results filename.')
1420
1421    args = parser.parse_args()
1422
1423    print("Using config file: %s" % args.config)
1424    with open(args.config, "r") as config:
1425        data = json.load(config)
1426
1427    initiators = []
1428    fio_cases = []
1429
1430    general_config = data["general"]
1431    target_config = data["target"]
1432    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]
1433
1434    for k, v in data.items():
1435        if "target" in k:
1436            v.update({"results_dir": args.results})
1437            if data[k]["mode"] == "spdk":
1438                target_obj = SPDKTarget(k, data["general"], v)
1439            elif data[k]["mode"] == "kernel":
1440                target_obj = KernelTarget(k, data["general"], v)
1441                pass
1442        elif "initiator" in k:
1443            if data[k]["mode"] == "spdk":
1444                init_obj = SPDKInitiator(k, data["general"], v)
1445            elif data[k]["mode"] == "kernel":
1446                init_obj = KernelInitiator(k, data["general"], v)
1447            initiators.append(init_obj)
1448        elif "fio" in k:
1449            fio_workloads = itertools.product(data[k]["bs"],
1450                                              data[k]["qd"],
1451                                              data[k]["rw"])
1452
1453            fio_run_time = data[k]["run_time"]
1454            fio_ramp_time = data[k]["ramp_time"]
1455            fio_rw_mix_read = data[k]["rwmixread"]
1456            fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None
1457            fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None
1458
1459            fio_rate_iops = 0
1460            if "rate_iops" in data[k]:
1461                fio_rate_iops = data[k]["rate_iops"]
1462        else:
1463            continue
1464
1465    try:
1466        os.mkdir(args.results)
1467    except FileExistsError:
1468        pass
1469
1470    target_obj.tgt_start()
1471
1472    for i in initiators:
1473        i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no)
1474        if i.enable_adq:
1475            i.adq_configure_tc()
1476
1477    # Poor mans threading
1478    # Run FIO tests
1479    for block_size, io_depth, rw in fio_workloads:
1480        threads = []
1481        configs = []
1482        for i in initiators:
1483            if i.mode == "kernel":
1484                i.kernel_init_connect()
1485
1486            cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
1487                                   fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops)
1488            configs.append(cfg)
1489
1490        for i, cfg in zip(initiators, configs):
1491            t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
1492            threads.append(t)
1493        if target_obj.enable_sar:
1494            sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"])
1495            sar_file_name = ".".join([sar_file_name, "txt"])
1496            t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_name))
1497            threads.append(t)
1498
1499        if target_obj.enable_pcm:
1500            pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]
1501
1502            pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],))
1503            pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],))
1504            pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],))
1505
1506            threads.append(pcm_cpu_t)
1507            threads.append(pcm_mem_t)
1508            threads.append(pcm_pow_t)
1509
1510        if target_obj.enable_bandwidth:
1511            bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)])
1512            bandwidth_file_name = ".".join([bandwidth_file_name, "csv"])
1513            t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,))
1514            threads.append(t)
1515
1516        if target_obj.enable_dpdk_memory:
1517            t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results))
1518            threads.append(t)
1519
1520        if target_obj.enable_adq:
1521            ethtool_thread = threading.Thread(target=target_obj.ethtool_after_fio_ramp, args=(fio_ramp_time))
1522            threads.append(ethtool_thread)
1523
1524        for t in threads:
1525            t.start()
1526        for t in threads:
1527            t.join()
1528
1529        for i in initiators:
1530            if i.mode == "kernel":
1531                i.kernel_init_disconnect()
1532            i.copy_result_files(args.results)
1533
1534    target_obj.restore_governor()
1535    target_obj.restore_tuned()
1536    target_obj.restore_services()
1537    target_obj.restore_sysctl()
1538    for i in initiators:
1539        i.restore_governor()
1540        i.restore_tuned()
1541        i.restore_services()
1542        i.restore_sysctl()
1543    target_obj.parse_results(args.results, args.csv_filename)
1544