xref: /spdk/scripts/perf/nvmf/run_nvmf.py (revision 895300d84030b370decaeaac0b3d6b7798227fe9)
1#!/usr/bin/env python3
2
3from json.decoder import JSONDecodeError
4import os
5import re
6import sys
7import argparse
8import json
9import zipfile
10import threading
11import subprocess
12import itertools
13import configparser
14import time
15import uuid
16from collections import OrderedDict
17
18import paramiko
19import pandas as pd
20from common import *
21
22sys.path.append(os.path.dirname(__file__) + '/../../../python')
23
24import spdk.rpc as rpc  # noqa
25import spdk.rpc.client as rpc_client  # noqa
26
27
class Server:
    """Common base for machines taking part in an NVMe-oF performance test.

    Implements the system tuning shared by target and initiator hosts
    (services, sysctl, tuned profile, CPU governor, NIC IRQ affinity) as
    well as ADQ configuration helpers. Command execution is delegated to
    exec_cmd(), which subclasses override to run either locally or over
    SSH, so most methods here work on both kinds of machines.
    """

    def __init__(self, name, general_config, server_config):
        # "name" is used as a log prefix and in result file names, so it
        # must be strictly alphanumeric (validated at the end of __init__).
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]

        # Location of the Mellanox set_irq_affinity.sh scripts; can be
        # overridden per server in the configuration file.
        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        self.local_nic_info = []
        self._nics_json_obj = {}
        # The *_restore_* members remember pre-test system state so the
        # restore_* methods can bring the machine back after the test.
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        self.enable_adq = False
        self.adq_priority = None
        if "adq_enable" in server_config and server_config["adq_enable"]:
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1

        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]

        if not re.match("^[A-Za-z0-9]*$", name):
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        """Print a message prefixed with this server's name, unbuffered."""
        print("[%s] %s" % (self.name, msg), flush=True)

    @staticmethod
    def get_uncommented_lines(lines):
        """Return only the lines which are non-empty and not '#' comments."""
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        """Return the interface name which has the given IP address assigned.

        NIC information is fetched once from "ip -j address show" and
        cached in self._nics_json_obj (interfaces without any address are
        filtered out). Returns None when no interface matches.
        """
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                # Compare for equality, not substring containment:
                # "192.168.1.1" must not match a NIC holding "192.168.1.10".
                if ip == addr["local"]:
                    return nic["ifname"]

    def set_local_nic_info_helper(self):
        """Return hardware info for NIC discovery; overridden by subclasses."""
        pass

    def set_local_nic_info(self, pci_info):
        """Extract network-class devices from an lshw-style JSON tree.

        pci_info is the (possibly nested) JSON structure produced by
        "lshw -json"; every node whose "class" contains "network" is
        collected into self.local_nic_info.
        """
        def extract_network_elements(json_obj):
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Execute a command and return its output; stub, see subclasses."""
        return ""

    def configure_system(self):
        """Apply all system tuning steps needed before a test run."""
        self.configure_services()
        self.configure_sysctl()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity()

    def configure_adq(self):
        """Load ADQ kernel modules and configure NIC ports (SPDK mode only)."""
        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        """Modprobe the traffic-control modules ADQ depends on."""
        self.log_print("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log_print("%s loaded!" % module)
            except CalledProcessError as e:
                # Best effort: report the failure but keep trying the rest.
                self.log_print("ERROR: failed to load module %s" % module)
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        """Create ADQ traffic classes, filters and XPS mapping on each NIC.

        Sets up two TCs (TC0 with the minimum 2 queues, TC1 with
        self.num_cores queues), an ingress qdisc and a flower filter
        steering NVMe-oF traffic (port 4420) into TC1.
        NOTE: relies on self.num_cores and self.spdk_dir, which are
        provided by subclasses.
        """
        self.log_print("Configuring ADQ Traffic classes and filters...")

        if self.mode == "kernel":
            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
            return

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        # Target matches on destination port, initiators on source port.
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        port = "4420"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log_print(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            # Give the driver time to apply the qdisc before adding filters.
            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log_print(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                             "protocol", "ip", "ingress", "prio", "1", "flower",
                             "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                             "skip_sw", "hw_tc", "1"]
            self.log_print(" ".join(tc_filter_cmd))
            self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log_print("Show tc configuration for %s NIC..." % nic_name)
            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log_print("%s" % tc_disk_out)
            self.log_print("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log_print(xps_cmd)
            self.exec_cmd(xps_cmd)

    def reload_driver(self, driver):
        """rmmod + modprobe the given kernel driver; log failures."""
        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver])
        except CalledProcessError as e:
            self.log_print("ERROR: failed to reload %s module!" % driver)
            self.log_print("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_nic(self):
        """Set the ice-driver private flags required for ADQ testing."""
        self.log_print("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log_print(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                # As temporary workaround for ADQ, channel packet inspection optimization is turned on during connection establishment.
                # Then turned off before fio ramp_up expires in ethtool_after_fio_ramp().
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-pkt-inspect-optimize", "on"])
            except CalledProcessError as e:
                self.log_print("ERROR: failed to configure NIC port using ethtool!")
                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
                self.log_print("Please update your NIC driver and firmware versions and try again.")
            self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        """Stop services that disturb performance; remember prior state.

        The previous ActiveState of each service is stored in
        self.svc_restore_dict so restore_services() can undo this.
        """
        self.log_print("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            # Parse "systemctl show" key=value output as an INI section.
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log_print("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log_print("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        """Apply network/VM sysctl tuning; remember previous values."""
        self.log_print("Tuning sysctl settings...")

        # busy_read polling is only beneficial for SPDK targets with ADQ.
        busy_read = 0
        if self.enable_adq and self.mode == "spdk":
            busy_read = 1

        sysctl_opts = {
            "net.core.busy_poll": 0,
            "net.core.busy_read": busy_read,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
        }

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        """Activate the configured tuned-adm profile; remember prior one."""
        if not self.tuned_profile:
            self.log_print("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        """Switch the CPU frequency governor to "performance"."""
        self.log_print("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def configure_irq_affinity(self):
        """Run set_irq_affinity.sh for every NIC used in the test."""
        self.log_print("Setting NIC irq affinity for NICs...")

        irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh")
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            irq_cmd = ["sudo", irq_script_path, nic]
            self.log_print(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        """Bring services back to the state recorded in configure_services()."""
        self.log_print("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        """Write back the sysctl values recorded in configure_sysctl()."""
        self.log_print("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        """Revert tuned-adm to the pre-test profile (or auto_profile)."""
        self.log_print("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log_print("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        """Revert the CPU governor recorded in configure_cpu_governor()."""
        self.log_print("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log_print("Reverted CPU governor to %s." % self.governor_restore)
338
339
340class Target(Server):
    def __init__(self, name, general_config, target_config):
        """Set up the target host.

        Parses measurement options (sar, pcm, network bandwidth, DPDK
        memory), SPDK scheduler/zcopy settings and null block device
        count from target_config, optionally zips the local SPDK sources
        for distribution to initiators, then immediately applies system
        tuning (and ADQ configuration, if enabled).
        """
        super().__init__(name, general_config, target_config)

        # Defaults
        self.enable_sar = False
        self.sar_delay = 0
        self.sar_interval = 0
        self.sar_count = 0
        self.enable_pcm = False
        self.pcm_dir = ""
        self.pcm_delay = 0
        self.pcm_interval = 0
        self.pcm_count = 0
        self.enable_bandwidth = 0
        self.bandwidth_count = 0
        self.enable_dpdk_memory = False
        self.dpdk_wait_time = 0
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        # The target always runs locally, so NIC info can be cached eagerly.
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []
        self.initiator_info = []

        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "sar_settings" in target_config:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"]
        if "pcm_settings" in target_config:
            self.enable_pcm = True
            self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"]
        if "enable_bandwidth" in target_config:
            self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"]
        if "enable_dpdk_memory" in target_config:
            self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"]
        if "scheduler_settings" in target_config:
            self.scheduler_name = target_config["scheduler_settings"]
        if "zcopy_settings" in target_config:
            self.enable_zcopy = target_config["zcopy_settings"]
        if "results_dir" in target_config:
            self.results_dir = target_config["results_dir"]

        # Resolve the SPDK repository root relative to this script's location.
        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        # Initiators receive SPDK sources as a zip archive; skip packing
        # when the config explicitly requests an already-installed SPDK.
        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()
394
395    def set_local_nic_info_helper(self):
396        return json.loads(self.exec_cmd(["lshw", "-json"]))
397
398    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
399        stderr_opt = None
400        if stderr_redirect:
401            stderr_opt = subprocess.STDOUT
402        if change_dir:
403            old_cwd = os.getcwd()
404            os.chdir(change_dir)
405            self.log_print("Changing directory to %s" % change_dir)
406
407        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")
408
409        if change_dir:
410            os.chdir(old_cwd)
411            self.log_print("Changing directory to %s" % old_cwd)
412        return out
413
414    def zip_spdk_sources(self, spdk_dir, dest_file):
415        self.log_print("Zipping SPDK source directory")
416        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
417        for root, _directories, files in os.walk(spdk_dir, followlinks=True):
418            for file in files:
419                fh.write(os.path.relpath(os.path.join(root, file)))
420        fh.close()
421        self.log_print("Done zipping")
422
423    @staticmethod
424    def _chunks(input_list, chunks_no):
425        div, rem = divmod(len(input_list), chunks_no)
426        for i in range(chunks_no):
427            si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem)
428            yield input_list[si:si + (div + 1 if i < rem else div)]
429
430    def spread_bdevs(self, req_disks):
431        # Spread available block devices indexes:
432        # - evenly across available initiator systems
433        # - evenly across available NIC interfaces for
434        #   each initiator
435        # Not NUMA aware.
436        ip_bdev_map = []
437        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))
438
439        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
440            self.initiator_info[i]["bdev_range"] = init_chunk
441            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
442            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
443                for c in nic_chunk:
444                    ip_bdev_map.append((ip, c))
445        return ip_bdev_map
446
    @staticmethod
    def read_json_stats(file):
        """Parse a fio JSON output file into a flat list of 18 floats.

        Returns, in order: read iops, read bw, read avg/min/max latency,
        read p99/p99.9/p99.99/p99.999 latency, then the same nine values
        for writes. All latencies are normalized to microseconds (fio may
        report them in nanoseconds depending on version/options).
        """
        with open(file, "r") as json_data:
            data = json.load(json_data)
            job_pos = 0  # job_pos = 0 because using aggregated results

            # Check if latency is in nano or microseconds to choose correct dict key
            def get_lat_unit(key_prefix, dict_section):
                # key prefix - lat, clat or slat.
                # dict section - portion of json containing latency bucket in question
                # Return dict key to access the bucket and unit as string
                for k, _ in dict_section.items():
                    if k.startswith(key_prefix):
                        return k, k.split("_")[1]

            def get_clat_percentiles(clat_dict_leaf):
                # Pick the four fixed percentiles used in the reports.
                if "percentile" in clat_dict_leaf:
                    p99_lat = float(clat_dict_leaf["percentile"]["99.000000"])
                    p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"])
                    p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"])
                    p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"])

                    return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat]
                else:
                    # Latest fio versions do not provide "percentile" results if no
                    # measurements were done, so just return zeroes
                    return [0, 0, 0, 0]

            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
            read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["read"][clat_key])

            # Convert nanoseconds to microseconds where needed.
            if "ns" in lat_unit:
                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
            if "ns" in clat_unit:
                read_p99_lat = read_p99_lat / 1000
                read_p99_9_lat = read_p99_9_lat / 1000
                read_p99_99_lat = read_p99_99_lat / 1000
                read_p99_999_lat = read_p99_999_lat / 1000

            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
            write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles(
                data["jobs"][job_pos]["write"][clat_key])

            if "ns" in lat_unit:
                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
            if "ns" in clat_unit:
                write_p99_lat = write_p99_lat / 1000
                write_p99_9_lat = write_p99_9_lat / 1000
                write_p99_99_lat = write_p99_99_lat / 1000
                write_p99_999_lat = write_p99_999_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]
515
    def parse_results(self, results_dir, csv_file):
        """Aggregate fio JSON results from results_dir into CSV reports.

        For each fio config file found: averages repeated runs per
        initiator (writing a per-initiator CSV), sums results across
        initiators (averaging latencies), then folds read/write columns
        into common labels weighted by the job's rwmixread ratio.
        Aggregated rows are appended to csv_file inside results_dir.
        """
        files = os.listdir(results_dir)
        fio_files = filter(lambda x: ".fio" in x, files)
        json_files = [x for x in files if ".json" in x]

        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]

        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]

        header_line = ",".join(["Name", *headers])
        aggr_header_line = ",".join(["Name", *aggr_headers])

        # Create empty results file
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            fh.write(aggr_header_line + "\n")
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # Look in the filename for rwmixread value. Function arguments do
            # not have that information.
            # TODO: Improve this function by directly using workload params instead
            # of regexing through filenames.
            if "read" in job_name:
                rw_mixread = 1
            elif "write" in job_name:
                rw_mixread = 0
            else:
                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100

            # If "_CPU" exists in name - ignore it
            # Initiators for the same job could have different num_cores parameter
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if x.startswith(job_name)]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # There may have been more than 1 initiator used in test, need to check that
            # Result files are created so that string after last "_" separator is server name
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # There may have been more than 1 test run for this job, calculate average results for initiator
                i_results = [x for x in job_result_files if i in x]
                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))

                separate_stats = []
                for r in i_results:
                    try:
                        stats = self.read_json_stats(os.path.join(results_dir, r))
                        separate_stats.append(stats)
                        self.log_print(stats)
                    except JSONDecodeError:
                        # Best effort: skip unparsable run files but keep going.
                        self.log_print("ERROR: Failed to parse %s results! Results might be incomplete!" % r)

                # Element-wise average across this initiator's repeated runs.
                init_results = [sum(x) for x in zip(*separate_stats)]
                init_results = [x / len(separate_stats) for x in init_results]
                inits_avg_results.append(init_results)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(init_results)
                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
                    fh.write(header_line + "\n")
                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")

            # Sum results of all initiators running this FIO job.
            # Latency results are an average of latencies from across all initiators.
            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
            for key in inits_avg_results:
                if "lat" in key:
                    inits_avg_results[key] /= len(inits_names)

            # Aggregate separate read/write values into common labels
            # Take rw_mixread into consideration for mixed read/write workloads.
            aggregate_results = OrderedDict()
            for h in aggr_headers:
                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
                if "lat" in h:
                    # Weighted average of latencies by read/write mix.
                    _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
                else:
                    # IOPS and bandwidth simply add up.
                    _ = read_stat + write_stat
                aggregate_results[h] = "{0:.3f}".format(_)

            rows.add(",".join([job_name, *aggregate_results.values()]))

        # Save results to file
        for row in rows:
            with open(os.path.join(results_dir, csv_file), "a") as fh:
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))
615
616    def measure_sar(self, results_dir, sar_file_prefix):
617        cpu_number = os.cpu_count()
618        sar_idle_sum = 0
619        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
620        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))
621
622        self.log_print("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % self.sar_delay)
623        time.sleep(self.sar_delay)
624        self.log_print("Starting SAR measurements")
625
626        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count])
627        with open(os.path.join(results_dir, sar_output_file), "w") as fh:
628            for line in out.split("\n"):
629                if "Average" in line:
630                    if "CPU" in line:
631                        self.log_print("Summary CPU utilization from SAR:")
632                        self.log_print(line)
633                    elif "all" in line:
634                        self.log_print(line)
635                    else:
636                        sar_idle_sum += float(line.split()[7])
637            fh.write(out)
638        sar_cpu_usage = cpu_number * 100 - sar_idle_sum
639
640        with open(os.path.join(results_dir, sar_cpu_util_file), "w") as f:
641            f.write("%0.2f" % sar_cpu_usage)
642
643    def ethtool_after_fio_ramp(self, fio_ramp_time):
644        time.sleep(fio_ramp_time//2)
645        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
646        for nic in nic_names:
647            self.log_print(nic)
648            self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
649                           "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization
650
651    def measure_pcm_memory(self, results_dir, pcm_file_name):
652        time.sleep(self.pcm_delay)
653        cmd = ["%s/pcm-memory.x" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)]
654        pcm_memory = subprocess.Popen(cmd)
655        time.sleep(self.pcm_count)
656        pcm_memory.terminate()
657
    def measure_pcm(self, results_dir, pcm_file_name):
        """Run pcm.x, store the full CSV and extract a per-socket subset.

        Sleeps self.pcm_delay seconds, runs pcm.x for self.pcm_count samples
        at self.pcm_interval seconds each, then post-processes the CSV with
        pandas: only the socket interconnect (UPI link) columns are kept and
        written to skt_<pcm_file_name> in results_dir.
        """
        time.sleep(self.pcm_delay)
        cmd = ["%s/pcm.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        # pcm.x emits a two-row CSV header; read both rows as a MultiIndex.
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        # Blank header cells come back from pandas as "Unnamed: ..." - clear them.
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        # Keep only the UPI0/UPI1/UPI2 columns (second header level).
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)
667
668    def measure_pcm_power(self, results_dir, pcm_power_file_name):
669        time.sleep(self.pcm_delay)
670        out = self.exec_cmd(["%s/pcm-power.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count])
671        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
672            fh.write(out)
673
674    def measure_network_bandwidth(self, results_dir, bandwidth_file_name):
675        self.log_print("INFO: starting network bandwidth measure")
676        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
677                       "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)])
678
679    def measure_dpdk_memory(self, results_dir):
680        self.log_print("INFO: waiting to generate DPDK memory usage")
681        time.sleep(self.dpdk_wait_time)
682        self.log_print("INFO: generating DPDK memory usage")
683        rpc.env.env_dpdk_get_mem_stats
684        os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir))
685
686    def sys_config(self):
687        self.log_print("====Kernel release:====")
688        self.log_print(os.uname().release)
689        self.log_print("====Kernel command line:====")
690        with open('/proc/cmdline') as f:
691            cmdline = f.readlines()
692            self.log_print('\n'.join(self.get_uncommented_lines(cmdline)))
693        self.log_print("====sysctl conf:====")
694        with open('/etc/sysctl.conf') as f:
695            sysctl = f.readlines()
696            self.log_print('\n'.join(self.get_uncommented_lines(sysctl)))
697        self.log_print("====Cpu power info:====")
698        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
699        self.log_print("====zcopy settings:====")
700        self.log_print("zcopy enabled: %s" % (self.enable_zcopy))
701        self.log_print("====Scheduler settings:====")
702        self.log_print("SPDK scheduler: %s" % (self.scheduler_name))
703
704
class Initiator(Server):
    # Base class for FIO workload initiator hosts (specialized by
    # SPDKInitiator and KernelInitiator). All commands run on the remote
    # host through a persistent SSH session opened in __init__.
    def __init__(self, name, general_config, initiator_config):
        """Set up the initiator host: read its config, connect over SSH,
        prepare the SPDK working directory and tune the system.

        Required initiator_config keys: "ip", "target_nic_ips".
        """
        super().__init__(name, general_config, initiator_config)

        # Required fields
        self.ip = initiator_config["ip"]
        self.target_nic_ips = initiator_config["target_nic_ips"]

        # Defaults
        self.cpus_allowed = None
        self.cpus_allowed_policy = "shared"
        self.spdk_dir = "/tmp/spdk"
        self.fio_bin = "/usr/src/fio/fio"
        self.nvmecli_bin = "nvme"
        self.cpu_frequency = None
        self.subsystem_info_list = []

        if "spdk_dir" in initiator_config:
            self.spdk_dir = initiator_config["spdk_dir"]
        if "fio_bin" in initiator_config:
            self.fio_bin = initiator_config["fio_bin"]
        if "nvmecli_bin" in initiator_config:
            self.nvmecli_bin = initiator_config["nvmecli_bin"]
        if "cpus_allowed" in initiator_config:
            self.cpus_allowed = initiator_config["cpus_allowed"]
        if "cpus_allowed_policy" in initiator_config:
            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
        if "cpu_frequency" in initiator_config:
            self.cpu_frequency = initiator_config["cpu_frequency"]
        # The SPDK_WORKSPACE environment variable overrides any configured
        # spdk_dir.
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        # Open the SSH session used by exec_cmd/put_file/get_file, then
        # recreate a clean working directory on the remote host.
        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        # Ship and unpack the SPDK sources unless the config opts out.
        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.copy_spdk("/tmp/spdk.zip")
        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()
752
753    def set_local_nic_info_helper(self):
754        return json.loads(self.exec_cmd(["lshw", "-json"]))
755
    def stop(self):
        # Tear down the SSH session opened in __init__.
        self.ssh_connection.close()
758
759    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
760        if change_dir:
761            cmd = ["cd", change_dir, ";", *cmd]
762
763        # In case one of the command elements contains whitespace and is not
764        # already quoted, # (e.g. when calling sysctl) quote it again to prevent expansion
765        # when sending to remote system.
766        for i, c in enumerate(cmd):
767            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
768                cmd[i] = '"%s"' % c
769        cmd = " ".join(cmd)
770
771        # Redirect stderr to stdout thanks using get_pty option if needed
772        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
773        out = stdout.read().decode(encoding="utf-8")
774
775        # Check the return code
776        rc = stdout.channel.recv_exit_status()
777        if rc:
778            raise CalledProcessError(int(rc), cmd, out)
779
780        return out
781
782    def put_file(self, local, remote_dest):
783        ftp = self.ssh_connection.open_sftp()
784        ftp.put(local, remote_dest)
785        ftp.close()
786
787    def get_file(self, remote, local_dest):
788        ftp = self.ssh_connection.open_sftp()
789        ftp.get(remote, local_dest)
790        ftp.close()
791
792    def copy_spdk(self, local_spdk_zip):
793        self.log_print("Copying SPDK sources to initiator %s" % self.name)
794        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
795        self.log_print("Copied sources zip from target")
796        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
797        self.log_print("Sources unpacked")
798
799    def copy_result_files(self, dest_dir):
800        self.log_print("Copying results")
801
802        if not os.path.exists(dest_dir):
803            os.mkdir(dest_dir)
804
805        # Get list of result files from initiator and copy them back to target
806        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")
807
808        for file in file_list:
809            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
810                          os.path.join(dest_dir, file))
811        self.log_print("Done copying results")
812
813    def discover_subsystems(self, address_list, subsys_no):
814        num_nvmes = range(0, subsys_no)
815        nvme_discover_output = ""
816        for ip, subsys_no in itertools.product(address_list, num_nvmes):
817            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no))
818            nvme_discover_cmd = ["sudo",
819                                 "%s" % self.nvmecli_bin,
820                                 "discover", "-t", "%s" % self.transport,
821                                 "-s", "%s" % (4420 + subsys_no),
822                                 "-a", "%s" % ip]
823
824            try:
825                stdout = self.exec_cmd(nvme_discover_cmd)
826                if stdout:
827                    nvme_discover_output = nvme_discover_output + stdout
828            except CalledProcessError:
829                # Do nothing. In case of discovering remote subsystems of kernel target
830                # we expect "nvme discover" to fail a bunch of times because we basically
831                # scan ports.
832                pass
833
834        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
835                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
836                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
837                                nvme_discover_output)  # from nvme discovery output
838        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
839        subsystems = filter(lambda x: "discovery" not in x[1], subsystems)
840        subsystems = list(set(subsystems))
841        subsystems.sort(key=lambda x: x[1])
842        self.log_print("Found matching subsystems on target side:")
843        for s in subsystems:
844            self.log_print(s)
845        self.subsystem_info_list = subsystems
846
    def gen_fio_filename_conf(self, *args, **kwargs):
        """Generate the per-job [filename] sections of an FIO config.

        Abstract hook; concrete logic lives in the subclasses.
        """
        # Logic implemented in SPDKInitiator and KernelInitiator classes
        pass
850
851    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10, rate_iops=0):
852        fio_conf_template = """
853[global]
854ioengine={ioengine}
855{spdk_conf}
856thread=1
857group_reporting=1
858direct=1
859percentile_list=50:90:99:99.5:99.9:99.99:99.999
860
861norandommap=1
862rw={rw}
863rwmixread={rwmixread}
864bs={block_size}
865time_based=1
866ramp_time={ramp_time}
867runtime={run_time}
868rate_iops={rate_iops}
869"""
870        if "spdk" in self.mode:
871            bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
872            self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
873            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
874            spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
875        else:
876            ioengine = self.ioengine
877            spdk_conf = ""
878            out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'",
879                                 "|", "awk", "'{print $1}'"])
880            subsystems = [x for x in out.split("\n") if "nvme" in x]
881
882        if self.cpus_allowed is not None:
883            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
884            cpus_num = 0
885            cpus = self.cpus_allowed.split(",")
886            for cpu in cpus:
887                if "-" in cpu:
888                    a, b = cpu.split("-")
889                    a = int(a)
890                    b = int(b)
891                    cpus_num += len(range(a, b))
892                else:
893                    cpus_num += 1
894            self.num_cores = cpus_num
895            threads = range(0, self.num_cores)
896        elif hasattr(self, 'num_cores'):
897            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
898            threads = range(0, int(self.num_cores))
899        else:
900            self.num_cores = len(subsystems)
901            threads = range(0, len(subsystems))
902
903        if "spdk" in self.mode:
904            filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs)
905        else:
906            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs)
907
908        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
909                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
910                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)
911
912        # TODO: hipri disabled for now, as it causes fio errors:
913        # io_u error on file /dev/nvme2n1: Operation not supported
914        # See comment in KernelInitiator class, kernel_init_connect() function
915        if hasattr(self, "ioengine") and "io_uring" in self.ioengine:
916            fio_config = fio_config + """
917fixedbufs=1
918registerfiles=1
919#hipri=1
920"""
921        if num_jobs:
922            fio_config = fio_config + "numjobs=%s \n" % num_jobs
923        if self.cpus_allowed is not None:
924            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
925            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
926        fio_config = fio_config + filename_section
927
928        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
929        if hasattr(self, "num_cores"):
930            fio_config_filename += "_%sCPU" % self.num_cores
931        fio_config_filename += ".fio"
932
933        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
934        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
935        self.log_print("Created FIO Config:")
936        self.log_print(fio_config)
937
938        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)
939
940    def set_cpu_frequency(self):
941        if self.cpu_frequency is not None:
942            try:
943                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
944                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
945                self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
946            except Exception:
947                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
948                sys.exit()
949        else:
950            self.log_print("WARNING: you have disabled intel_pstate and using default cpu governance.")
951
952    def run_fio(self, fio_config_file, run_num=None):
953        job_name, _ = os.path.splitext(fio_config_file)
954        self.log_print("Starting FIO run for job: %s" % job_name)
955        self.log_print("Using FIO: %s" % self.fio_bin)
956
957        if run_num:
958            for i in range(1, run_num + 1):
959                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
960                try:
961                    output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
962                                            "--output=%s" % output_filename, "--eta=never"], True)
963                    self.log_print(output)
964                except subprocess.CalledProcessError as e:
965                    self.log_print("ERROR: Fio process failed!")
966                    self.log_print(e.stdout)
967        else:
968            output_filename = job_name + "_" + self.name + ".json"
969            output = self.exec_cmd(["sudo", self.fio_bin,
970                                    fio_config_file, "--output-format=json",
971                                    "--output" % output_filename], True)
972            self.log_print(output)
973        self.log_print("FIO run finished. Results in: %s" % output_filename)
974
975    def sys_config(self):
976        self.log_print("====Kernel release:====")
977        self.log_print(self.exec_cmd(["uname", "-r"]))
978        self.log_print("====Kernel command line:====")
979        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
980        self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
981        self.log_print("====sysctl conf:====")
982        sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"])
983        self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
984        self.log_print("====Cpu power info:====")
985        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
986
987
class KernelTarget(Target):
    """NVMeOF target backed by the Linux kernel nvmet driver, configured
    through nvmetcli."""
    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = "nvmetcli"

        if "nvmet_bin" in target_config:
            self.nvmet_bin = target_config["nvmet_bin"]

    def stop(self):
        # Drop the entire kernel nvmet configuration.
        nvmet_command(self.nvmet_bin, "clear")

    def kernel_tgt_gen_subsystem_conf(self, nvme_list):
        """Write a kernel nvmet JSON config ("kernel.conf") that exposes
        each device in nvme_list as its own subsystem on its own port
        (4420 + device index), and record the subsystems as
        [port, nqn, ip] triples in self.subsystem_info_list."""

        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        for ip, bdev_num in self.spread_bdevs(len(nvme_list)):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = nvme_list[bdev_num]

            nvmet_cfg["subsystems"].append({
                "allowed_hosts": [],
                "attr": {
                    "allow_any_host": "1",
                    "serial": serial,
                    "version": "1.3"
                },
                "namespaces": [
                    {
                        "device": {
                            "path": bdev_name,
                            "uuid": "%s" % uuid.uuid4()
                        },
                        "enable": 1,
                        # NOTE(review): nsid is set to the port number
                        # (4420+index) rather than a small namespace index
                        # - confirm this is intended.
                        "nsid": port
                    }
                ],
                "nqn": nqn
            })

            nvmet_cfg["ports"].append({
                "addr": {
                    "adrfam": "ipv4",
                    "traddr": ip,
                    "trsvcid": port,
                    "trtype": self.transport
                },
                "portid": bdev_num,
                "referrals": [],
                "subsystems": [nqn]
            })

            self.subsystem_info_list.append([port, nqn, ip])
        self.subsys_no = len(self.subsystem_info_list)

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        """Configure and start the kernel target: generate kernel.conf
        (null-block or real NVMe backed), load it with nvmetcli and set up
        ADQ traffic classes if enabled."""
        self.log_print("Configuring kernel NVMeOF Target")

        if self.null_block:
            print("Configuring with null block device.")
            nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
        else:
            print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()

        self.kernel_tgt_gen_subsystem_conf(nvme_list)
        # NOTE: kernel_tgt_gen_subsystem_conf already set subsys_no; this
        # overwrite is redundant but harmless.
        self.subsys_no = len(nvme_list)

        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log_print("Done configuring kernel NVMeOF Target")
1072
1073
class SPDKTarget(Target):
    """NVMeOF target running SPDK's nvmf_tgt application, driven over its
    JSON-RPC socket."""
    def __init__(self, name, general_config, target_config):
        """Read SPDK-specific target options.

        Required target_config key: "core_mask". Optional keys:
        num_shared_buffers, max_queue_depth, null_block_dif_type,
        dif_insert_strip, bpf_scripts, dsa_settings, scheduler_core_limit.
        """
        super().__init__(name, general_config, target_config)

        # Required fields
        self.core_mask = target_config["core_mask"]
        self.num_cores = self.get_num_cores(self.core_mask)

        # Defaults
        self.dif_insert_strip = False
        self.null_block_dif_type = 0
        self.num_shared_buffers = 4096
        self.max_queue_depth = 128
        self.bpf_proc = None  # Popen handle of the bpftrace helper, if started
        self.bpf_scripts = []
        self.enable_dsa = False
        self.scheduler_core_limit = None

        if "num_shared_buffers" in target_config:
            self.num_shared_buffers = target_config["num_shared_buffers"]
        if "max_queue_depth" in target_config:
            self.max_queue_depth = target_config["max_queue_depth"]
        if "null_block_dif_type" in target_config:
            self.null_block_dif_type = target_config["null_block_dif_type"]
        if "dif_insert_strip" in target_config:
            self.dif_insert_strip = target_config["dif_insert_strip"]
        if "bpf_scripts" in target_config:
            self.bpf_scripts = target_config["bpf_scripts"]
        if "dsa_settings" in target_config:
            self.enable_dsa = target_config["dsa_settings"]
        if "scheduler_core_limit" in target_config:
            self.scheduler_core_limit = target_config["scheduler_core_limit"]

        self.log_print("====DSA settings:====")
        self.log_print("DSA enabled: %s" % (self.enable_dsa))
1109
1110    @staticmethod
1111    def get_num_cores(core_mask):
1112        if "0x" in core_mask:
1113            return bin(int(core_mask, 16)).count("1")
1114        else:
1115            num_cores = 0
1116            core_mask = core_mask.replace("[", "")
1117            core_mask = core_mask.replace("]", "")
1118            for i in core_mask.split(","):
1119                if "-" in i:
1120                    x, y = i.split("-")
1121                    num_cores += len(range(int(x), int(y))) + 1
1122                else:
1123                    num_cores += 1
1124            return num_cores
1125
    def spdk_tgt_configure(self):
        """Configure the running nvmf_tgt over RPC: ADQ traffic classes
        (optional), the transport layer, bdevs (null-block or local NVMe)
        and the subsystems exposing them."""
        self.log_print("Configuring SPDK NVMeOF target via RPC")

        if self.enable_adq:
            self.adq_configure_tc()

        # Create transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
                                       num_shared_buffers=self.num_shared_buffers,
                                       max_queue_depth=self.max_queue_depth,
                                       dif_insert_or_strip=self.dif_insert_strip,
                                       sock_priority=self.adq_priority)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        # Back the subsystems with null bdevs or real NVMe drives.
        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

        self.log_print("Done configuring SPDK NVMeOF Target")
1149
1150    def spdk_tgt_add_nullblock(self, null_block_count):
1151        md_size = 0
1152        block_size = 4096
1153        if self.null_block_dif_type != 0:
1154            md_size = 128
1155
1156        self.log_print("Adding null block bdevices to config via RPC")
1157        for i in range(null_block_count):
1158            self.log_print("Setting bdev protection to :%s" % self.null_block_dif_type)
1159            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
1160                                      dif_type=self.null_block_dif_type, md_size=md_size)
1161        self.log_print("SPDK Bdevs configuration:")
1162        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1163
1164    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
1165        self.log_print("Adding NVMe bdevs to config via RPC")
1166
1167        bdfs = get_nvme_devices_bdf()
1168        bdfs = [b.replace(":", ".") for b in bdfs]
1169
1170        if req_num_disks:
1171            if req_num_disks > len(bdfs):
1172                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
1173                sys.exit(1)
1174            else:
1175                bdfs = bdfs[0:req_num_disks]
1176
1177        for i, bdf in enumerate(bdfs):
1178            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)
1179
1180        self.log_print("SPDK Bdevs configuration:")
1181        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1182
    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create one NVMeOF subsystem per bdev and add its listeners.

        Each subsystem gets a unique NQN/serial and its own port
        (4420 + index); a listener is registered on both the subsystem NQN
        and the discovery NQN. Subsystems are recorded as [port, nqn, ip]
        triples in self.subsystem_info_list.
        """
        self.log_print("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        for ip, bdev_num in self.spread_bdevs(req_num_disks):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = "Nvme%sn1" % bdev_num

            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                           allow_any_host=True, max_namespaces=8)
            rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
            for nqn_name in [nqn, "discovery"]:
                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
                                                     nqn=nqn_name,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid=port,
                                                     adrfam="ipv4")
            self.subsystem_info_list.append([port, nqn, ip])
        self.subsys_no = len(self.subsystem_info_list)

        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))
