xref: /spdk/scripts/perf/nvmf/run_nvmf.py (revision 1a00f5c09488e7466a331b8c75cde4969740357f)
1#!/usr/bin/env python3
2
3from json.decoder import JSONDecodeError
4import os
5import re
6import sys
7import argparse
8import json
9import zipfile
10import threading
11import subprocess
12import itertools
13import configparser
14import time
15import uuid
16from collections import OrderedDict
17
18import paramiko
19import pandas as pd
20
21import rpc
22import rpc.client
23from common import *
24
25
class Server:
    """Base class for a machine taking part in the nvmf perf test.

    Holds credentials and NIC/tuning configuration common to target and
    initiator roles, plus the state required to restore system settings
    (services, sysctl, tuned profile, CPU governor) once the test ends.
    """

    def __init__(self, name, general_config, server_config):
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]

        # Default location of the Mellanox IRQ-affinity helper scripts;
        # can be overridden per server in the config file.
        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        self.local_nic_info = []
        self._nics_json_obj = {}
        # *_restore_* members remember pre-test state so restore_*() methods
        # can revert the system after the test run.
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        self.enable_adq = False
        self.adq_priority = None
        if "adq_enable" in server_config and server_config["adq_enable"]:
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1

        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]

        # The name is later used to build result file names, so restrict it
        # to plain alphanumeric characters.
        if not re.match("^[A-Za-z0-9]*$", name):
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)
59
60    def log_print(self, msg):
61        print("[%s] %s" % (self.name, msg), flush=True)
62
63    def get_uncommented_lines(self, lines):
64        return [line for line in lines if line and not line.startswith('#')]
65
66    def get_nic_name_by_ip(self, ip):
67        if not self._nics_json_obj:
68            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
69            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
70        for nic in self._nics_json_obj:
71            for addr in nic["addr_info"]:
72                if ip in addr["local"]:
73                    return nic["ifname"]
74
    def set_local_nic_info_helper(self):
        # Hook for subclasses: return a JSON-like hardware description
        # (e.g. parsed `lshw -json` output) to feed set_local_nic_info().
        # Base implementation intentionally returns nothing.
        pass
77
78    def set_local_nic_info(self, pci_info):
79        def extract_network_elements(json_obj):
80            nic_list = []
81            if isinstance(json_obj, list):
82                for x in json_obj:
83                    nic_list.extend(extract_network_elements(x))
84            elif isinstance(json_obj, dict):
85                if "children" in json_obj:
86                    nic_list.extend(extract_network_elements(json_obj["children"]))
87                if "class" in json_obj.keys() and "network" in json_obj["class"]:
88                    nic_list.append(json_obj)
89            return nic_list
90
91        self.local_nic_info = extract_network_elements(pci_info)
92
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        # Command-execution stub. Subclasses override this to run *cmd*
        # either locally (Target) or remotely (Initiator) and return the
        # command's output as a string.
        return ""
95
    def configure_system(self):
        """Apply all system-level tuning for the test: stop interfering
        services, set sysctls, tuned profile, CPU governor and NIC IRQ
        affinity. Previous settings are saved for later restore."""
        self.configure_services()
        self.configure_sysctl()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity()
102
    def configure_adq(self):
        """Prepare ADQ (Application Device Queues): load required kernel
        modules and configure the NIC ports. No-op in kernel mode."""
        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return
        self.adq_load_modules()
        self.adq_configure_nic()
