xref: /spdk/scripts/perf/nvmf/run_nvmf.py (revision ea8f5b27612fa03698a9ce3ad4bd37765d9cdfa5)
1#!/usr/bin/env python3
2
3from json.decoder import JSONDecodeError
4import os
5import re
6import sys
7import argparse
8import json
9import zipfile
10import threading
11import subprocess
12import itertools
13import configparser
14import time
15import uuid
16from collections import OrderedDict
17
18import paramiko
19import pandas as pd
20from common import *
21
22sys.path.append(os.path.dirname(__file__) + '/../../../python')
23
24import spdk.rpc as rpc  # noqa
25import spdk.rpc.client as rpc_client  # noqa
26
27
class Server:
    """Base representation of one machine taking part in an NVMe-oF test.

    Holds configuration common to target and initiator roles (credentials,
    transport, NIC IPs) and implements the shared system-tuning steps:
    services, sysctl, tuned profile, CPU governor, IRQ affinity and optional
    ADQ setup, plus the matching restore_*() methods.  Subclasses must
    override exec_cmd() to actually run commands (locally or over SSH).
    """

    def __init__(self, name, general_config, server_config):
        """Load common settings from the "general" and per-server config dicts.

        Exits the whole script if the server name is not purely
        alphanumeric, since the name is used to tag log lines and files.
        """
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]

        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        self.local_nic_info = []
        self._nics_json_obj = {}
        # Pre-test state captured here so restore_*() can revert it later.
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        self.enable_adq = False
        self.adq_priority = None
        if "adq_enable" in server_config and server_config["adq_enable"]:
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1

        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]

        # NOTE(review): "*" also accepts an empty name; presumably
        # unintended, but kept for backward compatibility.
        if not re.match("^[A-Za-z0-9]*$", name):
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        """Print msg prefixed with this server's name, flushing immediately."""
        print("[%s] %s" % (self.name, msg), flush=True)

    @staticmethod
    def get_uncommented_lines(lines):
        """Return only lines that are non-empty and do not start with '#'."""
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        """Return the interface name that owns exactly the given IP address.

        Caches the parsed "ip -j address show" output in self._nics_json_obj
        (interfaces without any addr_info are filtered out).  Returns None
        if no interface has that address.
        """
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                # Compare for equality: the previous substring test ("in")
                # wrongly matched address prefixes, e.g. query 10.0.0.1
                # matched an interface holding 10.0.0.11.
                if ip == addr["local"]:
                    return nic["ifname"]

    def set_local_nic_info_helper(self):
        """Return raw hardware info for NIC discovery; overridden in subclasses."""
        pass

    def set_local_nic_info(self, pci_info):
        """Extract network device entries from lshw-style JSON into self.local_nic_info."""
        def extract_network_elements(json_obj):
            # Recursively walk lists and dicts, collecting nodes whose
            # "class" contains "network".
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Execute cmd on this server and return its stdout; stub in base class."""
        return ""

    def configure_system(self):
        """Apply all system-level tuning steps in order."""
        self.configure_services()
        self.configure_sysctl()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity()

    def configure_adq(self):
        """Set up ADQ (kernel modules + NIC flags); no-op for kernel mode."""
        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        """Load the tc-related kernel modules that ADQ depends on."""
        self.log_print("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log_print("%s loaded!" % module)
            # CalledProcessError is expected to come from "from common import *"
            # (presumably subprocess.CalledProcessError) - TODO confirm.
            except CalledProcessError as e:
                self.log_print("ERROR: failed to load module %s" % module)
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        """Create ADQ traffic classes and flower filters on each test NIC.

        Uses self.num_cores and self.spdk_dir, which are provided by
        subclasses.  Target side filters on dst_port, initiators on
        src_port (Target is defined later in this file).
        """
        self.log_print("Configuring ADQ Traffic classes and filters...")

        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        port = "4420"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log_print(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            # Give the driver a moment to settle before adding the ingress qdisc.
            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log_print(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                             "protocol", "ip", "ingress", "prio", "1", "flower",
                             "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                             "skip_sw", "hw_tc", "1"]
            self.log_print(" ".join(tc_filter_cmd))
            self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log_print("Show tc configuration for %s NIC..." % nic_name)
            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log_print("%s" % tc_disk_out)
            self.log_print("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log_print(xps_cmd)
            self.exec_cmd(xps_cmd)

    def reload_driver(self, driver):
        """rmmod + modprobe the given kernel driver; log (not raise) on failure."""
        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver])
        except CalledProcessError as e:
            self.log_print("ERROR: failed to reload %s module!" % driver)
            self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_nic(self):
        """Reload the ice driver and set per-NIC ethtool flags required by ADQ."""
        self.log_print("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log_print(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                # As temporary workaround for ADQ, channel packet inspection optimization is turned on during connection establishment.
                # Then turned off before fio ramp_up expires in ethtool_after_fio_ramp().
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-pkt-inspect-optimize", "on"])
            except CalledProcessError as e:
                self.log_print("ERROR: failed to configure NIC port using ethtool!")
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
                self.log_print("Please update your NIC driver and firmware versions and try again.")
            self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        """Stop services that disturb performance tests; remember prior state."""
        self.log_print("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            # systemctl show output is key=value lines; prepend a section
            # header so configparser can digest it.
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log_print("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log_print("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        """Apply network/VM sysctl tuning; record previous values for restore."""
        self.log_print("Tuning sysctl settings...")

        # busy_read polling only helps the SPDK target when ADQ is in use.
        busy_read = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1

        sysctl_opts = {
            "net.core.busy_poll": 0,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
        }

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        """Activate the configured tuned-adm profile, saving the current one."""
        if not self.tuned_profile:
            self.log_print("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        """Switch the CPU frequency governor to "performance", saving the old one."""
        self.log_print("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def configure_irq_affinity(self):
        """Run the vendor set_irq_affinity.sh script for each test NIC."""
        self.log_print("Setting NIC irq affinity for NICs...")

        irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh")
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            irq_cmd = ["sudo", irq_script_path, nic]
            self.log_print(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        """Return services stopped in configure_services() to their prior state."""
        self.log_print("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        """Write back the sysctl values recorded in configure_sysctl()."""
        self.log_print("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        """Revert tuned-adm to the profile/mode recorded in configure_tuned()."""
        self.log_print("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log_print("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        """Restore the CPU governor saved in configure_cpu_governor()."""
        self.log_print("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log_print("Reverted CPU governor to %s." % self.governor_restore)
338
339
340class Target(Server):
    def __init__(self, name, general_config, target_config):
        """Configure the NVMe-oF target host from the test config.

        Note: this constructor has side effects - it may zip the SPDK
        sources to /tmp/spdk.zip and immediately applies system tuning
        (and ADQ setup when enabled) via configure_system()/configure_adq().
        """
        super().__init__(name, general_config, target_config)

        # Defaults
        self.enable_sar = False
        self.sar_delay = 0
        self.sar_interval = 0
        self.sar_count = 0
        self.enable_pcm = False
        self.pcm_dir = ""
        self.pcm_delay = 0
        self.pcm_interval = 0
        self.pcm_count = 0
        self.enable_bandwidth = 0
        self.bandwidth_count = 0
        self.enable_dpdk_memory = False
        self.dpdk_wait_time = 0
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        # Cache local NIC info eagerly (overrides the lazy cache in Server).
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []
        self.initiator_info = []

        # Optional knobs; most *_settings entries are positional tuples/lists
        # unpacked into the matching attributes.
        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "sar_settings" in target_config:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"]
        if "pcm_settings" in target_config:
            self.enable_pcm = True
            self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"]
        if "enable_bandwidth" in target_config:
            self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"]
        if "enable_dpdk_memory" in target_config:
            self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"]
        if "scheduler_settings" in target_config:
            self.scheduler_name = target_config["scheduler_settings"]
        if "zcopy_settings" in target_config:
            self.enable_zcopy = target_config["zcopy_settings"]
        if "results_dir" in target_config:
            self.results_dir = target_config["results_dir"]

        # SPDK root is three levels up from this script (scripts/perf/nvmf).
        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        # Zip sources for transfer to initiators unless explicitly skipped.
        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()
393        self.sys_config()
394
395    def set_local_nic_info_helper(self):
396        return json.loads(self.exec_cmd(["lshw", "-json"]))
397
398    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
399        stderr_opt = None
400        if stderr_redirect:
401            stderr_opt = subprocess.STDOUT
402        if change_dir:
403            old_cwd = os.getcwd()
404            os.chdir(change_dir)
405            self.log_print("Changing directory to %s" % change_dir)
406
407        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")
408
409        if change_dir:
410            os.chdir(old_cwd)
411            self.log_print("Changing directory to %s" % old_cwd)
412        return out
413
414    def zip_spdk_sources(self, spdk_dir, dest_file):
415        self.log_print("Zipping SPDK source directory")
416        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
417        for root, _directories, files in os.walk(spdk_dir, followlinks=True):
418            for file in files:
419                fh.write(os.path.relpath(os.path.join(root, file)))
420        fh.close()
421        self.log_print("Done zipping")
422
423    @staticmethod
424    def _chunks(input_list, chunks_no):
425        div, rem = divmod(len(input_list), chunks_no)
426        for i in range(chunks_no):
427            si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem)
428            yield input_list[si:si + (div + 1 if i < rem else div)]
429
430    def spread_bdevs(self, req_disks):
431        # Spread available block devices indexes:
432        # - evenly across available initiator systems
433        # - evenly across available NIC interfaces for
434        #   each initiator
435        # Not NUMA aware.
436        ip_bdev_map = []
437        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))
438
439        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
440            self.initiator_info[i]["bdev_range"] = init_chunk
441            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
442            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
443                for c in nic_chunk:
444                    ip_bdev_map.append((ip, c))
445        return ip_bdev_map
446
447    @staticmethod
448    def read_json_stats(file):
449        with open(file, "r") as json_data:
450            data = json.load(json_data)
451            job_pos = 0  # job_post = 0 because using aggregated results
452
453            # Check if latency is in nano or microseconds to choose correct dict key
454            def get_lat_unit(key_prefix, dict_section):
455                # key prefix - lat, clat or slat.
456                # dict section - portion of json containing latency bucket in question
457                # Return dict key to access the bucket and unit as string
458                for k, _ in dict_section.items():
459                    if k.startswith(key_prefix):
460                        return k, k.split("_")[1]
461
462            def get_clat_percentiles(clat_dict_leaf):
463                if "percentile" in clat_dict_leaf:
464                    p99_lat = float(clat_dict_leaf["percentile"]["99.000000"])
465                    p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"])
466                    p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"])
467                    p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"])
468
469                    return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat]
470                else:
471                    # Latest fio versions do not provide "percentile" results if no
472                    # measurements were done, so just return zeroes
473                    return [0, 0, 0, 0]
474
475            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
476            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
477            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
478            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
479            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
480            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
481            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
482            read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles(
483                data["jobs"][job_pos]["read"][clat_key])
484
485            if "ns" in lat_unit:
486                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
487            if "ns" in clat_unit:
488                read_p99_lat = read_p99_lat / 1000
489                read_p99_9_lat = read_p99_9_lat / 1000
490                read_p99_99_lat = read_p99_99_lat / 1000
491                read_p99_999_lat = read_p99_999_lat / 1000
492
493            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
494            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
495            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
496            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
497            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
498            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
499            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
500            write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles(
501                data["jobs"][job_pos]["write"][clat_key])
502
503            if "ns" in lat_unit:
504                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
505            if "ns" in clat_unit:
506                write_p99_lat = write_p99_lat / 1000
507                write_p99_9_lat = write_p99_9_lat / 1000
508                write_p99_99_lat = write_p99_99_lat / 1000
509                write_p99_999_lat = write_p99_999_lat / 1000
510
511        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
512                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
513                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
514                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]
515
516    def parse_results(self, results_dir, csv_file):
517        files = os.listdir(results_dir)
518        fio_files = filter(lambda x: ".fio" in x, files)
519        json_files = [x for x in files if ".json" in x]
520
521        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
522                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
523                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
524                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]
525
526        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
527                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]
528
529        header_line = ",".join(["Name", *headers])
530        aggr_header_line = ",".join(["Name", *aggr_headers])
531
532        # Create empty results file
533        with open(os.path.join(results_dir, csv_file), "w") as fh:
534            fh.write(aggr_header_line + "\n")
535        rows = set()
536
537        for fio_config in fio_files:
538            self.log_print("Getting FIO stats for %s" % fio_config)
539            job_name, _ = os.path.splitext(fio_config)
540
541            # Look in the filename for rwmixread value. Function arguments do
542            # not have that information.
543            # TODO: Improve this function by directly using workload params instead
544            # of regexing through filenames.
545            if "read" in job_name:
546                rw_mixread = 1
547            elif "write" in job_name:
548                rw_mixread = 0
549            else:
550                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100
551
552            # If "_CPU" exists in name - ignore it
553            # Initiators for the same job could have different num_cores parameter
554            job_name = re.sub(r"_\d+CPU", "", job_name)
555            job_result_files = [x for x in json_files if x.startswith(job_name)]
556            self.log_print("Matching result files for current fio config:")
557            for j in job_result_files:
558                self.log_print("\t %s" % j)
559
560            # There may have been more than 1 initiator used in test, need to check that
561            # Result files are created so that string after last "_" separator is server name
562            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
563            inits_avg_results = []
564            for i in inits_names:
565                self.log_print("\tGetting stats for initiator %s" % i)
566                # There may have been more than 1 test run for this job, calculate average results for initiator
567                i_results = [x for x in job_result_files if i in x]
568                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))
569
570                separate_stats = []
571                for r in i_results:
572                    try:
573                        stats = self.read_json_stats(os.path.join(results_dir, r))
574                        separate_stats.append(stats)
575                        self.log_print(stats)
576                    except JSONDecodeError:
577                        self.log_print("ERROR: Failed to parse %s results! Results might be incomplete!" % r)
578
579                init_results = [sum(x) for x in zip(*separate_stats)]
580                init_results = [x / len(separate_stats) for x in init_results]
581                inits_avg_results.append(init_results)
582
583                self.log_print("\tAverage results for initiator %s" % i)
584                self.log_print(init_results)
585                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
586                    fh.write(header_line + "\n")
587                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")
588
589            # Sum results of all initiators running this FIO job.
590            # Latency results are an average of latencies from accros all initiators.
591            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
592            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
593            for key in inits_avg_results:
594                if "lat" in key:
595                    inits_avg_results[key] /= len(inits_names)
596
597            # Aggregate separate read/write values into common labels
598            # Take rw_mixread into consideration for mixed read/write workloads.
599            aggregate_results = OrderedDict()
600            for h in aggr_headers:
601                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
602                if "lat" in h:
603                    _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
604                else:
605                    _ = read_stat + write_stat
606                aggregate_results[h] = "{0:.3f}".format(_)
607
608            rows.add(",".join([job_name, *aggregate_results.values()]))
609
610        # Save results to file
611        for row in rows:
612            with open(os.path.join(results_dir, csv_file), "a") as fh:
613                fh.write(row + "\n")
614        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))
615
616    def measure_sar(self, results_dir, sar_file_prefix):
617        cpu_number = os.cpu_count()
618        sar_idle_sum = 0
619        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
620        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))
621
622        self.log_print("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % self.sar_delay)
623        time.sleep(self.sar_delay)
624        self.log_print("Starting SAR measurements")
625
626        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count])
627        with open(os.path.join(results_dir, sar_output_file), "w") as fh:
628            for line in out.split("\n"):
629                if "Average" in line:
630                    if "CPU" in line:
631                        self.log_print("Summary CPU utilization from SAR:")
632                        self.log_print(line)
633                    elif "all" in line:
634                        self.log_print(line)
635                    else:
636                        sar_idle_sum += float(line.split()[7])
637            fh.write(out)
638        sar_cpu_usage = cpu_number * 100 - sar_idle_sum
639
640        with open(os.path.join(results_dir, sar_cpu_util_file), "w") as f:
641            f.write("%0.2f" % sar_cpu_usage)
642
643    def ethtool_after_fio_ramp(self, fio_ramp_time):
644        time.sleep(fio_ramp_time//2)
645        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
646        for nic in nic_names:
647            self.log_print(nic)
648            self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
649                           "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization
650
651    def measure_pcm_memory(self, results_dir, pcm_file_name):
652        time.sleep(self.pcm_delay)
653        cmd = ["%s/pcm-memory.x" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)]
654        pcm_memory = subprocess.Popen(cmd)
655        time.sleep(self.pcm_count)
656        pcm_memory.terminate()
657
658    def measure_pcm(self, results_dir, pcm_file_name):
659        time.sleep(self.pcm_delay)
660        cmd = ["%s/pcm.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, "-csv=%s/%s" % (results_dir, pcm_file_name)]
661        subprocess.run(cmd)
662        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
663        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
664        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
665        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
666        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)
667
668    def measure_pcm_power(self, results_dir, pcm_power_file_name):
669        time.sleep(self.pcm_delay)
670        out = self.exec_cmd(["%s/pcm-power.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count])
671        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
672            fh.write(out)
673
674    def measure_network_bandwidth(self, results_dir, bandwidth_file_name):
675        self.log_print("INFO: starting network bandwidth measure")
676        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
677                       "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)])
678
679    def measure_dpdk_memory(self, results_dir):
680        self.log_print("INFO: waiting to generate DPDK memory usage")
681        time.sleep(self.dpdk_wait_time)
682        self.log_print("INFO: generating DPDK memory usage")
683        rpc.env.env_dpdk_get_mem_stats
684        os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir))
685
686    def sys_config(self):
687        self.log_print("====Kernel release:====")
688        self.log_print(os.uname().release)
689        self.log_print("====Kernel command line:====")
690        with open('/proc/cmdline') as f:
691            cmdline = f.readlines()
692            self.log_print('\n'.join(self.get_uncommented_lines(cmdline)))
693        self.log_print("====sysctl conf:====")
694        with open('/etc/sysctl.conf') as f:
695            sysctl = f.readlines()
696            self.log_print('\n'.join(self.get_uncommented_lines(sysctl)))
697        self.log_print("====Cpu power info:====")
698        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
699        self.log_print("====zcopy settings:====")
700        self.log_print("zcopy enabled: %s" % (self.enable_zcopy))
701        self.log_print("====Scheduler settings:====")
702        self.log_print("SPDK scheduler: %s" % (self.scheduler_name))
703
704
class Initiator(Server):
    """A remote host that generates NVMe-oF traffic against the target.

    All interaction with the host happens over an SSH connection (paramiko);
    exec_cmd() runs commands remotely. The class prepares the host (optional
    SPDK source deployment, CPU frequency, system tuning), generates fio
    configuration files and runs fio workloads.
    """

    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Required fields
        self.ip = initiator_config["ip"]
        self.target_nic_ips = initiator_config["target_nic_ips"]

        # Defaults
        self.cpus_allowed = None
        self.cpus_allowed_policy = "shared"
        self.spdk_dir = "/tmp/spdk"
        self.fio_bin = "/usr/src/fio/fio"
        self.nvmecli_bin = "nvme"
        self.cpu_frequency = None
        self.subsystem_info_list = []

        if "spdk_dir" in initiator_config:
            self.spdk_dir = initiator_config["spdk_dir"]
        if "fio_bin" in initiator_config:
            self.fio_bin = initiator_config["fio_bin"]
        if "nvmecli_bin" in initiator_config:
            self.nvmecli_bin = initiator_config["nvmecli_bin"]
        if "cpus_allowed" in initiator_config:
            self.cpus_allowed = initiator_config["cpus_allowed"]
        if "cpus_allowed_policy" in initiator_config:
            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
        if "cpu_frequency" in initiator_config:
            self.cpu_frequency = initiator_config["cpu_frequency"]
        if os.getenv('SPDK_WORKSPACE'):
            # Environment override takes precedence over the config file value.
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.copy_spdk("/tmp/spdk.zip")
        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        """Return NIC hardware information for this host (parsed lshw JSON)."""
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def stop(self):
        """Close the SSH connection to the initiator host."""
        self.ssh_connection.close()

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Execute a command on the remote host and return its stdout.

        :param cmd: command and arguments as a list
        :param stderr_redirect: if True, merge stderr into stdout (via PTY)
        :param change_dir: optional remote directory to cd into first
        :raises CalledProcessError: when the remote command exits non-zero
        """
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it again to prevent
        # expansion when sending to remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Redirect stderr to stdout thanks using get_pty option if needed
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out

    def put_file(self, local, remote_dest):
        """Upload a local file to the remote host over SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        """Download a remote file to the local host over SFTP."""
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def copy_spdk(self, local_spdk_zip):
        """Copy zipped SPDK sources to the initiator and unpack into self.spdk_dir."""
        self.log_print("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log_print("Sources unpacked")

    def copy_result_files(self, dest_dir):
        """Copy all files from the initiator's nvmf_perf directory into local dest_dir."""
        self.log_print("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get list of result files from initiator and copy them back to target
        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log_print("Done copying results")

    def discover_subsystems(self, address_list, subsys_no):
        """Probe each address in address_list on ports 4420..4420+subsys_no-1
        with "nvme discover" and record matching subsystems (excluding
        discovery subsystems) in self.subsystem_info_list as
        [trsvcid, nqn, traddr] entries, sorted by NQN.
        """
        num_nvmes = range(0, subsys_no)
        nvme_discover_output = ""
        # Loop variable renamed from "subsys_no" so it no longer shadows the
        # method argument of the same name.
        for ip, port_offset in itertools.product(address_list, num_nvmes):
            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + port_offset))
            nvme_discover_cmd = ["sudo",
                                 "%s" % self.nvmecli_bin,
                                 "discover", "-t", "%s" % self.transport,
                                 "-s", "%s" % (4420 + port_offset),
                                 "-a", "%s" % ip]

            try:
                stdout = self.exec_cmd(nvme_discover_cmd)
                if stdout:
                    nvme_discover_output = nvme_discover_output + stdout
            except CalledProcessError:
                # Do nothing. In case of discovering remote subsystems of kernel target
                # we expect "nvme discover" to fail a bunch of times because we basically
                # scan ports.
                pass

        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                                nvme_discover_output)  # from nvme discovery output
        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
        subsystems = filter(lambda x: "discovery" not in x[1], subsystems)
        subsystems = list(set(subsystems))
        subsystems.sort(key=lambda x: x[1])
        self.log_print("Found matching subsystems on target side:")
        for s in subsystems:
            self.log_print(s)
        self.subsystem_info_list = subsystems

    def gen_fio_filename_conf(self, *args, **kwargs):
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass

    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10, rate_iops=0):
        """Generate a fio configuration file on the initiator.

        Builds the [global] section from the template below, appends per-job
        filename sections from gen_fio_filename_conf(), writes the file to
        <spdk_dir>/nvmf_perf and returns its remote path.
        """
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
rate_iops={rate_iops}
"""
        if "spdk" in self.mode:
            # SPDK bdev fio plugin: generate a bdev JSON config on the remote side.
            bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
            self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
            spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
        else:
            # Kernel mode: run against connected /dev/nvme* devices.
            ioengine = self.ioengine
            spdk_conf = ""
            out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'",
                                 "|", "awk", "'{print $1}'"])
            subsystems = [x for x in out.split("\n") if "nvme" in x]

        if self.cpus_allowed is not None:
            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    # fio cpus_allowed ranges are inclusive at both ends
                    # ("0-3" means 4 CPUs) - same convention as
                    # SPDKTarget.get_num_cores(). The previous code dropped
                    # one CPU per range.
                    cpus_num += len(range(a, b)) + 1
                else:
                    cpus_num += 1
            self.num_cores = cpus_num
            threads = range(0, self.num_cores)
        elif hasattr(self, 'num_cores'):
            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            self.num_cores = len(subsystems)
            threads = range(0, len(subsystems))

        if "spdk" in self.mode:
            filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs)
        else:
            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs)

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)

        # TODO: hipri disabled for now, as it causes fio errors:
        # io_u error on file /dev/nvme2n1: Operation not supported
        # See comment in KernelInitiator class, kernel_init_connect() function
        if hasattr(self, "ioengine") and "io_uring" in self.ioengine:
            fio_config = fio_config + """