1209
1210    def bpf_start(self):
1211        self.log_print("Starting BPF Trace scripts: %s" % self.bpf_scripts)
1212        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
1213        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
1214        results_path = os.path.join(self.results_dir, "bpf_traces.txt")
1215
1216        with open(self.pid, "r") as fh:
1217            nvmf_pid = str(fh.readline())
1218
1219        cmd = [bpf_script, nvmf_pid, *bpf_traces]
1220        self.log_print(cmd)
1221        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})
1222
    def tgt_start(self):
        """Start nvmf_tgt and configure it for the benchmark.

        Launches the app with --wait-for-rpc, saves its PID file, waits for
        the RPC socket to appear, applies socket/ADQ/DSA/scheduler options,
        completes app initialization and finally runs spdk_tgt_configure().
        """
        if self.null_block:
            self.subsys_no = 1
        else:
            self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        self.log_print("SPDK NVMeOF Target PID=%s" % self.pid)
        self.log_print("Waiting for spdk to initialize...")
        # Poll until the app creates its RPC socket.
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")

        # The following options must be set before framework_start_init.
        if self.enable_zcopy:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
                                           enable_zerocopy_send_server=True)
            self.log_print("Target socket options:")
            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))

        if self.enable_adq:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
                                           nvme_adminq_poll_period_us=100000, retry_count=4)

        if self.enable_dsa:
            rpc.dsa.dsa_scan_accel_engine(self.client, config_kernel_mode=None)
            self.log_print("Target DSA accel engine enabled")

        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit)
        rpc.framework_start_init(self.client)

        if self.bpf_scripts:
            self.bpf_start()

        self.spdk_tgt_configure()