109
110    def adq_load_modules(self):
111        self.log_print("Modprobing ADQ-related Linux modules...")
112        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
113        for module in adq_module_deps:
114            try:
115                self.exec_cmd(["sudo", "modprobe", module])
116                self.log_print("%s loaded!" % module)
117            except CalledProcessError as e:
118                self.log_print("ERROR: failed to load module %s" % module)
119                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
120
    def adq_configure_tc(self):
        """Configure ADQ traffic classes, filters and XPS for each test NIC.

        Creates two traffic classes per NIC (TC0 for generic traffic, TC1
        sized to self.num_cores for nvmf traffic), adds an ingress flower
        filter steering port-4420 TCP traffic into TC1, tunes interrupt
        coalescing and runs SPDK's set_xps_rxqs helper.

        NOTE(review): self.num_cores and self.spdk_dir are not set in this
        base class — presumably provided by subclasses; verify before
        calling on a bare Server.
        """
        self.log_print("Configuring ADQ Traffic classes and filters...")

        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        # Direction of the port match depends on the role: the target sees
        # 4420 as destination, initiators see it as source.
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        port = "4420"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            # mqprio qdisc: 2 TCs, TC0 gets num_queues_tc0 queues starting at
            # offset 0, TC1 gets num_queues_tc1 queues after them.
            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log_print(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            # Give the driver time to apply the qdisc before adding ingress.
            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log_print(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            # Hardware-offloaded (skip_sw) filter directing nvmf TCP traffic
            # for this IP/port into traffic class 1.
            tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                             "protocol", "ip", "ingress", "prio", "1", "flower",
                             "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                             "skip_sw", "hw_tc", "1"]
            self.log_print(" ".join(tc_filter_cmd))
            self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log_print("Show tc configuration for %s NIC..." % nic_name)
            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log_print("%s" % tc_disk_out)
            self.log_print("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log_print(xps_cmd)
            self.exec_cmd(xps_cmd)
171
    def adq_configure_nic(self):
        """Reset the Intel ice driver and enable the NIC features ADQ needs
        (hardware TC offload, inline flow director); disables the firmware
        LLDP agent which interferes with TC offloads."""
        self.log_print("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        try:
            self.exec_cmd(["sudo", "rmmod", "ice"])
            self.exec_cmd(["sudo", "modprobe", "ice"])
        except CalledProcessError as e:
            self.log_print("ERROR: failed to reload ice module!")
            self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log_print(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                # As temporary workaround for ADQ, channel packet inspection optimization is turned on during connection establishment.
                # Then turned off before fio ramp_up expires in ethtool_after_fio_ramp().
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-pkt-inspect-optimize", "on"])
            except CalledProcessError as e:
                self.log_print("ERROR: failed to configure NIC port using ethtool!")
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
                self.log_print("Please update your NIC driver and firmware versions and try again.")
            # Dump the resulting feature/flag state for debugging.
            self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))
202
    def configure_services(self):
        """Stop services known to disturb performance tests.

        The current ActiveState of every service is recorded in
        self.svc_restore_dict so restore_services() can revert it.
        Services not installed on the system are skipped.
        """
        self.log_print("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            # "systemctl show" prints key=value pairs; prepend a section
            # header so configparser can parse it.
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            # Skip services that are not present on this system.
            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log_print("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log_print("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])
230
231    def configure_sysctl(self):
232        self.log_print("Tuning sysctl settings...")
233
234        busy_read = 0
235        if self.enable_adq and self.mode == "spdk":
236            busy_read = 1
237
238        sysctl_opts = {
239            "net.core.busy_poll": 0,
240            "net.core.busy_read": busy_read,
241            "net.core.somaxconn": 4096,
242            "net.core.netdev_max_backlog": 8192,
243            "net.ipv4.tcp_max_syn_backlog": 16384,
244            "net.core.rmem_max": 268435456,
245            "net.core.wmem_max": 268435456,
246            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
247            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
248            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
249            "net.ipv4.route.flush": 1,
250            "vm.overcommit_memory": 1,
251        }
252
253        for opt, value in sysctl_opts.items():
254            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
255            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())
256
    def configure_tuned(self):
        """Activate the tuned-adm profile from the config file, if any.

        Saves the tuned service state into svc_restore_dict and, when tuned
        was already running, the active profile/mode into
        self.tuned_restore_dict so restore_tuned() can revert it.
        """
        if not self.tuned_profile:
            self.log_print("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        # Parse "systemctl show" key=value output via configparser by
        # prepending a section header.
        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))
284
285    def configure_cpu_governor(self):
286        self.log_print("Setting CPU governor to performance...")
287
288        # This assumes that there is the same CPU scaling governor on each CPU
289        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
290        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])
291
292    def configure_irq_affinity(self):
293        self.log_print("Setting NIC irq affinity for NICs...")
294
295        irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh")
296        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
297        for nic in nic_names:
298            irq_cmd = ["sudo", irq_script_path, nic]
299            self.log_print(irq_cmd)
300            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)
301
302    def restore_services(self):
303        self.log_print("Restoring services...")
304        for service, state in self.svc_restore_dict.items():
305            cmd = "stop" if state == "inactive" else "start"
306            self.exec_cmd(["sudo", "systemctl", cmd, service])
307
308    def restore_sysctl(self):
309        self.log_print("Restoring sysctl settings...")
310        for opt, value in self.sysctl_restore_dict.items():
311            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())
312
313    def restore_tuned(self):
314        self.log_print("Restoring tuned-adm settings...")
315
316        if not self.tuned_restore_dict:
317            return
318
319        if self.tuned_restore_dict["mode"] == "auto":
320            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
321            self.log_print("Reverted tuned-adm to auto_profile.")
322        else:
323            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
324            self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])
325
326    def restore_governor(self):
327        self.log_print("Restoring CPU governor setting...")
328        if self.governor_restore:
329            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
330            self.log_print("Reverted CPU governor to %s." % self.governor_restore)
331
332
class Target(Server):
    """The nvmf target machine.

    Commands are executed locally (see exec_cmd). Besides system tuning
    inherited from Server, the target gathers measurements during the test
    (SAR, PCM, network bandwidth, DPDK memory) and post-processes fio
    result files.
    """

    def __init__(self, name, general_config, target_config):
        super(Target, self).__init__(name, general_config, target_config)

        # Defaults
        self.enable_sar = False
        self.sar_delay = 0
        self.sar_interval = 0
        self.sar_count = 0
        self.enable_pcm = False
        self.pcm_dir = ""
        self.pcm_delay = 0
        self.pcm_interval = 0
        self.pcm_count = 0
        self.enable_bandwidth = 0
        self.bandwidth_count = 0
        self.enable_dpdk_memory = False
        self.dpdk_wait_time = 0
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        # NIC info is read eagerly from the local machine.
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []

        # Optional settings from the "target" section of the config file.
        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "sar_settings" in target_config:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"]
        if "pcm_settings" in target_config:
            self.enable_pcm = True
            self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"]
        if "enable_bandwidth" in target_config:
            self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"]
        if "enable_dpdk_memory" in target_config:
            self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"]
        if "scheduler_settings" in target_config:
            self.scheduler_name = target_config["scheduler_settings"]
        if "zcopy_settings" in target_config:
            self.enable_zcopy = target_config["zcopy_settings"]
        if "results_dir" in target_config:
            self.results_dir = target_config["results_dir"]

        # This script lives in spdk/scripts/perf/nvmf, so SPDK root is
        # three directories up.
        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        # Package SPDK sources for distribution to initiators unless
        # explicitly disabled in the general config.
        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()
386
    def set_local_nic_info_helper(self):
        # Describe local hardware using lshw; the JSON tree is consumed by
        # Server.set_local_nic_info() to pick out network-class devices.
        return json.loads(self.exec_cmd(["lshw", "-json"]))
