xref: /spdk/scripts/perf/nvmf/run_nvmf.py (revision 7506a7aa53d239f533af3bc768f0d2af55e735fe)
1#!/usr/bin/env python3
2
3from json.decoder import JSONDecodeError
4import os
5import re
6import sys
7import argparse
8import json
9import zipfile
10import threading
11import subprocess
12import itertools
13import configparser
14import time
15import uuid
16from collections import OrderedDict
17
18import paramiko
19import pandas as pd
20from common import *
21
22sys.path.append(os.path.dirname(__file__) + '/../../../python')
23
24import spdk.rpc as rpc  # noqa
25import spdk.rpc.client as rpc_client  # noqa
26
27
28class Server:
29    def __init__(self, name, general_config, server_config):
30        self.name = name
31        self.username = general_config["username"]
32        self.password = general_config["password"]
33        self.transport = general_config["transport"].lower()
34        self.nic_ips = server_config["nic_ips"]
35        self.mode = server_config["mode"]
36
37        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
38        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
39            self.irq_scripts_dir = server_config["irq_scripts_dir"]
40
41        self.local_nic_info = []
42        self._nics_json_obj = {}
43        self.svc_restore_dict = {}
44        self.sysctl_restore_dict = {}
45        self.tuned_restore_dict = {}
46        self.governor_restore = ""
47        self.tuned_profile = ""
48
49        self.enable_adq = False
50        self.adq_priority = None
51        if "adq_enable" in server_config and server_config["adq_enable"]:
52            self.enable_adq = server_config["adq_enable"]
53            self.adq_priority = 1
54
55        if "tuned_profile" in server_config:
56            self.tuned_profile = server_config["tuned_profile"]
57
58        if not re.match("^[A-Za-z0-9]*$", name):
59            self.log_print("Please use a name which contains only letters or numbers")
60            sys.exit(1)
61
62    def log_print(self, msg):
63        print("[%s] %s" % (self.name, msg), flush=True)
64
65    @staticmethod
66    def get_uncommented_lines(lines):
67        return [line for line in lines if line and not line.startswith('#')]
68
69    def get_nic_name_by_ip(self, ip):
70        if not self._nics_json_obj:
71            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
72            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
73        for nic in self._nics_json_obj:
74            for addr in nic["addr_info"]:
75                if ip in addr["local"]:
76                    return nic["ifname"]
77
78    def set_local_nic_info_helper(self):
79        pass
80
81    def set_local_nic_info(self, pci_info):
82        def extract_network_elements(json_obj):
83            nic_list = []
84            if isinstance(json_obj, list):
85                for x in json_obj:
86                    nic_list.extend(extract_network_elements(x))
87            elif isinstance(json_obj, dict):
88                if "children" in json_obj:
89                    nic_list.extend(extract_network_elements(json_obj["children"]))
90                if "class" in json_obj.keys() and "network" in json_obj["class"]:
91                    nic_list.append(json_obj)
92            return nic_list
93
94        self.local_nic_info = extract_network_elements(pci_info)
95
96    # pylint: disable=R0201
97    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
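        # Base-class stub: Target overrides this to run commands locally via
        # subprocess, Initiator to run them remotely over SSH.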
98        return ""
99
100    def configure_system(self):
101        self.configure_services()
102        self.configure_sysctl()
103        self.configure_tuned()
104        self.configure_cpu_governor()
105        self.configure_irq_affinity()
106
107    def configure_adq(self):
108        if self.mode == "kernel":
109            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
110            return
111        self.adq_load_modules()
112        self.adq_configure_nic()
113
114    def adq_load_modules(self):
115        self.log_print("Modprobing ADQ-related Linux modules...")
116        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
117        for module in adq_module_deps:
118            try:
119                self.exec_cmd(["sudo", "modprobe", module])
120                self.log_print("%s loaded!" % module)
121            except CalledProcessError as e:
122                self.log_print("ERROR: failed to load module %s" % module)
123                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
124
125    def adq_configure_tc(self):
126        self.log_print("Configuring ADQ Traffic classes and filters...")
127
128        if self.mode == "kernel":
129            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
130            return
131
132        num_queues_tc0 = 2  # 2 is the minimum number of queues for TC0
133        num_queues_tc1 = self.num_cores
134        port_param = "dst_port" if isinstance(self, Target) else "src_port"
135        port = "4420"
136        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")
137
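        # For each NIC: create an mqprio root qdisc with two traffic classes
        # (TC0 with the minimum 2 queues, TC1 with num_cores queues), add an
        # ingress qdisc, and install a flower filter steering port 4420 NVMe-oF
        # traffic addressed to the NIC IP to TC1 in hardware.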
138        for nic_ip in self.nic_ips:
139            nic_name = self.get_nic_name_by_ip(nic_ip)
140            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
141                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
142                                "queues", "%s@0" % num_queues_tc0,
143                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
144                                "hw", "1", "mode", "channel"]
145            self.log_print(" ".join(tc_qdisc_map_cmd))
146            self.exec_cmd(tc_qdisc_map_cmd)
147
148            time.sleep(5)
149            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
150            self.log_print(" ".join(tc_qdisc_ingress_cmd))
151            self.exec_cmd(tc_qdisc_ingress_cmd)
152
153            tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
154                             "protocol", "ip", "ingress", "prio", "1", "flower",
155                             "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
156                             "skip_sw", "hw_tc", "1"]
157            self.log_print(" ".join(tc_filter_cmd))
158            self.exec_cmd(tc_filter_cmd)
159
160            # show tc configuration
161            self.log_print("Show tc configuration for %s NIC..." % nic_name)
162            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
163            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
164            self.log_print("%s" % tc_disk_out)
165            self.log_print("%s" % tc_filter_out)
166
167            # Ethtool coalesce settings must be applied after configuring traffic classes
168            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
169            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])
170
171            self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name)
172            xps_cmd = ["sudo", xps_script_path, nic_name]
173            self.log_print(xps_cmd)
174            self.exec_cmd(xps_cmd)
175
176    def reload_driver(self, driver):
177        try:
178            self.exec_cmd(["sudo", "rmmod", driver])
179            self.exec_cmd(["sudo", "modprobe", driver])
180        except CalledProcessError as e:
181            self.log_print("ERROR: failed to reload %s module!" % driver)
182            self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
183
184    def adq_configure_nic(self):
185        self.log_print("Configuring NIC port settings for ADQ testing...")
186
187        # Reload the driver first, to make sure any previous settings are reset.
188        self.reload_driver("ice")
189
190        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
191        for nic in nic_names:
192            self.log_print(nic)
193            try:
194                self.exec_cmd(["sudo", "ethtool", "-K", nic,
195                               "hw-tc-offload", "on"])  # Enable hardware TC offload
196                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
197                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
198                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
199                # As a temporary workaround for ADQ, channel packet inspection optimization is turned on during connection establishment.
200                # It is then turned off before the fio ramp time expires, in ethtool_after_fio_ramp().
201                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
202                               "channel-pkt-inspect-optimize", "on"])
203            except CalledProcessError as e:
204                self.log_print("ERROR: failed to configure NIC port using ethtool!")
205                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
206                self.log_print("Please update your NIC driver and firmware versions and try again.")
207            self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
208            self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))
209
210    def configure_services(self):
211        self.log_print("Configuring active services...")
212        svc_config = configparser.ConfigParser(strict=False)
213
214        # The list below is valid only for RHEL / Fedora systems and might not
215        # contain valid service names for other distributions.
216        svc_target_state = {
217            "firewalld": "inactive",
218            "irqbalance": "inactive",
219            "lldpad.service": "inactive",
220            "lldpad.socket": "inactive"
221        }
222
223        for service in svc_target_state:
224            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
225            out = "\n".join(["[%s]" % service, out])
226            svc_config.read_string(out)
227
228            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
229                continue
230
231            service_state = svc_config[service]["ActiveState"]
232            self.log_print("Current state of %s service is %s" % (service, service_state))
233            self.svc_restore_dict.update({service: service_state})
234            if service_state != "inactive":
235                self.log_print("Disabling %s. It will be restored after the test has finished." % service)
236                self.exec_cmd(["sudo", "systemctl", "stop", service])
237
238    def configure_sysctl(self):
239        self.log_print("Tuning sysctl settings...")
240
241        busy_read = 0
242        if self.enable_adq and self.mode == "spdk":
243            busy_read = 1
244
245        sysctl_opts = {
246            "net.core.busy_poll": 0,
247            "net.core.busy_read": busy_read,
248            "net.core.somaxconn": 4096,
249            "net.core.netdev_max_backlog": 8192,
250            "net.ipv4.tcp_max_syn_backlog": 16384,
251            "net.core.rmem_max": 268435456,
252            "net.core.wmem_max": 268435456,
253            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
254            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
255            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
256            "net.ipv4.route.flush": 1,
257            "vm.overcommit_memory": 1,
258        }
259
260        for opt, value in sysctl_opts.items():
261            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
262            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())
263
264    def configure_tuned(self):
265        if not self.tuned_profile:
266            self.log_print("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
267            return
268
269        self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile)
270        service = "tuned"
271        tuned_config = configparser.ConfigParser(strict=False)
272
273        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
274        out = "\n".join(["[%s]" % service, out])
275        tuned_config.read_string(out)
276        tuned_state = tuned_config[service]["ActiveState"]
277        self.svc_restore_dict.update({service: tuned_state})
278
279        if tuned_state != "inactive":
280            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
281            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()
282
283            self.tuned_restore_dict = {
284                "profile": profile,
285                "mode": profile_mode
286            }
287
288        self.exec_cmd(["sudo", "systemctl", "start", service])
289        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
290        self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))
291
292    def configure_cpu_governor(self):
293        self.log_print("Setting CPU governor to performance...")
294
295        # This assumes that every CPU uses the same scaling governor
296        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
297        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])
298
299    def configure_irq_affinity(self):
300        self.log_print("Setting NIC irq affinity for NICs...")
301
302        irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh")
303        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
304        for nic in nic_names:
305            irq_cmd = ["sudo", irq_script_path, nic]
306            self.log_print(irq_cmd)
307            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)
308
309    def restore_services(self):
310        self.log_print("Restoring services...")
311        for service, state in self.svc_restore_dict.items():
312            cmd = "stop" if state == "inactive" else "start"
313            self.exec_cmd(["sudo", "systemctl", cmd, service])
314
315    def restore_sysctl(self):
316        self.log_print("Restoring sysctl settings...")
317        for opt, value in self.sysctl_restore_dict.items():
318            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())
319
320    def restore_tuned(self):
321        self.log_print("Restoring tuned-adm settings...")
322
323        if not self.tuned_restore_dict:
324            return
325
326        if self.tuned_restore_dict["mode"] == "auto":
327            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
328            self.log_print("Reverted tuned-adm to auto_profile.")
329        else:
330            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
331            self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])
332
333    def restore_governor(self):
334        self.log_print("Restoring CPU governor setting...")
335        if self.governor_restore:
336            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
337            self.log_print("Reverted CPU governor to %s." % self.governor_restore)
338
339
340class Target(Server):
341    def __init__(self, name, general_config, target_config):
342        super().__init__(name, general_config, target_config)
343
344        # Defaults
345        self.enable_sar = False
346        self.sar_delay = 0
347        self.sar_interval = 0
348        self.sar_count = 0
349        self.enable_pcm = False
350        self.pcm_dir = ""
351        self.pcm_delay = 0
352        self.pcm_interval = 0
353        self.pcm_count = 0
354        self.enable_bandwidth = 0
355        self.bandwidth_count = 0
356        self.enable_dpdk_memory = False
357        self.dpdk_wait_time = 0
358        self.enable_zcopy = False
359        self.scheduler_name = "static"
360        self.null_block = 0
361        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
362        self.subsystem_info_list = []
363        self.initiator_info = []
364
365        if "null_block_devices" in target_config:
366            self.null_block = target_config["null_block_devices"]
367        if "sar_settings" in target_config:
368            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"]
369        if "pcm_settings" in target_config:
370            self.enable_pcm = True
371            self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"]
372        if "enable_bandwidth" in target_config:
373            self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"]
374        if "enable_dpdk_memory" in target_config:
375            self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"]
376        if "scheduler_settings" in target_config:
377            self.scheduler_name = target_config["scheduler_settings"]
378        if "zcopy_settings" in target_config:
379            self.enable_zcopy = target_config["zcopy_settings"]
380        if "results_dir" in target_config:
381            self.results_dir = target_config["results_dir"]
382
383        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
384        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
385        self.set_local_nic_info(self.set_local_nic_info_helper())
386
387        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
388            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")
389
390        self.configure_system()
391        if self.enable_adq:
392            self.configure_adq()
393        self.sys_config()
394
395    def set_local_nic_info_helper(self):
396        return json.loads(self.exec_cmd(["lshw", "-json"]))
397
398    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
399        stderr_opt = None
400        if stderr_redirect:
401            stderr_opt = subprocess.STDOUT
402        if change_dir:
403            old_cwd = os.getcwd()
404            os.chdir(change_dir)
405            self.log_print("Changing directory to %s" % change_dir)
406
407        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")
408
409        if change_dir:
410            os.chdir(old_cwd)
411            self.log_print("Changing directory to %s" % old_cwd)
412        return out
413
414    def zip_spdk_sources(self, spdk_dir, dest_file):
415        self.log_print("Zipping SPDK source directory")
416        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
417        for root, _directories, files in os.walk(spdk_dir, followlinks=True):
418            for file in files:
419                fh.write(os.path.relpath(os.path.join(root, file)))
420        fh.close()
421        self.log_print("Done zipping")
422
423    @staticmethod
424    def _chunks(input_list, chunks_no):
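        # Split input_list into chunks_no contiguous chunks whose sizes differ
        # by at most one; the first (len(input_list) % chunks_no) chunks get
        # one extra element.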
425        div, rem = divmod(len(input_list), chunks_no)
426        for i in range(chunks_no):
427            si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem)
428            yield input_list[si:si + (div + 1 if i < rem else div)]
429
430    def spread_bdevs(self, req_disks):
431        # Spread available block devices indexes:
432        # - evenly across available initiator systems
433        # - evenly across available NIC interfaces for
434        #   each initiator
435        # Not NUMA aware.
436        ip_bdev_map = []
437        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))
438
439        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
440            self.initiator_info[i]["bdev_range"] = init_chunk
441            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
442            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
443                for c in nic_chunk:
444                    ip_bdev_map.append((ip, c))
445        return ip_bdev_map
446
447    @staticmethod
448    def read_json_stats(file):
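        # Parse a single fio JSON result file and return an 18-element list:
        # read iops, bw, avg/min/max latency and p99/p99.9/p99.99/p99.999
        # percentiles, followed by the same fields for writes. Latencies are
        # converted to microseconds if fio reported them in nanoseconds.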
449        with open(file, "r") as json_data:
450            data = json.load(json_data)
451            job_pos = 0  # job_pos = 0 because we use aggregated results
452
453            # Check if latency is in nano or microseconds to choose correct dict key
454            def get_lat_unit(key_prefix, dict_section):
455                # key prefix - lat, clat or slat.
456                # dict section - portion of json containing latency bucket in question
457                # Return dict key to access the bucket and unit as string
458                for k, _ in dict_section.items():
459                    if k.startswith(key_prefix):
460                        return k, k.split("_")[1]
461
462            def get_clat_percentiles(clat_dict_leaf):
463                if "percentile" in clat_dict_leaf:
464                    p99_lat = float(clat_dict_leaf["percentile"]["99.000000"])
465                    p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"])
466                    p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"])
467                    p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"])
468
469                    return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat]
470                else:
471                    # Latest fio versions do not provide "percentile" results if no
472                    # measurements were done, so just return zeroes
473                    return [0, 0, 0, 0]
474
475            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
476            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
477            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
478            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
479            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
480            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
481            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
482            read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles(
483                data["jobs"][job_pos]["read"][clat_key])
484
485            if "ns" in lat_unit:
486                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
487            if "ns" in clat_unit:
488                read_p99_lat = read_p99_lat / 1000
489                read_p99_9_lat = read_p99_9_lat / 1000
490                read_p99_99_lat = read_p99_99_lat / 1000
491                read_p99_999_lat = read_p99_999_lat / 1000
492
493            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
494            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
495            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
496            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
497            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
498            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
499            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
500            write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles(
501                data["jobs"][job_pos]["write"][clat_key])
502
503            if "ns" in lat_unit:
504                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
505            if "ns" in clat_unit:
506                write_p99_lat = write_p99_lat / 1000
507                write_p99_9_lat = write_p99_9_lat / 1000
508                write_p99_99_lat = write_p99_99_lat / 1000
509                write_p99_999_lat = write_p99_999_lat / 1000
510
511        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
512                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
513                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
514                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]
515
516    def parse_results(self, results_dir, csv_file):
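        # For each fio config in results_dir: average the JSON results of all
        # runs per initiator, write a per-initiator CSV, then combine the
        # per-initiator results into a single aggregated row per job in csv_file.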
517        files = os.listdir(results_dir)
518        fio_files = filter(lambda x: ".fio" in x, files)
519        json_files = [x for x in files if ".json" in x]
520
521        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
522                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
523                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
524                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]
525
526        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
527                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]
528
529        header_line = ",".join(["Name", *headers])
530        aggr_header_line = ",".join(["Name", *aggr_headers])
531
532        # Create empty results file
533        with open(os.path.join(results_dir, csv_file), "w") as fh:
534            fh.write(aggr_header_line + "\n")
535        rows = set()
536
537        for fio_config in fio_files:
538            self.log_print("Getting FIO stats for %s" % fio_config)
539            job_name, _ = os.path.splitext(fio_config)
540
541            # Look in the filename for rwmixread value. Function arguments do
542            # not have that information.
543            # TODO: Improve this function by directly using workload params instead
544            # of regexing through filenames.
545            if "read" in job_name:
546                rw_mixread = 1
547            elif "write" in job_name:
548                rw_mixread = 0
549            else:
550                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100
551
552            # If "_CPU" exists in the name, strip it;
553            # initiators for the same job could have a different num_cores parameter
554            job_name = re.sub(r"_\d+CPU", "", job_name)
555            job_result_files = [x for x in json_files if x.startswith(job_name)]
556            self.log_print("Matching result files for current fio config:")
557            for j in job_result_files:
558                self.log_print("\t %s" % j)
559
560            # More than one initiator may have been used in the test, so check for that.
561            # Result files are named so that the string after the last "_" separator is the initiator name.
562            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
563            inits_avg_results = []
564            for i in inits_names:
565                self.log_print("\tGetting stats for initiator %s" % i)
566                # There may have been more than 1 test run for this job, calculate average results for initiator
567                i_results = [x for x in job_result_files if i in x]
568                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))
569
570                separate_stats = []
571                for r in i_results:
572                    try:
573                        stats = self.read_json_stats(os.path.join(results_dir, r))
574                        separate_stats.append(stats)
575                        self.log_print(stats)
576                    except JSONDecodeError:
577                        self.log_print("ERROR: Failed to parse %s results! Results might be incomplete!" % r)
578
579                init_results = [sum(x) for x in zip(*separate_stats)]
580                init_results = [x / len(separate_stats) for x in init_results]
581                inits_avg_results.append(init_results)
582
583                self.log_print("\tAverage results for initiator %s" % i)
584                self.log_print(init_results)
585                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
586                    fh.write(header_line + "\n")
587                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")
588
589            # Sum results of all initiators running this FIO job.
590            # Latency results are an average of latencies from across all initiators.
591            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
592            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
593            for key in inits_avg_results:
594                if "lat" in key:
595                    inits_avg_results[key] /= len(inits_names)
596
597            # Aggregate separate read/write values into common labels
598            # Take rw_mixread into consideration for mixed read/write workloads.
599            aggregate_results = OrderedDict()
600            for h in aggr_headers:
601                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
602                if "lat" in h:
603                    _ = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
604                else:
605                    _ = read_stat + write_stat
606                aggregate_results[h] = "{0:.3f}".format(_)
607
608            rows.add(",".join([job_name, *aggregate_results.values()]))
609
610        # Save results to file
611        for row in rows:
612            with open(os.path.join(results_dir, csv_file), "a") as fh:
613                fh.write(row + "\n")
614        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))
615
616    def measure_sar(self, results_dir, sar_file_prefix):
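        # Run sar for sar_count intervals after the ramp-up delay and derive
        # total CPU utilization as (number of CPUs * 100) minus the sum of the
        # per-CPU average %idle values reported by sar.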
617        cpu_number = os.cpu_count()
618        sar_idle_sum = 0
619        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
620        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))
621
622        self.log_print("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % self.sar_delay)
623        time.sleep(self.sar_delay)
624        self.log_print("Starting SAR measurements")
625
626        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count])
627        with open(os.path.join(results_dir, sar_output_file), "w") as fh:
628            for line in out.split("\n"):
629                if "Average" in line:
630                    if "CPU" in line:
631                        self.log_print("Summary CPU utilization from SAR:")
632                        self.log_print(line)
633                    elif "all" in line:
634                        self.log_print(line)
635                    else:
636                        sar_idle_sum += float(line.split()[7])
637            fh.write(out)
638        sar_cpu_usage = cpu_number * 100 - sar_idle_sum
639
640        with open(os.path.join(results_dir, sar_cpu_util_file), "w") as f:
641            f.write("%0.2f" % sar_cpu_usage)
642
643    def ethtool_after_fio_ramp(self, fio_ramp_time):
644        time.sleep(fio_ramp_time//2)
645        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
646        for nic in nic_names:
647            self.log_print(nic)
648            self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
649                           "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization
650
651    def measure_pcm_memory(self, results_dir, pcm_file_name):
652        time.sleep(self.pcm_delay)
653        cmd = ["%s/pcm-memory.x" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)]
654        pcm_memory = subprocess.Popen(cmd)
655        time.sleep(self.pcm_count)
656        pcm_memory.terminate()
657
658    def measure_pcm(self, results_dir, pcm_file_name):
659        time.sleep(self.pcm_delay)
660        cmd = ["%s/pcm.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, "-csv=%s/%s" % (results_dir, pcm_file_name)]
661        subprocess.run(cmd)
662        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
663        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
664        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
665        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
666        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)
667
668    def measure_pcm_power(self, results_dir, pcm_power_file_name):
669        time.sleep(self.pcm_delay)
670        out = self.exec_cmd(["%s/pcm-power.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count])
671        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
672            fh.write(out)
673
674    def measure_network_bandwidth(self, results_dir, bandwidth_file_name):
675        self.log_print("INFO: starting network bandwidth measure")
676        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
677                       "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)])
678
679    def measure_dpdk_memory(self, results_dir):
680        self.log_print("INFO: waiting to generate DPDK memory usage")
681        time.sleep(self.dpdk_wait_time)
682        self.log_print("INFO: generating DPDK memory usage")
683        rpc.env.env_dpdk_get_mem_stats(self.client)
684        os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir))
685
686    def sys_config(self):
687        self.log_print("====Kernel release:====")
688        self.log_print(os.uname().release)
689        self.log_print("====Kernel command line:====")
690        with open('/proc/cmdline') as f:
691            cmdline = f.readlines()
692            self.log_print('\n'.join(self.get_uncommented_lines(cmdline)))
693        self.log_print("====sysctl conf:====")
694        with open('/etc/sysctl.conf') as f:
695            sysctl = f.readlines()
696            self.log_print('\n'.join(self.get_uncommented_lines(sysctl)))
697        self.log_print("====Cpu power info:====")
698        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
699        self.log_print("====zcopy settings:====")
700        self.log_print("zcopy enabled: %s" % (self.enable_zcopy))
701        self.log_print("====Scheduler settings:====")
702        self.log_print("SPDK scheduler: %s" % (self.scheduler_name))
703
704
705class Initiator(Server):
706    def __init__(self, name, general_config, initiator_config):
707        super().__init__(name, general_config, initiator_config)
708
709        # Required fields
710        self.ip = initiator_config["ip"]
711        self.target_nic_ips = initiator_config["target_nic_ips"]
712
713        # Defaults
714        self.cpus_allowed = None
715        self.cpus_allowed_policy = "shared"
716        self.spdk_dir = "/tmp/spdk"
717        self.fio_bin = "/usr/src/fio/fio"
718        self.nvmecli_bin = "nvme"
719        self.cpu_frequency = None
720        self.subsystem_info_list = []
721
722        if "spdk_dir" in initiator_config:
723            self.spdk_dir = initiator_config["spdk_dir"]
724        if "fio_bin" in initiator_config:
725            self.fio_bin = initiator_config["fio_bin"]
726        if "nvmecli_bin" in initiator_config:
727            self.nvmecli_bin = initiator_config["nvmecli_bin"]
728        if "cpus_allowed" in initiator_config:
729            self.cpus_allowed = initiator_config["cpus_allowed"]
730        if "cpus_allowed_policy" in initiator_config:
731            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
732        if "cpu_frequency" in initiator_config:
733            self.cpu_frequency = initiator_config["cpu_frequency"]
734        if os.getenv('SPDK_WORKSPACE'):
735            self.spdk_dir = os.getenv('SPDK_WORKSPACE')
736
737        self.ssh_connection = paramiko.SSHClient()
738        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
739        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
740        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
741        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
742        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
743
744        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
745            self.copy_spdk("/tmp/spdk.zip")
746        self.set_local_nic_info(self.set_local_nic_info_helper())
747        self.set_cpu_frequency()
748        self.configure_system()
749        if self.enable_adq:
750            self.configure_adq()
751        self.sys_config()
752
753    def set_local_nic_info_helper(self):
754        return json.loads(self.exec_cmd(["lshw", "-json"]))
755
756    def stop(self):
757        self.ssh_connection.close()
758
759    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
760        if change_dir:
761            cmd = ["cd", change_dir, ";", *cmd]
762
763        # In case one of the command elements contains whitespace and is not
764        # already quoted (e.g. when calling sysctl), quote it again to prevent expansion
765        # when sending to the remote system.
766        for i, c in enumerate(cmd):
767            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
768                cmd[i] = '"%s"' % c
769        cmd = " ".join(cmd)
770
771        # Redirect stderr to stdout by using the get_pty option if needed
772        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
773        out = stdout.read().decode(encoding="utf-8")
774
775        # Check the return code
776        rc = stdout.channel.recv_exit_status()
777        if rc:
778            raise CalledProcessError(int(rc), cmd, out)
779
780        return out
781
782    def put_file(self, local, remote_dest):
783        ftp = self.ssh_connection.open_sftp()
784        ftp.put(local, remote_dest)
785        ftp.close()
786
787    def get_file(self, remote, local_dest):
788        ftp = self.ssh_connection.open_sftp()
789        ftp.get(remote, local_dest)
790        ftp.close()
791
792    def copy_spdk(self, local_spdk_zip):
793        self.log_print("Copying SPDK sources to initiator %s" % self.name)
794        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
795        self.log_print("Copied sources zip from target")
796        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
797        self.log_print("Sources unpacked")
798
799    def copy_result_files(self, dest_dir):
800        self.log_print("Copying results")
801
802        if not os.path.exists(dest_dir):
803            os.mkdir(dest_dir)
804
805        # Get list of result files from initiator and copy them back to target
806        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")
807
808        for file in file_list:
809            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
810                          os.path.join(dest_dir, file))
811        self.log_print("Done copying results")
812
813    def discover_subsystems(self, address_list, subsys_no):
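        # Probe ports 4420..4420+subsys_no-1 on every target address with
        # "nvme discover" and collect the matching (trsvcid, nqn, traddr)
        # tuples into self.subsystem_info_list.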
814        num_nvmes = range(0, subsys_no)
815        nvme_discover_output = ""
816        for ip, subsys_no in itertools.product(address_list, num_nvmes):
817            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_no))
818            nvme_discover_cmd = ["sudo",
819                                 "%s" % self.nvmecli_bin,
820                                 "discover", "-t", "%s" % self.transport,
821                                 "-s", "%s" % (4420 + subsys_no),
822                                 "-a", "%s" % ip]
823
824            try:
825                stdout = self.exec_cmd(nvme_discover_cmd)
826                if stdout:
827                    nvme_discover_output = nvme_discover_output + stdout
828            except CalledProcessError:
829                # Do nothing. When discovering remote subsystems of a kernel target
830                # we expect "nvme discover" to fail multiple times because we are
831                # essentially scanning ports.
832                pass
833
834        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
835                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
836                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
837                                nvme_discover_output)  # from nvme discovery output
838        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
839        subsystems = filter(lambda x: "discovery" not in x[1], subsystems)
840        subsystems = list(set(subsystems))
841        subsystems.sort(key=lambda x: x[1])
842        self.log_print("Found matching subsystems on target side:")
843        for s in subsystems:
844            self.log_print(s)
845        self.subsystem_info_list = subsystems
846
847    def gen_fio_filename_conf(self, *args, **kwargs):
848        # Logic implemented in SPDKInitiator and KernelInitiator classes
849        pass
850
851    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10, rate_iops=0):
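        # Render a fio job file for this initiator (SPDK bdev or kernel
        # ioengine), save it under <spdk_dir>/nvmf_perf and return its path.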
852        fio_conf_template = """
853[global]
854ioengine={ioengine}
855{spdk_conf}
856thread=1
857group_reporting=1
858direct=1
859percentile_list=50:90:99:99.5:99.9:99.99:99.999
860
861norandommap=1
862rw={rw}
863rwmixread={rwmixread}
864bs={block_size}
865time_based=1
866ramp_time={ramp_time}
867runtime={run_time}
868rate_iops={rate_iops}
869"""
870        if "spdk" in self.mode:
871            bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
872            self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
873            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
874            spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
875        else:
876            ioengine = self.ioengine
877            spdk_conf = ""
878            out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'",
879                                 "|", "awk", "'{print $1}'"])
880            subsystems = [x for x in out.split("\n") if "nvme" in x]
881
882        if self.cpus_allowed is not None:
883            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
884            cpus_num = 0
885            cpus = self.cpus_allowed.split(",")
886            for cpu in cpus:
887                if "-" in cpu:
888                    a, b = cpu.split("-")
889                    a = int(a)
890                    b = int(b)
891                    cpus_num += len(range(a, b + 1))  # "a-b" CPU ranges are inclusive
892                else:
893                    cpus_num += 1
894            self.num_cores = cpus_num
895            threads = range(0, self.num_cores)
896        elif hasattr(self, 'num_cores'):
897            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
898            threads = range(0, int(self.num_cores))
899        else:
900            self.num_cores = len(subsystems)
901            threads = range(0, len(subsystems))
902
903        if "spdk" in self.mode:
904            filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs)
905        else:
906            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs)
907
908        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
909                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
910                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)
911
912        # TODO: hipri disabled for now, as it causes fio errors:
913        # io_u error on file /dev/nvme2n1: Operation not supported
914        # See comment in KernelInitiator class, kernel_init_connect() function
915        if hasattr(self, "ioengine") and "io_uring" in self.ioengine:
916            fio_config = fio_config + """
917fixedbufs=1
918registerfiles=1
919#hipri=1
920"""
921        if num_jobs:
922            fio_config = fio_config + "numjobs=%s \n" % num_jobs
923        if self.cpus_allowed is not None:
924            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
925            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
926        fio_config = fio_config + filename_section
927
928        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
929        if hasattr(self, "num_cores"):
930            fio_config_filename += "_%sCPU" % self.num_cores
931        fio_config_filename += ".fio"
932
933        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
934        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
935        self.log_print("Created FIO Config:")
936        self.log_print(fio_config)
937
938        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)
939
940    def set_cpu_frequency(self):
941        if self.cpu_frequency is not None:
942            try:
943                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
944                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
945                self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
946            except Exception:
947                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
948                sys.exit()
949        else:
950            self.log_print("WARNING: you have disabled intel_pstate and are using the default cpu governor.")
951
952    def run_fio(self, fio_config_file, run_num=None):
953        job_name, _ = os.path.splitext(fio_config_file)
954        self.log_print("Starting FIO run for job: %s" % job_name)
955        self.log_print("Using FIO: %s" % self.fio_bin)
956
957        if run_num:
958            for i in range(1, run_num + 1):
959                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
960                try:
961                    output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
962                                            "--output=%s" % output_filename, "--eta=never"], True)
963                    self.log_print(output)
964                except subprocess.CalledProcessError as e:
965                    self.log_print("ERROR: Fio process failed!")
966                    self.log_print(e.stdout)
967        else:
968            output_filename = job_name + "_" + self.name + ".json"
969            output = self.exec_cmd(["sudo", self.fio_bin,
970                                    fio_config_file, "--output-format=json",
971                                    "--output=%s" % output_filename], True)
972            self.log_print(output)
973        self.log_print("FIO run finished. Results in: %s" % output_filename)
974
975    def sys_config(self):
976        self.log_print("====Kernel release:====")
977        self.log_print(self.exec_cmd(["uname", "-r"]))
978        self.log_print("====Kernel command line:====")
979        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
980        self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
981        self.log_print("====sysctl conf:====")
982        sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"])
983        self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
984        self.log_print("====Cpu power info:====")
985        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
986
987
988class KernelTarget(Target):
989    def __init__(self, name, general_config, target_config):
990        super().__init__(name, general_config, target_config)
991        # Defaults
992        self.nvmet_bin = "nvmetcli"
993
994        if "nvmet_bin" in target_config:
995            self.nvmet_bin = target_config["nvmet_bin"]
996
997    def stop(self):
998        nvmet_command(self.nvmet_bin, "clear")
999
1000    def kernel_tgt_gen_subsystem_conf(self, nvme_list):
1001
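        # Build an nvmetcli JSON config with one subsystem and one port per
        # NVMe device, spreading devices across initiators and their NIC IPs,
        # and save it to kernel.conf for "nvmetcli restore".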
1002        nvmet_cfg = {
1003            "ports": [],
1004            "hosts": [],
1005            "subsystems": [],
1006        }
1007
1008        for ip, bdev_num in self.spread_bdevs(len(nvme_list)):
1009            port = str(4420 + bdev_num)
1010            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
1011            serial = "SPDK00%s" % bdev_num
1012            bdev_name = nvme_list[bdev_num]
1013
1014            nvmet_cfg["subsystems"].append({
1015                "allowed_hosts": [],
1016                "attr": {
1017                    "allow_any_host": "1",
1018                    "serial": serial,
1019                    "version": "1.3"
1020                },
1021                "namespaces": [
1022                    {
1023                        "device": {
1024                            "path": bdev_name,
1025                            "uuid": "%s" % uuid.uuid4()
1026                        },
1027                        "enable": 1,
1028                        "nsid": port
1029                    }
1030                ],
1031                "nqn": nqn
1032            })
1033
1034            nvmet_cfg["ports"].append({
1035                "addr": {
1036                    "adrfam": "ipv4",
1037                    "traddr": ip,
1038                    "trsvcid": port,
1039                    "trtype": self.transport
1040                },
1041                "portid": bdev_num,
1042                "referrals": [],
1043                "subsystems": [nqn]
1044            })
1045
1046            self.subsystem_info_list.append([port, nqn, ip])
1047        self.subsys_no = len(self.subsystem_info_list)
1048
1049        with open("kernel.conf", "w") as fh:
1050            fh.write(json.dumps(nvmet_cfg, indent=2))
1051
1052    def tgt_start(self):
1053        self.log_print("Configuring kernel NVMeOF Target")
1054
1055        if self.null_block:
1056            print("Configuring with null block device.")
1057            nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
1058        else:
1059            print("Configuring with NVMe drives.")
1060            nvme_list = get_nvme_devices()
1061
1062        self.kernel_tgt_gen_subsystem_conf(nvme_list)
1063        self.subsys_no = len(nvme_list)
1064
1065        nvmet_command(self.nvmet_bin, "clear")
1066        nvmet_command(self.nvmet_bin, "restore kernel.conf")
1067
1068        if self.enable_adq:
1069            self.adq_configure_tc()
1070
1071        self.log_print("Done configuring kernel NVMeOF Target")
1072
1073
1074class SPDKTarget(Target):
1075    def __init__(self, name, general_config, target_config):
1076        super().__init__(name, general_config, target_config)
1077
1078        # Required fields
1079        self.core_mask = target_config["core_mask"]
1080        self.num_cores = self.get_num_cores(self.core_mask)
1081
1082        # Defaults
1083        self.dif_insert_strip = False
1084        self.null_block_dif_type = 0
1085        self.num_shared_buffers = 4096
1086        self.max_queue_depth = 128
1087        self.bpf_proc = None
1088        self.bpf_scripts = []
1089        self.enable_dsa = False
1090
1091        if "num_shared_buffers" in target_config:
1092            self.num_shared_buffers = target_config["num_shared_buffers"]
1093        if "max_queue_depth" in target_config:
1094            self.max_queue_depth = target_config["max_queue_depth"]
1095        if "null_block_dif_type" in target_config:
1096            self.null_block_dif_type = target_config["null_block_dif_type"]
1097        if "dif_insert_strip" in target_config:
1098            self.dif_insert_strip = target_config["dif_insert_strip"]
1099        if "bpf_scripts" in target_config:
1100            self.bpf_scripts = target_config["bpf_scripts"]
1101        if "dsa_settings" in target_config:
1102            self.enable_dsa = target_config["dsa_settings"]
1103
1104        self.log_print("====DSA settings:====")
1105        self.log_print("DSA enabled: %s" % (self.enable_dsa))
1106
1107    @staticmethod
1108    def get_num_cores(core_mask):
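        # Accept either a hex core mask (e.g. "0xF0") or a bracketed core list
        # (e.g. "[0-3,7]") and return the number of cores selected.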
1109        if "0x" in core_mask:
1110            return bin(int(core_mask, 16)).count("1")
1111        else:
1112            num_cores = 0
1113            core_mask = core_mask.replace("[", "")
1114            core_mask = core_mask.replace("]", "")
1115            for i in core_mask.split(","):
1116                if "-" in i:
1117                    x, y = i.split("-")
1118                    num_cores += len(range(int(x), int(y))) + 1
1119                else:
1120                    num_cores += 1
1121            return num_cores
1122
1123    def spdk_tgt_configure(self):
1124        self.log_print("Configuring SPDK NVMeOF target via RPC")
1125
1126        if self.enable_adq:
1127            self.adq_configure_tc()
1128
1129        # Create transport layer
1130        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
1131                                       num_shared_buffers=self.num_shared_buffers,
1132                                       max_queue_depth=self.max_queue_depth,
1133                                       dif_insert_or_strip=self.dif_insert_strip,
1134                                       sock_priority=self.adq_priority)
1135        self.log_print("SPDK NVMeOF transport layer:")
1136        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))
1137
1138        if self.null_block:
1139            self.spdk_tgt_add_nullblock(self.null_block)
1140            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
1141        else:
1142            self.spdk_tgt_add_nvme_conf()
1143            self.spdk_tgt_add_subsystem_conf(self.nic_ips)
1144
1145        self.log_print("Done configuring SPDK NVMeOF Target")
1146
1147    def spdk_tgt_add_nullblock(self, null_block_count):
1148        md_size = 0
1149        block_size = 4096
1150        if self.null_block_dif_type != 0:
1151            md_size = 128
1152
1153        self.log_print("Adding null block bdevices to config via RPC")
1154        for i in range(null_block_count):
1155            self.log_print("Setting bdev protection to: %s" % self.null_block_dif_type)
1156            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
1157                                      dif_type=self.null_block_dif_type, md_size=md_size)
1158        self.log_print("SPDK Bdevs configuration:")
1159        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1160
1161    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
1162        self.log_print("Adding NVMe bdevs to config via RPC")
1163
1164        bdfs = get_nvme_devices_bdf()
1165        bdfs = [b.replace(":", ".") for b in bdfs]
1166
1167        if req_num_disks:
1168            if req_num_disks > len(bdfs):
1169                self.log_print("ERROR: Requested number of disks is more than the %s available" % len(bdfs))
1170                sys.exit(1)
1171            else:
1172                bdfs = bdfs[0:req_num_disks]
1173
1174        for i, bdf in enumerate(bdfs):
1175            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)
1176
1177        self.log_print("SPDK Bdevs configuration:")
1178        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1179
1180    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
1181        self.log_print("Adding subsystems to config")
1182        if not req_num_disks:
1183            req_num_disks = get_nvme_devices_count()
1184
1185        for ip, bdev_num in self.spread_bdevs(req_num_disks):
1186            port = str(4420 + bdev_num)
1187            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
1188            serial = "SPDK00%s" % bdev_num
1189            bdev_name = "Nvme%sn1" % bdev_num
1190
1191            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
1192                                           allow_any_host=True, max_namespaces=8)
1193            rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
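            # Add a listener on the same IP/port for both the subsystem itself
            # and the discovery service.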
1194            for nqn_name in [nqn, "discovery"]:
1195                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
1196                                                     nqn=nqn_name,
1197                                                     trtype=self.transport,
1198                                                     traddr=ip,
1199                                                     trsvcid=port,
1200                                                     adrfam="ipv4")
1201            self.subsystem_info_list.append([port, nqn, ip])
1202        self.subsys_no = len(self.subsystem_info_list)
1203
1204        self.log_print("SPDK NVMeOF subsystem configuration:")
1205        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))
1206
1207    def bpf_start(self):
1208        self.log_print("Starting BPF Trace scripts: %s" % self.bpf_scripts)
1209        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
1210        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
1211        results_path = os.path.join(self.results_dir, "bpf_traces.txt")
1212
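        # The saved nvmf_tgt PID and the selected trace scripts are passed to the
        # bpftrace wrapper; BPF_OUTFILE points its output at the results directory.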
1213        with open(self.pid, "r") as fh:
1214            nvmf_pid = fh.readline().strip()
1215
1216        cmd = [bpf_script, nvmf_pid, *bpf_traces]
1217        self.log_print(cmd)
1218        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})
1219
1220    def tgt_start(self):
1221        if self.null_block:
1222            self.subsys_no = 1
1223        else:
1224            self.subsys_no = get_nvme_devices_count()
1225        self.log_print("Starting SPDK NVMeOF Target process")
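        # Start nvmf_tgt with --wait-for-rpc so that socket, ADQ, DSA and scheduler
        # options can be applied over RPC before subsystem initialization.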
1226        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
1227        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
1228        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")
1229
1230        with open(self.pid, "w") as fh:
1231            fh.write(str(proc.pid))
1232        self.nvmf_proc = proc
1233        self.log_print("SPDK NVMeOF Target PID=%s" % proc.pid)
1234        self.log_print("Waiting for spdk to initialize...")
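        # Poll once per second for the default RPC socket (/var/tmp/spdk.sock)
        # before creating the JSON-RPC client.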
1235        while True:
1236            if os.path.exists("/var/tmp/spdk.sock"):
1237                break
1238            time.sleep(1)
1239        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")
1240
1241        if self.enable_zcopy:
1242            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
1243                                           enable_zerocopy_send_server=True)
1244            self.log_print("Target socket options:")
1245            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))
1246
1247        if self.enable_adq:
1248            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
1249            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
1250                                           nvme_adminq_poll_period_us=100000, retry_count=4)
1251
1252        if self.enable_dsa:
1253            rpc.dsa.dsa_scan_accel_engine(self.client, config_kernel_mode=None)
1254            self.log_print("Target DSA accel engine enabled")
1255
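        # All pre-init options are set; pick the scheduler and let the target
        # finish subsystem initialization.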
1256        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name)
1257        rpc.framework_start_init(self.client)
1258
1259        if self.bpf_scripts:
1260            self.bpf_start()
1261
1262        self.spdk_tgt_configure()
1263
1264    def stop(self):
1265        if self.bpf_proc:
1266            self.log_print("Stopping BPF Trace script")
1267            self.bpf_proc.terminate()
1268            self.bpf_proc.wait()
1269
1270        if hasattr(self, "nvmf_proc"):
1271            try:
1272                self.nvmf_proc.terminate()
1273                self.nvmf_proc.wait()
1274            except Exception as e:
1275                self.log_print(e)
1276                self.nvmf_proc.kill()
1277                self.nvmf_proc.communicate()
1278
1279
1280class KernelInitiator(Initiator):
1281    def __init__(self, name, general_config, initiator_config):
1282        super().__init__(name, general_config, initiator_config)
1283
1284        # Defaults
1285        self.extra_params = ""
1286        self.ioengine = "libaio"
1287
1288        if "extra_params" in initiator_config:
1289            self.extra_params = initiator_config["extra_params"]
1290
1291        if "kernel_engine" in initiator_config:
1292            self.ioengine = initiator_config["kernel_engine"]
1293            if "io_uring" in self.ioengine:
1294                self.extra_params = "--nr-poll-queues=8"
1295
1296    def get_connected_nvme_list(self):
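        # Return device names of connected NVMe-oF namespaces, recognized by an
        # "SPDK" or "Linux" model string in "nvme list" output.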
1297        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
1298        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
1299                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
1300        return nvme_list
1301
1302    def kernel_init_connect(self):
1303        self.log_print("The connection attempts below may produce error messages; this is expected.")
1304        for subsystem in self.subsystem_info_list:
1305            self.log_print("Trying to connect %s %s %s" % tuple(subsystem))
1306            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
1307                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
1308            time.sleep(2)
1309
1310        if "io_uring" in self.ioengine:
1311            self.log_print("Setting block layer settings for io_uring.")
1312
1313            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
1314            #       apparently it's not possible for connected subsystems.
1315            #       Results in "error: Invalid argument"
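            # Turn off iostats accounting, completion CPU affinity and request
            # merging to reduce per-IO overhead in the block layer.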
1316            block_sysfs_settings = {
1317                "iostats": "0",
1318                "rq_affinity": "0",
1319                "nomerges": "2"
1320            }
1321
1322            for disk in self.get_connected_nvme_list():
1323                sysfs = os.path.join("/sys/block", disk, "queue")
1324                for k, v in block_sysfs_settings.items():
1325                    sysfs_opt_path = os.path.join(sysfs, k)
1326                    try:
1327                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
1328                    except subprocess.CalledProcessError as e:
1329                        self.log_print("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
1330                    finally:
1331                        current_value = self.exec_cmd(["sudo", "cat", sysfs_opt_path])
1332                        self.log_print("%s=%s" % (sysfs_opt_path, current_value))
1333
1334    def kernel_init_disconnect(self):
1335        for subsystem in self.subsystem_info_list:
1336            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
1337            time.sleep(1)
1338
1339    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1):
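        # Split the connected kernel NVMe devices across fio job sections (one per
        # entry in "threads") and scale each section's iodepth by its disk count.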
1340        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
1341
1342        filename_section = ""
1343        nvme_per_split = int(len(nvme_list) / len(threads))
1344        remainder = len(nvme_list) % len(threads)
1345        iterator = iter(nvme_list)
1346        result = []
1347        for i in range(len(threads)):
1348            result.append([])
1349            for _ in range(nvme_per_split):
1350                result[i].append(next(iterator))
1351            if remainder:
1352                result[i].append(next(iterator))
1353                remainder -= 1
1354        for i, r in enumerate(result):
1355            header = "[filename%s]" % i
1356            disks = "\n".join(["filename=%s" % x for x in r])
1357            job_section_qd = round((io_depth * len(r)) / num_jobs)
1358            if job_section_qd == 0:
1359                job_section_qd = 1
1360            iodepth = "iodepth=%s" % job_section_qd
1361            filename_section = "\n".join([filename_section, header, disks, iodepth])
1362
1363        return filename_section
1364
1365
1366class SPDKInitiator(Initiator):
1367    def __init__(self, name, general_config, initiator_config):
1368        super().__init__(name, general_config, initiator_config)
1369
1370        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
1371            self.install_spdk()
1372
1373        # Required fields
1374        self.num_cores = initiator_config["num_cores"]
1375
1376        # Optional fields
1377        self.enable_data_digest = False
1378        if "enable_data_digest" in initiator_config:
1379            self.enable_data_digest = initiator_config["enable_data_digest"]
1380
1381    def install_spdk(self):
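        # Configure SPDK with RDMA and the external fio plugin, build it, then
        # run setup.sh to bind local NVMe devices to userspace drivers.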
1382        self.log_print("Using fio binary %s" % self.fio_bin)
1383        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
1384        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
1385        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)])
1386        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
1387        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])
1388
1389        self.log_print("SPDK built")
1390        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])
1391
1392    def gen_spdk_bdev_conf(self, remote_subsystem_list):
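        # Generate a JSON bdev-subsystem config with one bdev_nvme_attach_controller
        # entry per discovered remote subsystem.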
1393        bdev_cfg_section = {
1394            "subsystems": [
1395                {
1396                    "subsystem": "bdev",
1397                    "config": []
1398                }
1399            ]
1400        }
1401
1402        for i, subsys in enumerate(remote_subsystem_list):
1403            sub_port, sub_nqn, sub_addr = map(str, subsys)
1404            nvme_ctrl = {
1405                "method": "bdev_nvme_attach_controller",
1406                "params": {
1407                    "name": "Nvme{}".format(i),
1408                    "trtype": self.transport,
1409                    "traddr": sub_addr,
1410                    "trsvcid": sub_port,
1411                    "subnqn": sub_nqn,
1412                    "adrfam": "IPv4"
1413                }
1414            }
1415
1416            if self.enable_adq:
1417                nvme_ctrl["params"].update({"priority": "1"})
1418
1419            if self.enable_data_digest:
1420                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})
1421
1422            bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)
1423
1424        return json.dumps(bdev_cfg_section, indent=2)
1425
1426    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1):
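        # Same spreading logic as the kernel initiator, but filenames reference
        # SPDK bdev names (NvmeXn1) instead of kernel block devices.
        # Example for 4 subsystems, 2 threads, io_depth=128, num_jobs=1:
        #   [filename0]
        #   filename=Nvme0n1
        #   filename=Nvme1n1
        #   iodepth=256
        #   [filename1]
        #   filename=Nvme2n1
        #   filename=Nvme3n1
        #   iodepth=256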
1427        filename_section = ""
1428        if len(threads) >= len(subsystems):
1429            threads = range(0, len(subsystems))
1430        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
1431        nvme_per_split = int(len(subsystems) / len(threads))
1432        remainder = len(subsystems) % len(threads)
1433        iterator = iter(filenames)
1434        result = []
1435        for i in range(len(threads)):
1436            result.append([])
1437            for _ in range(nvme_per_split):
1438                result[i].append(next(iterator))
1439            if remainder:
1440                result[i].append(next(iterator))
1441                remainder -= 1
1442        for i, r in enumerate(result):
1443            header = "[filename%s]" % i
1444            disks = "\n".join(["filename=%s" % x for x in r])
1445            job_section_qd = round((io_depth * len(r)) / num_jobs)
1446            if job_section_qd == 0:
1447                job_section_qd = 1
1448            iodepth = "iodepth=%s" % job_section_qd
1449            filename_section = "\n".join([filename_section, header, disks, iodepth])
1450
1451        return filename_section
1452
1453
1454if __name__ == "__main__":
1455    script_full_dir = os.path.dirname(os.path.realpath(__file__))
1456    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))
1457
1458    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
1459    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
1460                        help='Configuration file.')
1461    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
1462                        help='Results directory.')
1463    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
1464                        help='CSV results filename.')
1465
1466    args = parser.parse_args()
1467
1468    print("Using config file: %s" % args.config)
1469    with open(args.config, "r") as config:
1470        data = json.load(config)
1471
1472    initiators = []
1473    fio_cases = []
1474
1475    general_config = data["general"]
1476    target_config = data["target"]
1477    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]
1478
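    # Build the target and initiator objects and the fio workload matrix from
    # the corresponding sections of the config file.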
1479    for k, v in data.items():
1480        if "target" in k:
1481            v.update({"results_dir": args.results})
1482            if data[k]["mode"] == "spdk":
1483                target_obj = SPDKTarget(k, data["general"], v)
1484            elif data[k]["mode"] == "kernel":
1485                target_obj = KernelTarget(k, data["general"], v)
1486        elif "initiator" in k:
1487            if data[k]["mode"] == "spdk":
1488                init_obj = SPDKInitiator(k, data["general"], v)
1489            elif data[k]["mode"] == "kernel":
1490                init_obj = KernelInitiator(k, data["general"], v)
1491            initiators.append(init_obj)
1492        elif "fio" in k:
1493            fio_workloads = itertools.product(data[k]["bs"],
1494                                              data[k]["qd"],
1495                                              data[k]["rw"])
1496
1497            fio_run_time = data[k]["run_time"]
1498            fio_ramp_time = data[k]["ramp_time"]
1499            fio_rw_mix_read = data[k]["rwmixread"]
1500            fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None
1501            fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None
1502
1503            fio_rate_iops = 0
1504            if "rate_iops" in data[k]:
1505                fio_rate_iops = data[k]["rate_iops"]
1506        else:
1507            continue
1508
1509    try:
1510        os.mkdir(args.results)
1511    except FileExistsError:
1512        pass
1513
1514    for i in initiators:
1515        target_obj.initiator_info.append(
1516            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
1517        )
1518
1519    # TODO: This try block is definitely too large. Need to break it up into separate
1520    # logical blocks to reduce its size.
1521    try:
1522        target_obj.tgt_start()
1523
1524        for i in initiators:
1525            i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no)
1526            if i.enable_adq:
1527                i.adq_configure_tc()
1528
1529        # Poor man's threading
1530        # Run FIO tests
1531        for block_size, io_depth, rw in fio_workloads:
1532            threads = []
1533            configs = []
1534            for i in initiators:
1535                if i.mode == "kernel":
1536                    i.kernel_init_connect()
1537
1538                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
1539                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops)
1540                configs.append(cfg)
1541
1542            for i, cfg in zip(initiators, configs):
1543                t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
1544                threads.append(t)
1545            if target_obj.enable_sar:
1546                sar_file_prefix = "%s_%s_%s_sar" % (block_size, rw, io_depth)
1547                t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix))
1548                threads.append(t)
1549
1550            if target_obj.enable_pcm:
1551                pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]
1552
1553                pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],))
1554                pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],))
1555                pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],))
1556
1557                threads.append(pcm_cpu_t)
1558                threads.append(pcm_mem_t)
1559                threads.append(pcm_pow_t)
1560
1561            if target_obj.enable_bandwidth:
1562                bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)])
1563                bandwidth_file_name = ".".join([bandwidth_file_name, "csv"])
1564                t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,))
1565                threads.append(t)
1566
1567            if target_obj.enable_dpdk_memory:
1568                t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results,))
1569                threads.append(t)
1570
1571            if target_obj.enable_adq:
1572                ethtool_thread = threading.Thread(target=target_obj.ethtool_after_fio_ramp, args=(fio_ramp_time,))
1573                threads.append(ethtool_thread)
1574
1575            for t in threads:
1576                t.start()
1577            for t in threads:
1578                t.join()
1579
1580            for i in initiators:
1581                if i.mode == "kernel":
1582                    i.kernel_init_disconnect()
1583                i.copy_result_files(args.results)
1584
1585        target_obj.restore_governor()
1586        target_obj.restore_tuned()
1587        target_obj.restore_services()
1588        target_obj.restore_sysctl()
1589        if target_obj.enable_adq:
1590            target_obj.reload_driver("ice")
1591        for i in initiators:
1592            i.restore_governor()
1593            i.restore_tuned()
1594            i.restore_services()
1595            i.restore_sysctl()
1596            if i.enable_adq:
1597                i.reload_driver("ice")
1598        target_obj.parse_results(args.results, args.csv_filename)
1599    finally:
1600        for i in initiators:
1601            try:
1602                i.stop()
1603            except Exception as err:
1604                i.log_print("Error while stopping initiator: %s" % err)
1605        target_obj.stop()
1606