1266
1267    def stop(self):
1268        if self.bpf_proc:
1269            self.log_print("Stopping BPF Trace script")
1270            self.bpf_proc.terminate()
1271            self.bpf_proc.wait()
1272
1273        if hasattr(self, "nvmf_proc"):
1274            try:
1275                self.nvmf_proc.terminate()
1276                self.nvmf_proc.wait()
1277            except Exception as e:
1278                self.log_print(e)
1279                self.nvmf_proc.kill()
1280                self.nvmf_proc.communicate()
1281
1282
class KernelInitiator(Initiator):
    """Initiator that connects to the target with the kernel NVMe driver
    (nvme-cli) and runs FIO against the resulting /dev/nvmeX devices."""
    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Defaults
        self.extra_params = ""
        self.ioengine = "libaio"

        if "extra_params" in initiator_config:
            self.extra_params = initiator_config["extra_params"]

        if "kernel_engine" in initiator_config:
            self.ioengine = initiator_config["kernel_engine"]
            if "io_uring" in self.ioengine:
                # io_uring requires poll queues on the nvme connect call;
                # this overrides any configured extra_params.
                self.extra_params = "--nr-poll-queues=8"

    def get_connected_nvme_list(self):
        """Return device names (e.g. "nvme0n1") of connected NVMe devices
        whose model number marks them as SPDK or Linux (kernel) targets."""
        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
        return nvme_list

    def kernel_init_connect(self):
        """Connect to every discovered subsystem with nvme-cli and, when
        using io_uring, tune block-layer sysfs knobs of the new devices."""
        self.log_print("Below connection attempts may result in error messages, this is expected!")
        for subsystem in self.subsystem_info_list:
            # subsystem is a [trsvcid, nqn, traddr] triple.
            self.log_print("Trying to connect %s %s %s" % subsystem)
            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
            time.sleep(2)

        if "io_uring" in self.ioengine:
            self.log_print("Setting block layer settings for io_uring.")

            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
            #       apparently it's not possible for connected subsystems.
            #       Results in "error: Invalid argument"
            block_sysfs_settings = {
                "iostats": "0",
                "rq_affinity": "0",
                "nomerges": "2"
            }

            for disk in self.get_connected_nvme_list():
                sysfs = os.path.join("/sys/block", disk, "queue")
                for k, v in block_sysfs_settings.items():
                    sysfs_opt_path = os.path.join(sysfs, k)
                    try:
                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
                    except subprocess.CalledProcessError as e:
                        self.log_print("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
                    finally:
                        # Log the effective value whether or not the write
                        # succeeded.
                        _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)])
                        self.log_print("%s=%s" % (sysfs_opt_path, _))

    def kernel_init_disconnect(self):
        """Disconnect every subsystem connected by kernel_init_connect."""
        for subsystem in self.subsystem_info_list:
            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
            time.sleep(1)

    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1):
        """Build the FIO [filename] job sections, spreading connected NVMe
        devices across the given threads.

        Each section gets iodepth = io_depth * <devices in section> /
        num_jobs, with a minimum of 1.
        """
        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
                # Consume one leftover device per iteration while any of the
                # remainder is left.
                if remainder:
                    result[i].append(next(iterator))
                    remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section