389
390    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
391        stderr_opt = None
392        if stderr_redirect:
393            stderr_opt = subprocess.STDOUT
394        if change_dir:
395            old_cwd = os.getcwd()
396            os.chdir(change_dir)
397            self.log_print("Changing directory to %s" % change_dir)
398
399        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")
400
401        if change_dir:
402            os.chdir(old_cwd)
403            self.log_print("Changing directory to %s" % old_cwd)
404        return out
405
406    def zip_spdk_sources(self, spdk_dir, dest_file):
407        self.log_print("Zipping SPDK source directory")
408        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
409        for root, directories, files in os.walk(spdk_dir, followlinks=True):
410            for file in files:
411                fh.write(os.path.relpath(os.path.join(root, file)))
412        fh.close()
413        self.log_print("Done zipping")
414
    def read_json_stats(self, file):
        """Parse one fio JSON result file.

        Returns an 18-element list: [read_iops, read_bw, read_avg_lat,
        read_min_lat, read_max_lat, read p99/p99.9/p99.99/p99.999 lat,
        then the same nine values for writes]. All latencies are
        normalized to microseconds (fio may report ns or us depending on
        version). Only jobs[0] is read because fio runs with aggregated
        (group) reporting.
        """
        with open(file, "r") as json_data:
            data = json.load(json_data)
            job_pos = 0  # job_post = 0 because using aggregated results

            # Check if latency is in nano or microseconds to choose correct dict key
            def get_lat_unit(key_prefix, dict_section):
                # key prefix - lat, clat or slat.
                # dict section - portion of json containing latency bucket in question
                # Return dict key to access the bucket and unit as string
                for k, _ in dict_section.items():
                    if k.startswith(key_prefix):
                        return k, k.split("_")[1]

            def get_clat_percentiles(clat_dict_leaf):
                # Return [p99, p99.9, p99.99, p99.999] completion latencies.
                if "percentile" in clat_dict_leaf:
                    p99_lat = float(clat_dict_leaf["percentile"]["99.000000"])
                    p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"])
                    p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"])
                    p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"])

                    return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat]
                else:
                    # Latest fio versions do not provide "percentile" results if no
                    # measurements were done, so just return zeroes
                    return [0, 0, 0, 0]

            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
            read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["read"][clat_key])

            # Convert nanoseconds to microseconds where needed.
            if "ns" in lat_unit:
                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
            if "ns" in clat_unit:
                read_p99_lat = read_p99_lat / 1000
                read_p99_9_lat = read_p99_9_lat / 1000
                read_p99_99_lat = read_p99_99_lat / 1000
                read_p99_999_lat = read_p99_999_lat / 1000

            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
            write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["write"][clat_key])

            # Convert nanoseconds to microseconds where needed.
            if "ns" in lat_unit:
                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
            if "ns" in clat_unit:
                write_p99_lat = write_p99_lat / 1000
                write_p99_9_lat = write_p99_9_lat / 1000
                write_p99_99_lat = write_p99_99_lat / 1000
                write_p99_999_lat = write_p99_999_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]
482
483    def parse_results(self, results_dir, csv_file):
484        files = os.listdir(results_dir)
485        fio_files = filter(lambda x: ".fio" in x, files)
486        json_files = [x for x in files if ".json" in x]
487
488        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
489                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
490                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
491                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]
492
493        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
494                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]
495
496        header_line = ",".join(["Name", *headers])
497        aggr_header_line = ",".join(["Name", *aggr_headers])
498
499        # Create empty results file
500        with open(os.path.join(results_dir, csv_file), "w") as fh:
501            fh.write(aggr_header_line + "\n")
502        rows = set()
503
504        for fio_config in fio_files:
505            self.log_print("Getting FIO stats for %s" % fio_config)
506            job_name, _ = os.path.splitext(fio_config)
507
508            # Look in the filename for rwmixread value. Function arguments do
509            # not have that information.
510            # TODO: Improve this function by directly using workload params instead
511            # of regexing through filenames.
512            if "read" in job_name:
513                rw_mixread = 1
514            elif "write" in job_name:
515                rw_mixread = 0
516            else:
517                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100
518
519            # If "_CPU" exists in name - ignore it
520            # Initiators for the same job could have different num_cores parameter
521            job_name = re.sub(r"_\d+CPU", "", job_name)
522            job_result_files = [x for x in json_files if x.startswith(job_name)]
523            self.log_print("Matching result files for current fio config:")
524            for j in job_result_files:
525                self.log_print("\t %s" % j)
526
527            # There may have been more than 1 initiator used in test, need to check that
528            # Result files are created so that string after last "_" separator is server name
529            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
530            inits_avg_results = []
531            for i in inits_names:
532                self.log_print("\tGetting stats for initiator %s" % i)
533                # There may have been more than 1 test run for this job, calculate average results for initiator
534                i_results = [x for x in job_result_files if i in x]
535                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))
536
537                separate_stats = []
538                for r in i_results:
539                    try:
540                        stats = self.read_json_stats(os.path.join(results_dir, r))
541                        separate_stats.append(stats)
542                        self.log_print(stats)
543                    except JSONDecodeError as e:
544                        self.log_print("ERROR: Failed to parse %s results! Results might be incomplete!")
545
546                init_results = [sum(x) for x in zip(*separate_stats)]
547                init_results = [x / len(separate_stats) for x in init_results]
548                inits_avg_results.append(init_results)
549
550                self.log_print("\tAverage results for initiator %s" % i)
551                self.log_print(init_results)
552                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
553                    fh.write(header_line + "\n")
554                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")
555
556            # Sum results of all initiators running this FIO job.
557            # Latency results are an average of latencies from accros all initiators.
558            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
559            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
560            for key in inits_avg_results:
561                if "lat" in key:
562                    inits_avg_results[key] /= len(inits_names)
563
564            # Aggregate separate read/write values into common labels
565            # Take rw_mixread into consideration for mixed read/write workloads.
566            aggregate_results = OrderedDict()
567            for h in aggr_headers:
568                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
569                if "lat" in h:
570                    _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
571                else:
572                    _ = read_stat + write_stat
573                aggregate_results[h] = "{0:.3f}".format(_)
574
575            rows.add(",".join([job_name, *aggregate_results.values()]))
576
577        # Save results to file
578        for row in rows:
579            with open(os.path.join(results_dir, csv_file), "a") as fh:
580                fh.write(row + "\n")
581        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))
582
    def measure_sar(self, results_dir, sar_file_name):
        """Collect per-CPU utilization with sar during the test.

        Sleeps self.sar_delay seconds first, then runs
        `sar -P ALL <interval> <count>`, saves the raw output to
        *sar_file_name* and appends total CPU usage computed as
        (num_cpus * 100) - sum of per-CPU idle percentages.
        """
        self.log_print("Waiting %d delay before measuring SAR stats" % self.sar_delay)
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        time.sleep(self.sar_delay)
        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count])
        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log_print("Summary CPU utilization from SAR:")
                        self.log_print(line)
                    elif "all" in line:
                        self.log_print(line)
                    else:
                        # Column 8 is assumed to be %idle in this sar output
                        # format — verify if sar version changes.
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum
        with open(os.path.join(results_dir, sar_file_name), "a") as f:
            f.write("Total CPU used: " + str(sar_cpu_usage))