fixedbufs=1
registerfiles=1
#hipri=1
"""
        if num_jobs:
            fio_config = fio_config + "numjobs=%s \n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
        self.log_print("Created FIO Config:")
        self.log_print(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

    def set_cpu_frequency(self):
        """Pin the CPU frequency via cpupower if self.cpu_frequency is set.

        Exits the script if setting the frequency fails (e.g. when the
        intel_pstate driver is active).
        """
        if self.cpu_frequency is not None:
            try:
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
                self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
            except Exception:
                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit()
        else:
            self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.")

    def run_fio(self, fio_config_file, run_num=None):
        """Run fio on the initiator with the given config file.

        When run_num is given, run the workload that many times, saving
        per-run JSON results; otherwise run once.
        """
        job_name, _ = os.path.splitext(fio_config_file)
        self.log_print("Starting FIO run for job: %s" % job_name)
        self.log_print("Using FIO: %s" % self.fio_bin)

        if run_num:
            for i in range(1, run_num + 1):
                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
                try:
                    output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
                                            "--output=%s" % output_filename, "--eta=never"], True)
                    self.log_print(output)
                except subprocess.CalledProcessError as e:
                    self.log_print("ERROR: Fio process failed!")
                    self.log_print(e.stdout)
        else:
            output_filename = job_name + "_" + self.name + ".json"
            # Fixed: the option string was previously "--output" % output_filename,
            # which raised TypeError at runtime instead of passing the output path.
            output = self.exec_cmd(["sudo", self.fio_bin,
                                    fio_config_file, "--output-format=json",
                                    "--output=%s" % output_filename], True)
            self.log_print(output)
        self.log_print("FIO run finished. Results in: %s" % output_filename)

    def sys_config(self):
        """Log the initiator's system configuration: kernel release, boot
        command line, sysctl settings and CPU power info (all gathered
        remotely over SSH)."""
        self.log_print("====Kernel release:====")
        self.log_print(self.exec_cmd(["uname", "-r"]))
        self.log_print("====Kernel command line:====")
        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
        self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
        self.log_print("====sysctl conf:====")
        sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"])
        self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
        self.log_print("====Cpu power info:====")
        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
986
987
class KernelTarget(Target):
    """NVMe-oF target backed by the Linux kernel nvmet driver, configured
    through nvmetcli."""

    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = "nvmetcli"

        if "nvmet_bin" in target_config:
            self.nvmet_bin = target_config["nvmet_bin"]

    def stop(self):
        """Tear down the entire kernel target configuration."""
        nvmet_command(self.nvmet_bin, "clear")

    def kernel_tgt_gen_subsystem_conf(self, nvme_list):
        """Write an nvmetcli JSON config ("kernel.conf") exposing each device
        in nvme_list through its own subsystem and port, and record the
        resulting [port, nqn, ip] entries in self.subsystem_info_list."""
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        for addr, dev_idx in self.spread_bdevs(len(nvme_list)):
            svc_port = str(4420 + dev_idx)
            subsys_nqn = "nqn.2018-09.io.spdk:cnode%s" % dev_idx

            namespace_entry = {
                "device": {
                    "path": nvme_list[dev_idx],
                    "uuid": "%s" % uuid.uuid4()
                },
                "enable": 1,
                "nsid": svc_port
            }
            nvmet_cfg["subsystems"].append({
                "allowed_hosts": [],
                "attr": {
                    "allow_any_host": "1",
                    "serial": "SPDK00%s" % dev_idx,
                    "version": "1.3"
                },
                "namespaces": [namespace_entry],
                "nqn": subsys_nqn
            })
            nvmet_cfg["ports"].append({
                "addr": {
                    "adrfam": "ipv4",
                    "traddr": addr,
                    "trsvcid": svc_port,
                    "trtype": self.transport
                },
                "portid": dev_idx,
                "referrals": [],
                "subsystems": [subsys_nqn]
            })

            self.subsystem_info_list.append([svc_port, subsys_nqn, addr])
        self.subsys_no = len(self.subsystem_info_list)

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        """Generate the nvmet config and load it into the kernel target."""
        self.log_print("Configuring kernel NVMeOF Target")

        if self.null_block:
            print("Configuring with null block device.")
            nvme_list = ["/dev/nullb{}".format(i) for i in range(self.null_block)]
        else:
            print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()

        self.kernel_tgt_gen_subsystem_conf(nvme_list)
        self.subsys_no = len(nvme_list)

        # Start from a clean state, then load the generated config.
        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log_print("Done configuring kernel NVMeOF Target")
1072
1073
class SPDKTarget(Target):
    """NVMe-oF target running the SPDK nvmf_tgt application, driven over JSON-RPC."""

    def __init__(self, name, general_config, target_config):
        """Read SPDK-specific settings from target_config.

        Required key: "core_mask". The remaining keys are optional and
        override the defaults set below.
        """
        super().__init__(name, general_config, target_config)

        # Required fields
        self.core_mask = target_config["core_mask"]
        self.num_cores = self.get_num_cores(self.core_mask)

        # Defaults
        self.dif_insert_strip = False
        self.null_block_dif_type = 0
        self.num_shared_buffers = 4096
        self.max_queue_depth = 128
        self.bpf_proc = None
        self.bpf_scripts = []
        self.enable_dsa = False
        self.scheduler_core_limit = None

        if "num_shared_buffers" in target_config:
            self.num_shared_buffers = target_config["num_shared_buffers"]
        if "max_queue_depth" in target_config:
            self.max_queue_depth = target_config["max_queue_depth"]
        if "null_block_dif_type" in target_config:
            self.null_block_dif_type = target_config["null_block_dif_type"]
        if "dif_insert_strip" in target_config:
            self.dif_insert_strip = target_config["dif_insert_strip"]
        if "bpf_scripts" in target_config:
            self.bpf_scripts = target_config["bpf_scripts"]
        if "dsa_settings" in target_config:
            self.enable_dsa = target_config["dsa_settings"]
        if "scheduler_core_limit" in target_config:
            self.scheduler_core_limit = target_config["scheduler_core_limit"]

        self.log_print("====DSA settings:====")
        self.log_print("DSA enabled: %s" % (self.enable_dsa))

    @staticmethod
    def get_num_cores(core_mask):
        """Return the number of CPU cores selected by core_mask.

        Supports either a hex bitmask (e.g. "0xF0") or a bracketed core
        list (e.g. "[0-3,7]").
        """
        if "0x" in core_mask:
            # Hex bitmask: each set bit selects one core.
            return bin(int(core_mask, 16)).count("1")
        else:
            num_cores = 0
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")
            for i in core_mask.split(","):
                if "-" in i:
                    x, y = i.split("-")
                    # Core ranges are inclusive on both ends ("0-3" is 4 cores).
                    num_cores += len(range(int(x), int(y))) + 1
                else:
                    num_cores += 1
            return num_cores

    def spdk_tgt_configure(self):
        """Configure the running nvmf_tgt via RPC: transport layer first,
        then bdevs (null or NVMe) and their subsystems."""
        self.log_print("Configuring SPDK NVMeOF target via RPC")

        if self.enable_adq:
            self.adq_configure_tc()

        # Create transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
                                       num_shared_buffers=self.num_shared_buffers,
                                       max_queue_depth=self.max_queue_depth,
                                       dif_insert_or_strip=self.dif_insert_strip,
                                       sock_priority=self.adq_priority)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

        self.log_print("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self, null_block_count):
        """Create null_block_count null bdevs named Nvme<i>n1 via RPC,
        sized to carry DIF metadata when null_block_dif_type is non-zero."""
        md_size = 0
        block_size = 4096
        if self.null_block_dif_type != 0:
            # Metadata region holds the DIF protection information.
            md_size = 128

        self.log_print("Adding null block bdevices to config via RPC")
        for i in range(null_block_count):
            self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type)
            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
                                      dif_type=self.null_block_dif_type, md_size=md_size)
        self.log_print("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        """Attach local NVMe PCIe drives as bdevs via RPC.

        :param req_num_disks: optional cap on the number of drives to
            attach; defaults to all detected drives. Exits if more drives
            are requested than are available.
        """
        self.log_print("Adding NVMe bdevs to config via RPC")

        bdfs = get_nvme_devices_bdf()
        # RPC traddr format uses dots instead of colons in the BDF.
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create one subsystem plus listeners per bdev, spreading ports and
        NQNs across the target IPs, and record them in subsystem_info_list."""
        self.log_print("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        for ip, bdev_num in self.spread_bdevs(req_num_disks):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = "Nvme%sn1" % bdev_num

            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                           allow_any_host=True, max_namespaces=8)
            rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
            # Listener is added both for the subsystem itself and for the
            # discovery service on the same address/port.
            for nqn_name in [nqn, "discovery"]:
                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
                                                     nqn=nqn_name,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid=port,
                                                     adrfam="ipv4")
            self.subsystem_info_list.append([port, nqn, ip])
        self.subsys_no = len(self.subsystem_info_list)

        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def bpf_start(self):
        """Launch the bpftrace wrapper script attached to the running
        nvmf_tgt process; output goes to <results_dir>/bpf_traces.txt."""
        self.log_print("Starting BPF Trace scripts: %s" % self.bpf_scripts)
        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
        results_path = os.path.join(self.results_dir, "bpf_traces.txt")

        with open(self.pid, "r") as fh:
            nvmf_pid = str(fh.readline())

        cmd = [bpf_script, nvmf_pid, *bpf_traces]
        self.log_print(cmd)
        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})

    def tgt_start(self):
        """Start nvmf_tgt, wait for its RPC socket, apply socket/scheduler
        options, start the framework and run the full target configuration."""
        if self.null_block:
            self.subsys_no = 1
        else:
            self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        self.log_print("SPDK NVMeOF Target PID=%s" % self.pid)
        self.log_print("Waiting for spdk to initialize...")
        # Poll until the app creates its RPC socket.
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")

        rpc.sock.sock_set_default_impl(self.client, impl_name="posix")

        # Socket options must be set before framework_start_init (the app was
        # started with --wait-for-rpc).
        if self.enable_zcopy:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
                                           enable_zerocopy_send_server=True)
            self.log_print("Target socket options:")
            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))

        if self.enable_adq:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
                                           nvme_adminq_poll_period_us=100000, retry_count=4)

        if self.enable_dsa:
            rpc.dsa.dsa_scan_accel_engine(self.client, config_kernel_mode=None)
            self.log_print("Target DSA accel engine enabled")

        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit)
        rpc.framework_start_init(self.client)

        if self.bpf_scripts:
            self.bpf_start()

        self.spdk_tgt_configure()

    def stop(self):
        """Stop BPF tracing (if running) and terminate the nvmf_tgt process."""
        if self.bpf_proc:
            self.log_print("Stopping BPF Trace script")
            self.bpf_proc.terminate()
            self.bpf_proc.wait()

        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait()
            except Exception as e:
                self.log_print(e)
                # Fall back to SIGKILL if graceful termination failed.
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
1283
1284
class KernelInitiator(Initiator):
    """Initiator using the Linux kernel NVMe-oF host driver; fio runs
    against the connected /dev/nvmeXnY block devices."""

    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Defaults
        self.extra_params = ""
        self.ioengine = "libaio"

        if "extra_params" in initiator_config:
            self.extra_params = initiator_config["extra_params"]

        if "kernel_engine" in initiator_config:
            self.ioengine = initiator_config["kernel_engine"]
            if "io_uring" in self.ioengine:
                # NOTE: this overwrites any "extra_params" value set above.
                self.extra_params = "--nr-poll-queues=8"

    def get_connected_nvme_list(self):
        """Return device names (e.g. "nvme0n1") of the connected NVMe-oF
        namespaces whose model number identifies an SPDK or Linux target."""
        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
        return nvme_list

    def kernel_init_connect(self):
        """Connect to every subsystem in self.subsystem_info_list with
        nvme-cli and, when using io_uring, tune block-layer sysfs settings
        on the newly connected devices."""
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for subsystem in self.subsystem_info_list:
            self.log_print("Trying to connect %s %s %s" % subsystem)
            # subsystem is [trsvcid, nqn, traddr].
            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
            time.sleep(2)

        if "io_uring" in self.ioengine:
            self.log_print("Setting block layer settings for io_uring.")

            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
            #       apparently it's not possible for connected subsystems.
            #       Results in "error: Invalid argument"
            block_sysfs_settings = {
                "iostats": "0",
                "rq_affinity": "0",
                "nomerges": "2"
            }

            for disk in self.get_connected_nvme_list():
                sysfs = os.path.join("/sys/block", disk, "queue")
                for k, v in block_sysfs_settings.items():
                    sysfs_opt_path = os.path.join(sysfs, k)
                    try:
                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
                    except subprocess.CalledProcessError as e:
                        self.log_print("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
                    finally:
                        # Log the effective value regardless of write success.
                        _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)])
                        self.log_print("%s=%s" % (sysfs_opt_path, _))

    def kernel_init_disconnect(self):
        """Disconnect from every subsystem connected by kernel_init_connect()."""
        for subsystem in self.subsystem_info_list:
            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
            time.sleep(1)

    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1):
        """Build fio [filenameX] job sections that split the connected NVMe
        devices across the given threads, scaling iodepth per section.

        NOTE(review): when there are fewer devices than threads
        (nvme_per_split == 0) the inner loop never runs and the generated
        sections are empty - presumably callers always have at least as many
        devices as threads; confirm.
        """
        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
                if remainder:
                    # Hand one leftover device to this split.
                    result[i].append(next(iterator))
                    remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            # Scale iodepth with the number of disks in this section so the
            # per-disk queue depth stays constant across jobs.
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section
1369
1370
class SPDKInitiator(Initiator):
    """Initiator host that runs fio against the target through the SPDK fio bdev plugin."""

    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Build SPDK unless skip_spdk_install is present AND set to something
        # other than False (the `is False` comparison is deliberate: a literal
        # JSON false still triggers the build, any other present value skips it).
        if general_config.get("skip_spdk_install", False) is False:
            self.install_spdk()

        # Required fields
        self.num_cores = initiator_config["num_cores"]

        # Optional fields
        self.enable_data_digest = initiator_config.get("enable_data_digest", False)

    def install_spdk(self):
        """Build SPDK with RDMA + fio plugin support on this host and run setup.sh."""
        self.log_print("Using fio binary %s" % self.fio_bin)
        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)])
        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])

        self.log_print("SPDK built")
        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        """Build a JSON bdev subsystem config attaching one NVMe controller per
        remote subsystem.

        Args:
            remote_subsystem_list: iterable of (port, nqn, address) tuples.

        Returns:
            str: pretty-printed JSON config for the SPDK fio bdev plugin.
        """
        bdev_cfg_section = {
            "subsystems": [
                {
                    "subsystem": "bdev",
                    "config": []
                }
            ]
        }

        for i, subsys in enumerate(remote_subsystem_list):
            sub_port, sub_nqn, sub_addr = map(str, subsys)
            nvme_ctrl = {
                "method": "bdev_nvme_attach_controller",
                "params": {
                    "name": "Nvme{}".format(i),
                    "trtype": self.transport,
                    "traddr": sub_addr,
                    "trsvcid": sub_port,
                    "subnqn": sub_nqn,
                    "adrfam": "IPv4"
                }
            }

            if self.enable_adq:
                nvme_ctrl["params"].update({"priority": "1"})

            if self.enable_data_digest:
                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})

            bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)

        return json.dumps(bdev_cfg_section, indent=2)

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1):
        """Generate fio "[filenameX]" job sections mapping NvmeXn1 bdevs to threads.

        Bdev names are distributed as evenly as possible across the thread groups
        (capped at one group per subsystem); each section's iodepth is scaled by
        its bdev count and divided by num_jobs (minimum 1).

        Args:
            subsystems: sequence of remote subsystems; its length sets the bdev count.
            threads: sequence whose length sets the number of job sections.
            io_depth: requested iodepth per bdev.
            num_jobs: fio numjobs value the per-section iodepth is divided by.

        Returns:
            str: concatenated "[filenameX]" sections with filename= and iodepth= lines.
        """
        filename_section = ""
        # Never create more sections than there are bdevs to put in them.
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        nvme_per_split = int(len(subsystems) / len(threads))
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            # Hand out leftover bdevs one per section.
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section
1457
1458
if __name__ == "__main__":
    script_full_dir = os.path.dirname(os.path.realpath(__file__))
    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
                        help='Configuration file.')
    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
                        help='Results directory.')
    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
                        help='CSV results filename.')

    args = parser.parse_args()

    print("Using config file: %s" % args.config)
    with open(args.config, "r") as config:
        data = json.load(config)

    initiators = []
    fio_cases = []

    general_config = data["general"]
    target_config = data["target"]
    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]

    # Instantiate target/initiator objects and pull fio workload parameters
    # from the matching config sections.
    for k, v in data.items():
        if "target" in k:
            v.update({"results_dir": args.results})
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(k, data["general"], v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(k, data["general"], v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k].get("run_num")
            fio_num_jobs = data[k].get("num_jobs")
            fio_rate_iops = data[k].get("rate_iops", 0)

    os.makedirs(args.results, exist_ok=True)

    for i in initiators:
        target_obj.initiator_info.append(
            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
        )

    # TODO: This try block is definitely too large. Need to break this up into separate
    # logical blocks to reduce size.
    try:
        target_obj.tgt_start()

        for i in initiators:
            i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no)
            if i.enable_adq:
                i.adq_configure_tc()

        # Poor mans threading
        # Run FIO tests
        for block_size, io_depth, rw in fio_workloads:
            threads = []
            configs = []
            for i in initiators:
                if i.mode == "kernel":
                    i.kernel_init_connect()

                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops)
                configs.append(cfg)

            for i, cfg in zip(initiators, configs):
                t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
                threads.append(t)
            if target_obj.enable_sar:
                sar_file_prefix = "%s_%s_%s_sar" % (block_size, rw, io_depth)
                t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix))
                threads.append(t)

            if target_obj.enable_pcm:
                pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]

                pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],))
                pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],))
                pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],))

                threads.append(pcm_cpu_t)
                threads.append(pcm_mem_t)
                threads.append(pcm_pow_t)

            if target_obj.enable_bandwidth:
                bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)])
                bandwidth_file_name = ".".join([bandwidth_file_name, "csv"])
                t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,))
                threads.append(t)

            if target_obj.enable_dpdk_memory:
                # The trailing comma is required: without it `args` is a plain
                # string and Thread splats it into single-character arguments.
                t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results,))
                threads.append(t)

            if target_obj.enable_adq:
                ethtool_thread = threading.Thread(target=target_obj.ethtool_after_fio_ramp, args=(fio_ramp_time,))
                threads.append(ethtool_thread)

            for t in threads:
                t.start()
            for t in threads:
                t.join()

            for i in initiators:
                if i.mode == "kernel":
                    i.kernel_init_disconnect()
                i.copy_result_files(args.results)

        # Restore system settings on target and initiators after all runs.
        target_obj.restore_governor()
        target_obj.restore_tuned()
        target_obj.restore_services()
        target_obj.restore_sysctl()
        if target_obj.enable_adq:
            target_obj.reload_driver("ice")
        for i in initiators:
            i.restore_governor()
            i.restore_tuned()
            i.restore_services()
            i.restore_sysctl()
            if i.enable_adq:
                i.reload_driver("ice")
        target_obj.parse_results(args.results, args.csv_filename)
    finally:
        for i in initiators:
            try:
                i.stop()
            except Exception as err:
                # Best-effort cleanup: report the failure but keep stopping the
                # remaining initiators and the target.
                print("Failed to stop initiator %s: %s" % (i.name, err))
        target_obj.stop()
1611