1367
1368
class SPDKInitiator(Initiator):
    """Initiator host which runs fio through SPDK's bdev fio plugin against
    remote NVMe-oF subsystems.
    """

    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Build SPDK on this host unless the config explicitly opts out.
        # Note: any truthy value for skip_spdk_install skips the build.
        if general_config.get("skip_spdk_install", False) is False:
            self.install_spdk()

        # Required fields
        self.num_cores = initiator_config["num_cores"]

        # Optional fields
        self.enable_data_digest = initiator_config.get("enable_data_digest", False)

    def install_spdk(self):
        """Clean, configure and build SPDK (with RDMA and the fio plugin) on
        this host, then run setup.sh to bind devices.
        """
        self.log_print("Using fio binary %s" % self.fio_bin)
        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)])
        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])

        self.log_print("SPDK built")
        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        """Return a JSON bdev-subsystem config string with one
        bdev_nvme_attach_controller entry per remote (port, nqn, addr) tuple.
        """
        bdev_cfg_section = {
            "subsystems": [
                {
                    "subsystem": "bdev",
                    "config": []
                }
            ]
        }

        for i, subsys in enumerate(remote_subsystem_list):
            sub_port, sub_nqn, sub_addr = map(str, subsys)
            nvme_ctrl = {
                "method": "bdev_nvme_attach_controller",
                "params": {
                    "name": "Nvme{}".format(i),
                    "trtype": self.transport,
                    "traddr": sub_addr,
                    "trsvcid": sub_port,
                    "subnqn": sub_nqn,
                    "adrfam": "IPv4"
                }
            }

            if self.enable_adq:
                nvme_ctrl["params"].update({"priority": "1"})

            if self.enable_data_digest:
                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})

            bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)

        return json.dumps(bdev_cfg_section, indent=2)

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1):
        """Generate fio job sections mapping NvmeXn1 bdevs to thread groups.

        Subsystems are divided as evenly as possible over the groups (any
        remainder is spread one per leading group) and each section's iodepth
        scales with the number of bdevs it owns, divided by num_jobs
        (minimum 1).
        """
        filename_section = ""
        # Never create more job sections than there are subsystems.
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        nvme_per_split = int(len(subsystems) / len(threads))
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section
1455
1456
if __name__ == "__main__":
    script_full_dir = os.path.dirname(os.path.realpath(__file__))
    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
                        help='Configuration file.')
    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
                        help='Results directory.')
    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
                        help='CSV results filename.')

    args = parser.parse_args()

    print("Using config file: %s" % args.config)
    with open(args.config, "r") as config:
        data = json.load(config)

    initiators = []

    general_config = data["general"]
    # Accessing these keys also validates that the mandatory sections exist.
    target_config = data["target"]
    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]

    # Build the target/initiator objects and read the fio workload matrix
    # from the corresponding config sections.
    for k, v in data.items():
        if "target" in k:
            v.update({"results_dir": args.results})
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(k, data["general"], v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(k, data["general"], v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k].get("run_num")
            fio_num_jobs = data[k].get("num_jobs")
            fio_rate_iops = data[k].get("rate_iops", 0)
        else:
            continue

    try:
        os.mkdir(args.results)
    except FileExistsError:
        pass

    for i in initiators:
        target_obj.initiator_info.append(
            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
        )

    # TODO: This try block is definitely too large. Need to break this up into
    # separate logical blocks to reduce size.
    try:
        target_obj.tgt_start()

        for i in initiators:
            i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no)
            if i.enable_adq:
                i.adq_configure_tc()

        # Poor mans threading
        # Run FIO tests
        for block_size, io_depth, rw in fio_workloads:
            threads = []
            configs = []
            for i in initiators:
                if i.mode == "kernel":
                    i.kernel_init_connect()

                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops)
                configs.append(cfg)

            for i, cfg in zip(initiators, configs):
                t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
                threads.append(t)
            if target_obj.enable_sar:
                sar_file_prefix = "%s_%s_%s_sar" % (block_size, rw, io_depth)
                t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix))
                threads.append(t)

            if target_obj.enable_pcm:
                pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]

                pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],))
                pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],))
                pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],))

                threads.append(pcm_cpu_t)
                threads.append(pcm_mem_t)
                threads.append(pcm_pow_t)

            if target_obj.enable_bandwidth:
                bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)])
                bandwidth_file_name = ".".join([bandwidth_file_name, "csv"])
                t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,))
                threads.append(t)

            if target_obj.enable_dpdk_memory:
                # Bug fix: "args" must be a tuple; "(args.results)" was just a
                # parenthesized string, which Thread would unpack one character
                # per positional argument at start time.
                t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results,))
                threads.append(t)

            if target_obj.enable_adq:
                ethtool_thread = threading.Thread(target=target_obj.ethtool_after_fio_ramp, args=(fio_ramp_time,))
                threads.append(ethtool_thread)

            for t in threads:
                t.start()
            for t in threads:
                t.join()

            for i in initiators:
                if i.mode == "kernel":
                    i.kernel_init_disconnect()
                i.copy_result_files(args.results)

        target_obj.restore_governor()
        target_obj.restore_tuned()
        target_obj.restore_services()
        target_obj.restore_sysctl()
        if target_obj.enable_adq:
            target_obj.reload_driver("ice")
        for i in initiators:
            i.restore_governor()
            i.restore_tuned()
            i.restore_services()
            i.restore_sysctl()
            if i.enable_adq:
                i.reload_driver("ice")
        target_obj.parse_results(args.results, args.csv_filename)
    finally:
        for i in initiators:
            try:
                i.stop()
            except Exception as err:
                # Best-effort cleanup: report the failure but keep stopping
                # the remaining initiators and the target.
                print("Failed to stop initiator %s: %s" % (i.name, err))
        target_obj.stop()
1609