603
604    def ethtool_after_fio_ramp(self, fio_ramp_time):
605        time.sleep(fio_ramp_time//2)
606        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
607        for nic in nic_names:
608            self.log_print(nic)
609            self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
610                           "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization
611
    def measure_pcm_memory(self, results_dir, pcm_file_name):
        """Run Intel pcm-memory in the background for self.pcm_count
        seconds (after self.pcm_delay), writing CSV output directly into
        *results_dir*."""
        time.sleep(self.pcm_delay)
        cmd = ["%s/pcm-memory.x" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)]
        pcm_memory = subprocess.Popen(cmd)
        time.sleep(self.pcm_count)
        # pcm-memory runs until killed; stop it once the window elapsed.
        pcm_memory.terminate()
618
    def measure_pcm(self, results_dir, pcm_file_name):
        """Run Intel pcm for self.pcm_count samples and post-process the CSV.

        Extracts the socket-level UPI columns into a separate
        "skt_<pcm_file_name>" CSV for easier analysis.
        """
        time.sleep(self.pcm_delay)
        cmd = ["%s/pcm.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        # pcm CSV has a two-row header; drop the auto-generated
        # "Unnamed:" labels pandas creates for blank cells.
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)
628
    def measure_pcm_power(self, results_dir, pcm_power_file_name):
        """Run Intel pcm-power for self.pcm_count samples (after
        self.pcm_delay) and save its output to *pcm_power_file_name*."""
        time.sleep(self.pcm_delay)
        out = self.exec_cmd(["%s/pcm-power.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count])
        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
            fh.write(out)
634
    def measure_network_bandwidth(self, results_dir, bandwidth_file_name):
        """Sample network bandwidth with bwm-ng (1s interval,
        self.bandwidth_count samples) into a CSV file in *results_dir*."""
        self.log_print("INFO: starting network bandwidth measure")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)])
639
640    def measure_dpdk_memory(self, results_dir):
641        self.log_print("INFO: waiting to generate DPDK memory usage")
642        time.sleep(self.dpdk_wait_time)
643        self.log_print("INFO: generating DPDK memory usage")
644        rpc.env.env_dpdk_get_mem_stats
645        os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir))
646
647    def sys_config(self):
648        self.log_print("====Kernel release:====")
649        self.log_print(os.uname().release)
650        self.log_print("====Kernel command line:====")
651        with open('/proc/cmdline') as f:
652            cmdline = f.readlines()
653            self.log_print('\n'.join(self.get_uncommented_lines(cmdline)))
654        self.log_print("====sysctl conf:====")
655        with open('/etc/sysctl.conf') as f:
656            sysctl = f.readlines()
657            self.log_print('\n'.join(self.get_uncommented_lines(sysctl)))
658        self.log_print("====Cpu power info:====")
659        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
660        self.log_print("====zcopy settings:====")
661        self.log_print("zcopy enabled: %s" % (self.enable_zcopy))
662        self.log_print("====Scheduler settings:====")
663        self.log_print("SPDK scheduler: %s" % (self.scheduler_name))
664
665
class Initiator(Server):
    """A remote initiator (traffic generator) host controlled over SSH.

    Handles copying SPDK sources to the initiator, discovering target
    subsystems, generating fio job files, running fio and collecting
    result files back to the local machine.
    """

    def __init__(self, name, general_config, initiator_config):
        super(Initiator, self).__init__(name, general_config, initiator_config)

        # Required fields
        self.ip = initiator_config["ip"]
        self.target_nic_ips = initiator_config["target_nic_ips"]

        # Defaults
        self.cpus_allowed = None
        self.cpus_allowed_policy = "shared"
        self.spdk_dir = "/tmp/spdk"
        self.fio_bin = "/usr/src/fio/fio"
        self.nvmecli_bin = "nvme"
        self.cpu_frequency = None
        self.subsystem_info_list = []

        if "spdk_dir" in initiator_config:
            self.spdk_dir = initiator_config["spdk_dir"]
        if "fio_bin" in initiator_config:
            self.fio_bin = initiator_config["fio_bin"]
        if "nvmecli_bin" in initiator_config:
            self.nvmecli_bin = initiator_config["nvmecli_bin"]
        if "cpus_allowed" in initiator_config:
            self.cpus_allowed = initiator_config["cpus_allowed"]
        if "cpus_allowed_policy" in initiator_config:
            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
        if "cpu_frequency" in initiator_config:
            self.cpu_frequency = initiator_config["cpu_frequency"]
        # Environment variable overrides the configured SPDK directory.
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.copy_spdk("/tmp/spdk.zip")
        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        """Return hardware info for the remote host as parsed lshw JSON."""
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        """Close the SSH connection to the initiator."""
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Execute a command on the remote initiator over SSH.

        :param cmd: command and its arguments as a list of strings
        :param stderr_redirect: if True, merge stderr into stdout (via PTY)
        :param change_dir: optional remote directory to cd into first
        :return: decoded stdout of the command
        :raises subprocess.CalledProcessError: on non-zero exit status
        """
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it again to prevent
        # expansion when sending to remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout thanks using get_pty option if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            # BUGFIX: qualify the exception with its module. A bare
            # CalledProcessError is not defined in this file (the rest of
            # the module consistently uses subprocess.CalledProcessError),
            # so raising here previously caused a NameError instead.
            raise subprocess.CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        """Upload a local file to the initiator via SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        """Download a file from the initiator via SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        """Copy the zipped SPDK sources to the initiator and unpack them
        into self.spdk_dir."""
        self.log_print("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log_print("Sources unpacked")

    def copy_result_files(self, dest_dir):
        """Copy all files from the initiator's nvmf_perf directory into
        the local dest_dir (created if missing)."""
        self.log_print("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log_print("Done copying results")

    def discover_subsystems(self, address_list, subsys_no):
        """Run "nvme discover" against ports 4420..4420+subsys_no-1 on each
        target address and store matching (trsvcid, nqn, traddr) tuples in
        self.subsystem_info_list, sorted by NQN.

        :param address_list: target IP addresses to scan
        :param subsys_no: number of consecutive ports to probe per address
        """
        num_nvmes = range(0, subsys_no)
        nvme_discover_output = ""
        for ip, subsys_no in itertools.product(address_list, num_nvmes):
            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no))
            nvme_discover_cmd = ["sudo",
                                 "%s" % self.nvmecli_bin,
                                 "discover", "-t", "%s" % self.transport,
                                 "-s", "%s" % (4420 + subsys_no),
                                 "-a", "%s" % ip]

            try:
                stdout = self.exec_cmd(nvme_discover_cmd)
                if stdout:
                    nvme_discover_output = nvme_discover_output + stdout
            except subprocess.CalledProcessError:
                # Do nothing. In case of discovering remote subsystems of kernel target
                # we expect "nvme discover" to fail a bunch of times because we basically
                # scan ports.
                pass

        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                                nvme_discover_output)  # from nvme discovery output
        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
        subsystems = list(set(subsystems))
        subsystems.sort(key=lambda x: x[1])
        self.log_print("Found matching subsystems on target side:")
        for s in subsystems:
            self.log_print(s)
        self.subsystem_info_list = subsystems

    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10, rate_iops=0):
        """Generate a fio job file on the initiator and return its remote path.

        :param rw: fio rw mode (e.g. randrw)
        :param rwmixread: read percentage for mixed workloads
        :param block_size: fio block size
        :param io_depth: queue depth per job section
        :param subsys_no: number of subsystems (used by subclass helpers)
        :param num_jobs: optional fio numjobs value
        :param ramp_time: fio ramp_time in seconds
        :param run_time: fio runtime in seconds
        :param rate_iops: fio rate_iops limit (0 = unlimited)
        :return: remote path of the created fio config file
        """
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""
        if "spdk" in self.mode:
            bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
            self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
            spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
        else:
            ioengine = self.ioengine
            spdk_conf = ""
            out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'",
                                 "|", "awk", "'{print $1}'"])
            subsystems = [x for x in out.split("\n") if "nvme" in x]

        if self.cpus_allowed is not None:
            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    # BUGFIX: "a-b" is an inclusive CPU range containing
                    # (b - a + 1) CPUs; len(range(a, b)) undercounted by one
                    # (inconsistent with SPDKTarget.get_num_cores).
                    cpus_num += len(range(a, b + 1))
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            self.num_cores = len(subsystems)
            threads = range(0, len(subsystems))

        if "spdk" in self.mode:
            filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs)
        else:
            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs)

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, kernel_init_connect() function
        if hasattr(self, "ioengine") and "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log_print("Created FIO Config:")
        self.log_print(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        """Pin the CPU frequency with cpupower when self.cpu_frequency is
        configured; exits the script if the governor cannot be set."""
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit()
        else:
            self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.")

    def run_fio(self, fio_config_file, run_num=None):
        """Run fio on the initiator using the given job file.

        :param fio_config_file: remote path of the fio job file
        :param run_num: if set, repeat the job run_num times, saving each
            run's JSON output under a "_run_<i>_" suffixed filename
        """
        job_name, _ = os.path.splitext(fio_config_file)
        self.log_print("Starting FIO run for job: %s" % job_name)
        self.log_print("Using FIO: %s" % self.fio_bin)

        if run_num:
            for i in range(1, run_num + 1):
                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
                try:
                    output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
                                            "--output=%s" % output_filename, "--eta=never"], True)
                    self.log_print(output)
                except subprocess.CalledProcessError as e:
                    self.log_print("ERROR: Fio process failed!")
                    self.log_print(e.stdout)
        else:
            output_filename = job_name + "_" + self.name + ".json"
            # BUGFIX: was '"--output" % output_filename', which raised
            # TypeError ("%" on a string with no placeholder) and never
            # passed the output path to fio.
            output = self.exec_cmd(["sudo", self.fio_bin,
                                    fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename], True)
            self.log_print(output)
        self.log_print("FIO run finished. Results in: %s" % output_filename)

    def sys_config(self):
        """Log the initiator's system configuration (kernel release and
        command line, sysctl.conf, CPU power info) via remote commands."""
        self.log_print("====Kernel release:====")
        self.log_print(self.exec_cmd(["uname", "-r"]))
        self.log_print("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log_print("====sysctl conf:====")
        sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"])
        self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log_print("====Cpu power info:====")
        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
946
947
class KernelTarget(Target):
    """NVMe-oF target implemented with the Linux kernel nvmet driver,
    configured through nvmetcli and a generated JSON config file."""

    def __init__(self, name, general_config, target_config):
        super(KernelTarget, self).__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = "nvmetcli"

        if "nvmet_bin" in target_config:
            self.nvmet_bin = target_config["nvmet_bin"]

    def stop(self):
        """Tear down the kernel nvmet configuration."""
        nvmet_command(self.nvmet_bin, "clear")

    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):
        """Write a nvmetcli JSON config ("kernel.conf") that exports each
        device in nvme_list as its own subsystem, distributing devices
        across the given listener addresses; one port per subsystem,
        starting at trsvcid 4420.

        :param nvme_list: block device paths to export
        :param address_list: NIC IP addresses used as listeners
        """
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        # Split disks between NIC IP's
        disks_per_ip = int(len(nvme_list) / len(address_list))
        disk_chunks = [nvme_list[i * disks_per_ip:disks_per_ip + disks_per_ip * i] for i in range(0, len(address_list))]

        # Add remaining drives
        for i, disk in enumerate(nvme_list[disks_per_ip * len(address_list):]):
            # BUGFIX: was "disks_chunks" (undefined name), which raised
            # NameError whenever the drive count did not divide evenly
            # between the NIC IPs.
            disk_chunks[i].append(disk)

        subsys_no = 1
        port_no = 0
        for ip, chunk in zip(address_list, disk_chunks):
            for disk in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % subsys_no
                nvmet_cfg["subsystems"].append({
                    "allowed_hosts": [],
                    "attr": {
                        "allow_any_host": "1",
                        "serial": "SPDK00%s" % subsys_no,
                        "version": "1.3"
                    },
                    "namespaces": [
                        {
                            "device": {
                                "path": disk,
                                "uuid": "%s" % uuid.uuid4()
                            },
                            "enable": 1,
                            "nsid": subsys_no
                        }
                    ],
                    "nqn": nqn
                })

                nvmet_cfg["ports"].append({
                    "addr": {
                        "adrfam": "ipv4",
                        "traddr": ip,
                        "trsvcid": "%s" % (4420 + port_no),
                        "trtype": "%s" % self.transport
                    },
                    "portid": subsys_no,
                    "referrals": [],
                    "subsystems": [nqn]
                })
                subsys_no += 1
                port_no += 1
                # NOTE(review): this records the (post-increment) port index,
                # not the trsvcid "4420 + n" configured above — confirm
                # consumers of subsystem_info_list expect that, or rely on
                # Initiator.discover_subsystems to rebuild the list.
                self.subsystem_info_list.append([port_no, nqn, ip])

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        """Configure and start the kernel NVMe-oF target, using either
        null block devices or the physical NVMe drives."""
        self.log_print("Configuring kernel NVMeOF Target")

        if self.null_block:
            print("Configuring with null block device.")
            null_blk_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
            self.kernel_tgt_gen_subsystem_conf(null_blk_list, self.nic_ips)
            self.subsys_no = len(null_blk_list)
        else:
            print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()
            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
            self.subsys_no = len(nvme_list)

        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log_print("Done configuring kernel NVMeOF Target")
1041
1042
class SPDKTarget(Target):
    """NVMe-oF target backed by the SPDK nvmf_tgt application,
    configured at runtime through the SPDK JSON-RPC interface."""

    def __init__(self, name, general_config, target_config):
        super(SPDKTarget, self).__init__(name, general_config, target_config)

        # Required fields
        self.core_mask = target_config["core_mask"]
        self.num_cores = self.get_num_cores(self.core_mask)

        # Defaults
        self.dif_insert_strip = False
        self.null_block_dif_type = 0
        self.num_shared_buffers = 4096
        self.max_queue_depth = 128
        self.bpf_proc = None
        self.bpf_scripts = []
        self.enable_idxd = False

        if "num_shared_buffers" in target_config:
            self.num_shared_buffers = target_config["num_shared_buffers"]
        if "max_queue_depth" in target_config:
            self.max_queue_depth = target_config["max_queue_depth"]
        if "null_block_dif_type" in target_config:
            self.null_block_dif_type = target_config["null_block_dif_type"]
        if "dif_insert_strip" in target_config:
            self.dif_insert_strip = target_config["dif_insert_strip"]
        if "bpf_scripts" in target_config:
            self.bpf_scripts = target_config["bpf_scripts"]
        if "idxd_settings" in target_config:
            self.enable_idxd = target_config["idxd_settings"]

        self.log_print("====IDXD settings:====")
        self.log_print("IDXD enabled: %s" % (self.enable_idxd))

    def get_num_cores(self, core_mask):
        """Count CPUs in a core mask given either as a hex bitmask
        (e.g. "0xFF") or a bracketed list/range string (e.g. "[0-3,8]")."""
        if "0x" in core_mask:
            return bin(int(core_mask, 16)).count("1")
        else:
            num_cores = 0
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")
            for i in core_mask.split(","):
                if "-" in i:
                    x, y = i.split("-")
                    # "x-y" is an inclusive range, hence the +1.
                    num_cores += len(range(int(x), int(y))) + 1
                else:
                    num_cores += 1
            return num_cores

    def spdk_tgt_configure(self):
        """Configure the running nvmf_tgt process via RPC: transport
        layer, bdevs (null block or NVMe) and subsystems/listeners."""
        self.log_print("Configuring SPDK NVMeOF target via RPC")

        if self.enable_adq:
            self.adq_configure_tc()

        # Create transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
                                       num_shared_buffers=self.num_shared_buffers,
                                       max_queue_depth=self.max_queue_depth,
                                       dif_insert_or_strip=self.dif_insert_strip,
                                       sock_priority=self.adq_priority)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

        self.log_print("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self, null_block_count):
        """Create null_block_count null bdevs named NvmeXn1, optionally
        with DIF/DIX metadata when null_block_dif_type is non-zero."""
        md_size = 0
        block_size = 4096
        if self.null_block_dif_type != 0:
            md_size = 128

        self.log_print("Adding null block bdevices to config via RPC")
        for i in range(null_block_count):
            self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type)
            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
                                      dif_type=self.null_block_dif_type, md_size=md_size)
        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        """Attach local NVMe drives as SPDK bdevs (NvmeX).

        :param req_num_disks: optional cap on the number of drives to
            attach; exits if more are requested than available
        """
        self.log_print("Adding NVMe bdevs to config via RPC")

        bdfs = get_nvme_devices_bdf()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create one subsystem + listener per bdev, distributing bdevs
        across the provided NIC IPs; all listeners use port 4420.

        :param ips: listener IP addresses
        :param req_num_disks: number of bdevs; defaults to the local NVMe
            device count
        """
        self.log_print("Adding subsystems to config")
        port = "4420"
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        # Distribute bdevs between provided NICs
        num_disks = range(0, req_num_disks)
        if len(num_disks) == 1:
            disks_per_ip = 1
        else:
            disks_per_ip = int(len(num_disks) / len(ips))
        disk_chunks = [[*num_disks[i * disks_per_ip:disks_per_ip + disks_per_ip * i]] for i in range(0, len(ips))]

        # Add remaining drives
        for i, disk in enumerate(num_disks[disks_per_ip * len(ips):]):
            disk_chunks[i].append(disk)

        # Create subsystems, add bdevs to namespaces, add listeners
        for ip, chunk in zip(ips, disk_chunks):
            for c in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
                serial = "SPDK00%s" % c
                bdev_name = "Nvme%sn1" % c
                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                               allow_any_host=True, max_namespaces=8)
                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)

                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
                                                     nqn=nqn,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid=port,
                                                     adrfam="ipv4")

                self.subsystem_info_list.append([port, nqn, ip])
        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def bpf_start(self):
        """Start the bpftrace helper script against the nvmf_tgt PID,
        logging trace output to the results directory."""
        self.log_print("Starting BPF Trace scripts: %s" % self.bpf_scripts)
        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
        results_path = os.path.join(self.results_dir, "bpf_traces.txt")

        with open(self.pid, "r") as fh:
            nvmf_pid = str(fh.readline())

        cmd = [bpf_script, nvmf_pid, *bpf_traces]
        self.log_print(cmd)
        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})

    def tgt_start(self):
        """Launch nvmf_tgt, wait for its RPC socket, apply startup RPC
        options (zcopy, ADQ, IDXD, scheduler) and configure the target."""
        if self.null_block:
            self.subsys_no = 1
        else:
            self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
        # self.pid holds the *path* of the pid file, not the PID itself.
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        # BUGFIX: previously logged self.pid (the pid file path) as the PID;
        # log the actual process id instead.
        self.log_print("SPDK NVMeOF Target PID=%s" % proc.pid)
        self.log_print("Waiting for spdk to initialize...")
        # NOTE(review): waits indefinitely; if nvmf_tgt dies before creating
        # the socket this loop never exits — consider a timeout.
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock")

        if self.enable_zcopy:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
                                           enable_zerocopy_send_server=True)
            self.log_print("Target socket options:")
            rpc.client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))

        if self.enable_adq:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
                                           nvme_adminq_poll_period_us=100000, retry_count=4)

        if self.enable_idxd:
            rpc.idxd.idxd_scan_accel_engine(self.client, config_number=0, config_kernel_mode=None)
            self.log_print("Target IDXD accel engine enabled")

        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name)
        rpc.framework_start_init(self.client)

        if self.bpf_scripts:
            self.bpf_start()

        self.spdk_tgt_configure()

    def stop(self):
        """Stop the BPF tracer (if running) and terminate nvmf_tgt,
        escalating to kill if graceful termination fails."""
        if self.bpf_proc:
            self.log_print("Stopping BPF Trace script")
            self.bpf_proc.terminate()
            self.bpf_proc.wait()

        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait()
            except Exception as e:
                self.log_print(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
1258
1259
class KernelInitiator(Initiator):
    """Initiator that connects to the target with the kernel NVMe-oF host
    driver (nvme-cli) and runs fio against the resulting /dev/nvme* devices."""

    def __init__(self, name, general_config, initiator_config):
        super(KernelInitiator, self).__init__(name, general_config, initiator_config)

        # Defaults
        self.extra_params = ""
        self.ioengine = "libaio"

        if "extra_params" in initiator_config:
            self.extra_params = initiator_config["extra_params"]

        if "kernel_engine" in initiator_config:
            self.ioengine = initiator_config["kernel_engine"]
            if "io_uring" in self.ioengine:
                self.extra_params = "--nr-poll-queues=8"

    def get_connected_nvme_list(self):
        """Return basenames (e.g. "nvme0n1") of connected NVMe devices whose
        model number identifies an SPDK or Linux kernel target namespace."""
        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
        return nvme_list

    def kernel_init_connect(self):
        """Connect to every subsystem in self.subsystem_info_list with
        nvme-cli, then apply io_uring block-layer sysfs tuning if the
        io_uring engine is in use."""
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for subsystem in self.subsystem_info_list:
            # BUGFIX: '"%s %s %s" % subsystem' raises TypeError when the
            # entry is a list (as built by the target classes); unpack
            # explicitly. Also stringify the service id, which may be
            # stored as an int, before joining the remote command.
            svcid, nqn, addr = subsystem
            self.log_print("Trying to connect %s %s %s" % (svcid, nqn, addr))
            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
                           "-s", str(svcid), "-n", nqn, "-a", addr, self.extra_params])
            time.sleep(2)

        if "io_uring" in self.ioengine:
            self.log_print("Setting block layer settings for io_uring.")

            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
            #       apparently it's not possible for connected subsystems.
            #       Results in "error: Invalid argument"
            block_sysfs_settings = {
                "iostats": "0",
                "rq_affinity": "0",
                "nomerges": "2"
            }

            for disk in self.get_connected_nvme_list():
                sysfs = os.path.join("/sys/block", disk, "queue")
                for k, v in block_sysfs_settings.items():
                    sysfs_opt_path = os.path.join(sysfs, k)
                    try:
                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
                    except subprocess.CalledProcessError as e:
                        self.log_print("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
                    finally:
                        _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)])
                        self.log_print("%s=%s" % (sysfs_opt_path, _))

    def kernel_init_disconnect(self):
        """Disconnect every previously connected subsystem by NQN."""
        for subsystem in self.subsystem_info_list:
            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
            time.sleep(1)

    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1):
        """Build the per-thread [filenameN] sections of the fio job file,
        distributing connected /dev/nvme* devices across threads; extra
        devices (the remainder) go to the earliest threads.

        :param threads: iterable of thread indices
        :param io_depth: queue depth per device
        :param num_jobs: fio numjobs, used to scale the section iodepth
        :return: fio config fragment as a string
        """
        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
                if remainder:
                    result[i].append(next(iterator))
                    remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            # Scale section queue depth by device count, split across jobs;
            # never drop below 1.
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section
1344
1345
class SPDKInitiator(Initiator):
    """Initiator host that runs fio through the SPDK bdev fio plugin.

    Builds SPDK on the remote host (unless skipped in config), generates the
    SPDK bdev JSON configuration attaching one NVMe-oF controller per remote
    subsystem, and generates the fio filename/job sections.
    """

    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Build and set up SPDK unless the config explicitly opts out.
        # Note: install is skipped for ANY present value other than the exact
        # bool False (matches the original "is False" check).
        if general_config.get("skip_spdk_install", False) is False:
            self.install_spdk()

        # Required fields
        self.num_cores = initiator_config["num_cores"]

        # Optional fields
        self.enable_data_digest = initiator_config.get("enable_data_digest", False)

    def install_spdk(self):
        """Build SPDK (RDMA transport + fio plugin) and run setup.sh."""
        self.log_print("Using fio binary %s" % self.fio_bin)
        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
        # NOTE(review): "cd dir && ..." only works if exec_cmd joins argv into a
        # single shell command line - confirm against the Initiator base class.
        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)])
        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])

        self.log_print("SPDK built")
        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        """Return an SPDK bdev-subsystem JSON config (as a string) with one
        bdev_nvme_attach_controller entry per remote subsystem.

        remote_subsystem_list: iterable of (port, nqn, address) tuples.
        """
        bdev_cfg_section = {
            "subsystems": [
                {
                    "subsystem": "bdev",
                    "config": []
                }
            ]
        }

        for i, subsys in enumerate(remote_subsystem_list):
            sub_port, sub_nqn, sub_addr = map(str, subsys)
            nvme_ctrl = {
                "method": "bdev_nvme_attach_controller",
                "params": {
                    "name": "Nvme{}".format(i),
                    "trtype": self.transport,
                    "traddr": sub_addr,
                    "trsvcid": sub_port,
                    "subnqn": sub_nqn,
                    "adrfam": "IPv4"
                }
            }

            if self.enable_adq:
                nvme_ctrl["params"].update({"priority": "1"})

            if self.enable_data_digest:
                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})

            bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)

        return json.dumps(bdev_cfg_section, indent=2)

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1):
        # Generate fio "[filenameX]" job sections, spreading the NvmeXn1 bdevs
        # as evenly as possible across up to len(threads) sections.
        filename_section = ""
        # Never create more job sections than there are subsystems.
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        nvme_per_split = int(len(subsystems) / len(threads))
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            # Hand out leftover bdevs one per section.
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            # Scale iodepth so total queue depth per device stays constant
            # regardless of how many devices and jobs share a section.
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section
1432
1433
if __name__ == "__main__":
    script_full_dir = os.path.dirname(os.path.realpath(__file__))
    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
                        help='Configuration file.')
    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
                        help='Results directory.')
    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
                        help='CSV results filename.')

    args = parser.parse_args()

    print("Using config file: %s" % args.config)
    with open(args.config, "r") as config:
        data = json.load(config)

    initiators = []
    fio_cases = []

    general_config = data["general"]
    target_config = data["target"]
    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]

    # Build the target/initiator objects and read the fio workload matrix from
    # the corresponding sections of the JSON config file.
    for k, v in data.items():
        if "target" in k:
            v.update({"results_dir": args.results})
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(k, data["general"], v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(k, data["general"], v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k].get("run_num")
            fio_num_jobs = data[k].get("num_jobs")
            fio_rate_iops = data[k].get("rate_iops", 0)

    os.makedirs(args.results, exist_ok=True)

    # TODO: This try block is definitely too large. Need to break this up into separate
    # logical blocks to reduce size.
    try:
        target_obj.tgt_start()

        for i in initiators:
            i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no)
            if i.enable_adq:
                i.adq_configure_tc()

        # Poor mans threading
        # Run FIO tests
        for block_size, io_depth, rw in fio_workloads:
            threads = []
            configs = []
            for i in initiators:
                if i.mode == "kernel":
                    i.kernel_init_connect()

                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops)
                configs.append(cfg)

            for i, cfg in zip(initiators, configs):
                t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
                threads.append(t)
            if target_obj.enable_sar:
                sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"])
                sar_file_name = ".".join([sar_file_name, "txt"])
                t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_name))
                threads.append(t)

            if target_obj.enable_pcm:
                pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]

                pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],))
                pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],))
                pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],))

                threads.append(pcm_cpu_t)
                threads.append(pcm_mem_t)
                threads.append(pcm_pow_t)

            if target_obj.enable_bandwidth:
                bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)])
                bandwidth_file_name = ".".join([bandwidth_file_name, "csv"])
                t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,))
                threads.append(t)

            if target_obj.enable_dpdk_memory:
                # Bug fix: "args" must be a tuple. "(args.results)" is just a
                # parenthesized string, and Thread would splat it into one
                # positional argument per character.
                t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results,))
                threads.append(t)

            if target_obj.enable_adq:
                ethtool_thread = threading.Thread(target=target_obj.ethtool_after_fio_ramp, args=(fio_ramp_time,))
                threads.append(ethtool_thread)

            for t in threads:
                t.start()
            for t in threads:
                t.join()

            for i in initiators:
                if i.mode == "kernel":
                    i.kernel_init_disconnect()
                i.copy_result_files(args.results)

        target_obj.restore_governor()
        target_obj.restore_tuned()
        target_obj.restore_services()
        target_obj.restore_sysctl()
        for i in initiators:
            i.restore_governor()
            i.restore_tuned()
            i.restore_services()
            i.restore_sysctl()
        target_obj.parse_results(args.results, args.csv_filename)
    finally:
        for i in initiators:
            try:
                i.stop()
            except Exception as err:
                # Best-effort cleanup: report the failure instead of silently
                # swallowing it, but keep stopping the remaining initiators.
                print("Error while stopping initiator %s: %s" % (i.name, err))
        target_obj.stop()
1579