1#!/usr/bin/env python3
2
3from json.decoder import JSONDecodeError
4import os
5import re
6import sys
7import argparse
8import json
9import zipfile
10import threading
11import subprocess
from subprocess import check_output, CalledProcessError
12import itertools
13import configparser
14import time
15import uuid
16from collections import OrderedDict
17
18import paramiko
19import pandas as pd
20from common import *
21
22sys.path.append(os.path.dirname(__file__) + '/../../../python')
23
24import spdk.rpc as rpc  # noqa
25import spdk.rpc.client as rpc_client  # noqa
26
27
28class Server:
29    def __init__(self, name, general_config, server_config):
30        self.name = name
31        self.username = general_config["username"]
32        self.password = general_config["password"]
33        self.transport = general_config["transport"].lower()
34        self.nic_ips = server_config["nic_ips"]
35        self.mode = server_config["mode"]
36
37        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
38        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
39            self.irq_scripts_dir = server_config["irq_scripts_dir"]
40
41        self.local_nic_info = []
42        self._nics_json_obj = {}
43        self.svc_restore_dict = {}
44        self.sysctl_restore_dict = {}
45        self.tuned_restore_dict = {}
46        self.governor_restore = ""
47        self.tuned_profile = ""
48
49        self.enable_adq = False
50        self.adq_priority = None
51        if "adq_enable" in server_config and server_config["adq_enable"]:
52            self.enable_adq = server_config["adq_enable"]
53            self.adq_priority = 1
54
55        if "tuned_profile" in server_config:
56            self.tuned_profile = server_config["tuned_profile"]
57
58        if not re.match("^[A-Za-z0-9]*$", name):
59            self.log_print("Please use a name which contains only letters or numbers")
60            sys.exit(1)
61
62    def log_print(self, msg):
63        print("[%s] %s" % (self.name, msg), flush=True)
64
65    @staticmethod
66    def get_uncommented_lines(lines):
67        return [line for line in lines if line and not line.startswith('#')]
68
69    def get_nic_name_by_ip(self, ip):
70        if not self._nics_json_obj:
71            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
72            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
73        for nic in self._nics_json_obj:
74            for addr in nic["addr_info"]:
75                if ip in addr["local"]:
76                    return nic["ifname"]
77
78    def set_local_nic_info_helper(self):
79        pass
80
81    def set_local_nic_info(self, pci_info):
82        def extract_network_elements(json_obj):
83            nic_list = []
84            if isinstance(json_obj, list):
85                for x in json_obj:
86                    nic_list.extend(extract_network_elements(x))
87            elif isinstance(json_obj, dict):
88                if "children" in json_obj:
89                    nic_list.extend(extract_network_elements(json_obj["children"]))
90                if "class" in json_obj.keys() and "network" in json_obj["class"]:
91                    nic_list.append(json_obj)
92            return nic_list
93
94        self.local_nic_info = extract_network_elements(pci_info)
95
96    # pylint: disable=R0201
97    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
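        """Stub overridden by subclasses: Target runs commands locally,
        Initiator runs them over SSH. Returns command stdout as a string."""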
98        return ""
99
100    def configure_system(self):
101        self.configure_services()
102        self.configure_sysctl()
103        self.configure_tuned()
104        self.configure_cpu_governor()
105        self.configure_irq_affinity()
106
107    def configure_adq(self):
108        if self.mode == "kernel":
109            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
110            return
111        self.adq_load_modules()
112        self.adq_configure_nic()
113
114    def adq_load_modules(self):
115        self.log_print("Modprobing ADQ-related Linux modules...")
116        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
117        for module in adq_module_deps:
118            try:
119                self.exec_cmd(["sudo", "modprobe", module])
120                self.log_print("%s loaded!" % module)
121            except CalledProcessError as e:
122                self.log_print("ERROR: failed to load module %s" % module)
123                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
124
125    def adq_configure_tc(self):
126        self.log_print("Configuring ADQ Traffic classes and filters...")
127
128        if self.mode == "kernel":
129            self.log_print("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")
130            return
131
132        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
133        num_queues_tc1 = self.num_cores
134        port_param = "dst_port" if isinstance(self, Target) else "src_port"
135        port = "4420"
136        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")
137
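        # Resulting layout per NIC (summary of the commands below): mqprio with two
        # traffic classes - TC0 keeps 2 queues for generic traffic, TC1 gets num_cores
        # queues and receives NVMe/TCP traffic on port 4420 via the flower filter.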
138        for nic_ip in self.nic_ips:
139            nic_name = self.get_nic_name_by_ip(nic_ip)
140            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
141                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
142                                "queues", "%s@0" % num_queues_tc0,
143                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
144                                "hw", "1", "mode", "channel"]
145            self.log_print(" ".join(tc_qdisc_map_cmd))
146            self.exec_cmd(tc_qdisc_map_cmd)
147
148            time.sleep(5)
149            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
150            self.log_print(" ".join(tc_qdisc_ingress_cmd))
151            self.exec_cmd(tc_qdisc_ingress_cmd)
152
153            tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
154                             "protocol", "ip", "ingress", "prio", "1", "flower",
155                             "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
156                             "skip_sw", "hw_tc", "1"]
157            self.log_print(" ".join(tc_filter_cmd))
158            self.exec_cmd(tc_filter_cmd)
159
160            # show tc configuration
161            self.log_print("Show tc configuration for %s NIC..." % nic_name)
162            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
163            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
164            self.log_print("%s" % tc_disk_out)
165            self.log_print("%s" % tc_filter_out)
166
167            # Ethtool coalesce settings must be applied after configuring traffic classes
168            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
169            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])
170
171            self.log_print("Running set_xps_rxqs script for %s NIC..." % nic_name)
172            xps_cmd = ["sudo", xps_script_path, nic_name]
173            self.log_print(xps_cmd)
174            self.exec_cmd(xps_cmd)
175
176    def reload_driver(self, driver):
177        try:
178            self.exec_cmd(["sudo", "rmmod", driver])
179            self.exec_cmd(["sudo", "modprobe", driver])
180        except CalledProcessError as e:
181            self.log_print("ERROR: failed to reload %s module!" % driver)
182            self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
183
184    def adq_configure_nic(self):
185        self.log_print("Configuring NIC port settings for ADQ testing...")
186
187        # Reload the driver first, to make sure any previous settings are re-set.
188        self.reload_driver("ice")
189
190        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
191        for nic in nic_names:
192            self.log_print(nic)
193            try:
194                self.exec_cmd(["sudo", "ethtool", "-K", nic,
195                               "hw-tc-offload", "on"])  # Enable hardware TC offload
196                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
197                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
198                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
199                # As a temporary workaround for ADQ, channel packet inspection optimization is turned on
200                # during connection establishment and turned off before the fio ramp_time expires in ethtool_after_fio_ramp().
201                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
202                               "channel-pkt-inspect-optimize", "on"])
203            except CalledProcessError as e:
204                self.log_print("ERROR: failed to configure NIC port using ethtool!")
205                self.log_print("%s resulted in error: %s" % (e.cmd, e.output))
206                self.log_print("Please update your NIC driver and firmware versions and try again.")
207            self.log_print(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
208            self.log_print(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))
209
210    def configure_services(self):
211        self.log_print("Configuring active services...")
212        svc_config = configparser.ConfigParser(strict=False)
213
214        # Below list is valid only for RHEL / Fedora systems and might not
215        # contain valid names for other distributions.
216        svc_target_state = {
217            "firewalld": "inactive",
218            "irqbalance": "inactive",
219            "lldpad.service": "inactive",
220            "lldpad.socket": "inactive"
221        }
222
223        for service in svc_target_state:
224            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
225            out = "\n".join(["[%s]" % service, out])
226            svc_config.read_string(out)
227
228            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
229                continue
230
231            service_state = svc_config[service]["ActiveState"]
232            self.log_print("Current state of %s service is %s" % (service, service_state))
233            self.svc_restore_dict.update({service: service_state})
234            if service_state != "inactive":
235                self.log_print("Disabling %s. It will be restored after the test has finished." % service)
236                self.exec_cmd(["sudo", "systemctl", "stop", service])
237
238    def configure_sysctl(self):
239        self.log_print("Tuning sysctl settings...")
240
241        busy_read = 0
242        if self.enable_adq and self.mode == "spdk":
243            busy_read = 1
244
245        sysctl_opts = {
246            "net.core.busy_poll": 0,
247            "net.core.busy_read": busy_read,
248            "net.core.somaxconn": 4096,
249            "net.core.netdev_max_backlog": 8192,
250            "net.ipv4.tcp_max_syn_backlog": 16384,
251            "net.core.rmem_max": 268435456,
252            "net.core.wmem_max": 268435456,
253            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
254            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
255            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
256            "net.ipv4.route.flush": 1,
257            "vm.overcommit_memory": 1,
258        }
259
260        for opt, value in sysctl_opts.items():
261            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
262            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())
263
264    def configure_tuned(self):
265        if not self.tuned_profile:
266            self.log_print("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
267            return
268
269        self.log_print("Configuring tuned-adm profile to %s." % self.tuned_profile)
270        service = "tuned"
271        tuned_config = configparser.ConfigParser(strict=False)
272
273        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
274        out = "\n".join(["[%s]" % service, out])
275        tuned_config.read_string(out)
276        tuned_state = tuned_config[service]["ActiveState"]
277        self.svc_restore_dict.update({service: tuned_state})
278
279        if tuned_state != "inactive":
280            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
281            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()
282
283            self.tuned_restore_dict = {
284                "profile": profile,
285                "mode": profile_mode
286            }
287
288        self.exec_cmd(["sudo", "systemctl", "start", service])
289        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
290        self.log_print("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))
291
292    def configure_cpu_governor(self):
293        self.log_print("Setting CPU governor to performance...")
294
295        # This assumes that all CPUs use the same scaling governor
296        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
297        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])
298
299    def configure_irq_affinity(self):
300        self.log_print("Setting IRQ affinity for NICs...")
301
302        irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh")
303        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
304        for nic in nic_names:
305            irq_cmd = ["sudo", irq_script_path, nic]
306            self.log_print(irq_cmd)
307            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)
308
309    def restore_services(self):
310        self.log_print("Restoring services...")
311        for service, state in self.svc_restore_dict.items():
312            cmd = "stop" if state == "inactive" else "start"
313            self.exec_cmd(["sudo", "systemctl", cmd, service])
314
315    def restore_sysctl(self):
316        self.log_print("Restoring sysctl settings...")
317        for opt, value in self.sysctl_restore_dict.items():
318            self.log_print(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())
319
320    def restore_tuned(self):
321        self.log_print("Restoring tuned-adm settings...")
322
323        if not self.tuned_restore_dict:
324            return
325
326        if self.tuned_restore_dict["mode"] == "auto":
327            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
328            self.log_print("Reverted tuned-adm to auto_profile.")
329        else:
330            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
331            self.log_print("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])
332
333    def restore_governor(self):
334        self.log_print("Restoring CPU governor setting...")
335        if self.governor_restore:
336            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
337            self.log_print("Reverted CPU governor to %s." % self.governor_restore)
338
339
340class Target(Server):
341    def __init__(self, name, general_config, target_config):
342        super().__init__(name, general_config, target_config)
343
344        # Defaults
345        self.enable_sar = False
346        self.sar_delay = 0
347        self.sar_interval = 0
348        self.sar_count = 0
349        self.enable_pcm = False
350        self.pcm_dir = ""
351        self.pcm_delay = 0
352        self.pcm_interval = 0
353        self.pcm_count = 0
354        self.enable_bandwidth = 0
355        self.bandwidth_count = 0
356        self.enable_dpdk_memory = False
357        self.dpdk_wait_time = 0
358        self.enable_zcopy = False
359        self.scheduler_name = "static"
360        self.null_block = 0
361        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
362        self.subsystem_info_list = []
363        self.initiator_info = []
364
365        if "null_block_devices" in target_config:
366            self.null_block = target_config["null_block_devices"]
367        if "sar_settings" in target_config:
368            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = target_config["sar_settings"]
369        if "pcm_settings" in target_config:
370            self.enable_pcm = True
371            self.pcm_dir, self.pcm_delay, self.pcm_interval, self.pcm_count = target_config["pcm_settings"]
372        if "enable_bandwidth" in target_config:
373            self.enable_bandwidth, self.bandwidth_count = target_config["enable_bandwidth"]
374        if "enable_dpdk_memory" in target_config:
375            self.enable_dpdk_memory, self.dpdk_wait_time = target_config["enable_dpdk_memory"]
376        if "scheduler_settings" in target_config:
377            self.scheduler_name = target_config["scheduler_settings"]
378        if "zcopy_settings" in target_config:
379            self.enable_zcopy = target_config["zcopy_settings"]
380        if "results_dir" in target_config:
381            self.results_dir = target_config["results_dir"]
382
383        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
384        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
385        self.set_local_nic_info(self.set_local_nic_info_helper())
386
387        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
388            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")
389
390        self.configure_system()
391        if self.enable_adq:
392            self.configure_adq()
393        self.sys_config()
394
395    def set_local_nic_info_helper(self):
396        return json.loads(self.exec_cmd(["lshw", "-json"]))
397
398    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
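        """Run cmd locally on the target host and return its stdout as text.

        stderr_redirect folds stderr into the returned output; change_dir
        temporarily switches the working directory for the command.
        """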
399        stderr_opt = None
400        if stderr_redirect:
401            stderr_opt = subprocess.STDOUT
402        if change_dir:
403            old_cwd = os.getcwd()
404            os.chdir(change_dir)
405            self.log_print("Changing directory to %s" % change_dir)
406
407        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")
408
409        if change_dir:
410            os.chdir(old_cwd)
411            self.log_print("Changing directory to %s" % old_cwd)
412        return out
413
414    def zip_spdk_sources(self, spdk_dir, dest_file):
415        self.log_print("Zipping SPDK source directory")
416        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
417        for root, _directories, files in os.walk(spdk_dir, followlinks=True):
418            for file in files:
419                fh.write(os.path.relpath(os.path.join(root, file)))
420        fh.close()
421        self.log_print("Done zipping")
422
423    @staticmethod
424    def _chunks(input_list, chunks_no):
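        """Split input_list into chunks_no near-equal slices.

        Earlier chunks get one extra element when the length is not evenly
        divisible, e.g. 10 bdev indexes over 3 initiators -> sizes 4, 3, 3.
        """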
425        div, rem = divmod(len(input_list), chunks_no)
426        for i in range(chunks_no):
427            si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem)
428            yield input_list[si:si + (div + 1 if i < rem else div)]
429
430    def spread_bdevs(self, req_disks):
431        # Spread available block devices indexes:
432        # - evenly across available initiator systems
433        # - evenly across available NIC interfaces for
434        #   each initiator
435        # Not NUMA aware.
436        ip_bdev_map = []
437        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))
438
439        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
440            self.initiator_info[i]["bdev_range"] = init_chunk
441            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
442            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
443                for c in nic_chunk:
444                    ip_bdev_map.append((ip, c))
445        return ip_bdev_map
446
447    @staticmethod
448    def read_json_stats(file):
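        """Parse a single fio JSON result file (aggregated "jobs" entry).

        Returns a flat list of 18 values: read iops, bw, avg/min/max latency
        and p99/p99.9/p99.99/p99.999 latency, followed by the same nine
        values for writes. Latencies reported in nanoseconds are converted
        to microseconds.
        """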
449        with open(file, "r") as json_data:
450            data = json.load(json_data)
451            job_pos = 0  # job_pos = 0 because we use aggregated (group_reporting) results
452
453            # Check if latency is in nano or microseconds to choose correct dict key
454            def get_lat_unit(key_prefix, dict_section):
455                # key prefix - lat, clat or slat.
456                # dict section - portion of json containing latency bucket in question
457                # Return dict key to access the bucket and unit as string
458                for k, _ in dict_section.items():
459                    if k.startswith(key_prefix):
460                        return k, k.split("_")[1]
461
462            def get_clat_percentiles(clat_dict_leaf):
463                if "percentile" in clat_dict_leaf:
464                    p99_lat = float(clat_dict_leaf["percentile"]["99.000000"])
465                    p99_9_lat = float(clat_dict_leaf["percentile"]["99.900000"])
466                    p99_99_lat = float(clat_dict_leaf["percentile"]["99.990000"])
467                    p99_999_lat = float(clat_dict_leaf["percentile"]["99.999000"])
468
469                    return [p99_lat, p99_9_lat, p99_99_lat, p99_999_lat]
470                else:
471                    # Latest fio versions do not provide "percentile" results if no
472                    # measurements were done, so just return zeroes
473                    return [0, 0, 0, 0]
474
475            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
476            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
477            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
478            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
479            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
480            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
481            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
482            read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat = get_clat_percentiles(
483                data["jobs"][job_pos]["read"][clat_key])
484
485            if "ns" in lat_unit:
486                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
487            if "ns" in clat_unit:
488                read_p99_lat = read_p99_lat / 1000
489                read_p99_9_lat = read_p99_9_lat / 1000
490                read_p99_99_lat = read_p99_99_lat / 1000
491                read_p99_999_lat = read_p99_999_lat / 1000
492
493            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
494            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
495            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
496            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
497            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
498            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
499            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
500            write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat = get_clat_percentiles(
501                data["jobs"][job_pos]["write"][clat_key])
502
503            if "ns" in lat_unit:
504                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
505            if "ns" in clat_unit:
506                write_p99_lat = write_p99_lat / 1000
507                write_p99_9_lat = write_p99_9_lat / 1000
508                write_p99_99_lat = write_p99_99_lat / 1000
509                write_p99_999_lat = write_p99_999_lat / 1000
510
511        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
512                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
513                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
514                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]
515
516    def parse_results(self, results_dir, csv_file):
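        """Aggregate fio JSON results from all initiators into a single CSV.

        For every .fio config in results_dir, per-run results are averaged per
        initiator (and written to a per-initiator CSV), summed across
        initiators (latencies averaged instead of summed), then read/write
        columns are folded into common columns, weighting latencies by
        rwmixread, and one row per job is appended to csv_file.
        """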
517        files = os.listdir(results_dir)
518        fio_files = filter(lambda x: ".fio" in x, files)
519        json_files = [x for x in files if ".json" in x]
520
521        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
522                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
523                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
524                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]
525
526        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
527                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]
528
529        header_line = ",".join(["Name", *headers])
530        aggr_header_line = ",".join(["Name", *aggr_headers])
531
532        # Create empty results file
533        with open(os.path.join(results_dir, csv_file), "w") as fh:
534            fh.write(aggr_header_line + "\n")
535        rows = set()
536
537        for fio_config in fio_files:
538            self.log_print("Getting FIO stats for %s" % fio_config)
539            job_name, _ = os.path.splitext(fio_config)
540
541            # Look in the filename for rwmixread value. Function arguments do
542            # not have that information.
543            # TODO: Improve this function by directly using workload params instead
544            # of regexing through filenames.
545            if "read" in job_name:
546                rw_mixread = 1
547            elif "write" in job_name:
548                rw_mixread = 0
549            else:
550                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100
551
552            # If "_CPU" exists in name - ignore it
553            # Initiators for the same job could have different num_cores parameter
554            job_name = re.sub(r"_\d+CPU", "", job_name)
555            job_result_files = [x for x in json_files if x.startswith(job_name)]
556            self.log_print("Matching result files for current fio config:")
557            for j in job_result_files:
558                self.log_print("\t %s" % j)
559
560            # There may have been more than 1 initiator used in test, need to check that
561            # Result files are created so that string after last "_" separator is server name
562            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
563            inits_avg_results = []
564            for i in inits_names:
565                self.log_print("\tGetting stats for initiator %s" % i)
566                # There may have been more than 1 test run for this job, calculate average results for initiator
567                i_results = [x for x in job_result_files if i in x]
568                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))
569
570                separate_stats = []
571                for r in i_results:
572                    try:
573                        stats = self.read_json_stats(os.path.join(results_dir, r))
574                        separate_stats.append(stats)
575                        self.log_print(stats)
576                    except JSONDecodeError:
577                        self.log_print("ERROR: Failed to parse %s results! Results might be incomplete!" % r)
578
579                init_results = [sum(x) for x in zip(*separate_stats)]
580                init_results = [x / len(separate_stats) for x in init_results]
581                inits_avg_results.append(init_results)
582
583                self.log_print("\tAverage results for initiator %s" % i)
584                self.log_print(init_results)
585                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
586                    fh.write(header_line + "\n")
587                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")
588
589            # Sum results of all initiators running this FIO job.
590            # Latency results are averaged across all initiators.
591            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
592            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
593            for key in inits_avg_results:
594                if "lat" in key:
595                    inits_avg_results[key] /= len(inits_names)
596
597            # Aggregate separate read/write values into common labels
598            # Take rw_mixread into consideration for mixed read/write workloads.
599            aggregate_results = OrderedDict()
600            for h in aggr_headers:
601                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
602                if "lat" in h:
603                    aggr_value = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
604                else:
605                    aggr_value = read_stat + write_stat
606                aggregate_results[h] = "{0:.3f}".format(aggr_value)
607
608            rows.add(",".join([job_name, *aggregate_results.values()]))
609
610        # Save results to file
611        for row in rows:
612            with open(os.path.join(results_dir, csv_file), "a") as fh:
613                fh.write(row + "\n")
614        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))
615
616    def measure_sar(self, results_dir, sar_file_prefix):
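        """Collect SAR CPU utilization statistics during the fio run.

        Sleeps sar_delay seconds to skip the ramp-up, samples all CPUs
        sar_count times every sar_interval seconds, saves the raw output to
        <prefix>.txt and the summed non-idle CPU utilization (percent) to
        <prefix>cpu_util.txt.
        """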
617        cpu_number = os.cpu_count()
618        sar_idle_sum = 0
619        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
620        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))
621
622        self.log_print("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % self.sar_delay)
623        time.sleep(self.sar_delay)
624        self.log_print("Starting SAR measurements")
625
626        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % self.sar_interval, "%s" % self.sar_count])
627        with open(sar_output_file, "w") as fh:
628            for line in out.split("\n"):
629                if "Average" in line:
630                    if "CPU" in line:
631                        self.log_print("Summary CPU utilization from SAR:")
632                        self.log_print(line)
633                    elif "all" in line:
634                        self.log_print(line)
635                    else:
636                        sar_idle_sum += float(line.split()[7])
637            fh.write(out)
638        sar_cpu_usage = cpu_number * 100 - sar_idle_sum
639
640        with open(sar_cpu_util_file, "w") as f:
641            f.write("%0.2f" % sar_cpu_usage)
642
643    def ethtool_after_fio_ramp(self, fio_ramp_time):
644        time.sleep(fio_ramp_time//2)
645        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
646        for nic in nic_names:
647            self.log_print(nic)
648            self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
649                           "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization
650
651    def measure_pcm_memory(self, results_dir, pcm_file_name):
652        time.sleep(self.pcm_delay)
653        cmd = ["%s/pcm-memory.x" % self.pcm_dir, "%s" % self.pcm_interval, "-csv=%s/%s" % (results_dir, pcm_file_name)]
654        pcm_memory = subprocess.Popen(cmd)
655        time.sleep(self.pcm_count)
656        pcm_memory.terminate()
657
658    def measure_pcm(self, results_dir, pcm_file_name):
659        time.sleep(self.pcm_delay)
660        cmd = ["%s/pcm.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count, "-csv=%s/%s" % (results_dir, pcm_file_name)]
661        subprocess.run(cmd)
662        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
663        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
664        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
665        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
666        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)
667
668    def measure_pcm_power(self, results_dir, pcm_power_file_name):
669        time.sleep(self.pcm_delay)
670        out = self.exec_cmd(["%s/pcm-power.x" % self.pcm_dir, "%s" % self.pcm_interval, "-i=%s" % self.pcm_count])
671        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
672            fh.write(out)
673
674    def measure_network_bandwidth(self, results_dir, bandwidth_file_name):
675        self.log_print("INFO: starting network bandwidth measure")
676        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
677                       "-a", "1", "-t", "1000", "-c", str(self.bandwidth_count)])
678
679    def measure_dpdk_memory(self, results_dir):
680        self.log_print("INFO: waiting to generate DPDK memory usage")
681        time.sleep(self.dpdk_wait_time)
682        self.log_print("INFO: generating DPDK memory usage")
683        rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)  # dumps stats to /tmp/spdk_mem_dump.txt
684        os.rename("/tmp/spdk_mem_dump.txt", "%s/spdk_mem_dump.txt" % (results_dir))
685
686    def sys_config(self):
687        self.log_print("====Kernel release:====")
688        self.log_print(os.uname().release)
689        self.log_print("====Kernel command line:====")
690        with open('/proc/cmdline') as f:
691            cmdline = f.readlines()
692            self.log_print('\n'.join(self.get_uncommented_lines(cmdline)))
693        self.log_print("====sysctl conf:====")
694        with open('/etc/sysctl.conf') as f:
695            sysctl = f.readlines()
696            self.log_print('\n'.join(self.get_uncommented_lines(sysctl)))
697        self.log_print("====Cpu power info:====")
698        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
699        self.log_print("====zcopy settings:====")
700        self.log_print("zcopy enabled: %s" % (self.enable_zcopy))
701        self.log_print("====Scheduler settings:====")
702        self.log_print("SPDK scheduler: %s" % (self.scheduler_name))
703
704
705class Initiator(Server):
706    def __init__(self, name, general_config, initiator_config):
707        super().__init__(name, general_config, initiator_config)
708
709        # Required fields
710        self.ip = initiator_config["ip"]
711        self.target_nic_ips = initiator_config["target_nic_ips"]
712
713        # Defaults
714        self.cpus_allowed = None
715        self.cpus_allowed_policy = "shared"
716        self.spdk_dir = "/tmp/spdk"
717        self.fio_bin = "/usr/src/fio/fio"
718        self.nvmecli_bin = "nvme"
719        self.cpu_frequency = None
720        self.subsystem_info_list = []
721
722        if "spdk_dir" in initiator_config:
723            self.spdk_dir = initiator_config["spdk_dir"]
724        if "fio_bin" in initiator_config:
725            self.fio_bin = initiator_config["fio_bin"]
726        if "nvmecli_bin" in initiator_config:
727            self.nvmecli_bin = initiator_config["nvmecli_bin"]
728        if "cpus_allowed" in initiator_config:
729            self.cpus_allowed = initiator_config["cpus_allowed"]
730        if "cpus_allowed_policy" in initiator_config:
731            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
732        if "cpu_frequency" in initiator_config:
733            self.cpu_frequency = initiator_config["cpu_frequency"]
734        if os.getenv('SPDK_WORKSPACE'):
735            self.spdk_dir = os.getenv('SPDK_WORKSPACE')
736
737        self.ssh_connection = paramiko.SSHClient()
738        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
739        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
740        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
741        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
742        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
743
744        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
745            self.copy_spdk("/tmp/spdk.zip")
746        self.set_local_nic_info(self.set_local_nic_info_helper())
747        self.set_cpu_frequency()
748        self.configure_system()
749        if self.enable_adq:
750            self.configure_adq()
751        self.sys_config()
752
753    def set_local_nic_info_helper(self):
754        return json.loads(self.exec_cmd(["lshw", "-json"]))
755
756    def stop(self):
757        self.ssh_connection.close()
758
759    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
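        """Run cmd on the remote initiator over SSH and return its stdout as text.

        A non-zero exit status raises CalledProcessError; with
        stderr_redirect=True a PTY is requested so stderr is folded into the
        returned output.
        """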
760        if change_dir:
761            cmd = ["cd", change_dir, ";", *cmd]
762
763        # In case one of the command elements contains whitespace and is not
764        # already quoted (e.g. when calling sysctl), quote it again to prevent expansion
765        # when sending to the remote system.
766        for i, c in enumerate(cmd):
767            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
768                cmd[i] = '"%s"' % c
769        cmd = " ".join(cmd)
770
771        # Redirect stderr to stdout using the get_pty option if needed
772        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
773        out = stdout.read().decode(encoding="utf-8")
774
775        # Check the return code
776        rc = stdout.channel.recv_exit_status()
777        if rc:
778            raise CalledProcessError(int(rc), cmd, out)
779
780        return out
781
782    def put_file(self, local, remote_dest):
783        ftp = self.ssh_connection.open_sftp()
784        ftp.put(local, remote_dest)
785        ftp.close()
786
787    def get_file(self, remote, local_dest):
788        ftp = self.ssh_connection.open_sftp()
789        ftp.get(remote, local_dest)
790        ftp.close()
791
792    def copy_spdk(self, local_spdk_zip):
793        self.log_print("Copying SPDK sources to initiator %s" % self.name)
794        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
795        self.log_print("Copied sources zip from target")
796        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
797        self.log_print("Sources unpacked")
798
799    def copy_result_files(self, dest_dir):
800        self.log_print("Copying results")
801
802        if not os.path.exists(dest_dir):
803            os.mkdir(dest_dir)
804
805        # Get list of result files from initiator and copy them back to target
806        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")
807
808        for file in file_list:
809            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
810                          os.path.join(dest_dir, file))
811        self.log_print("Done copying results")
812
813    def discover_subsystems(self, address_list, subsys_no):
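        """Scan target ports with "nvme discover" and cache matching subsystems.

        Ports 4420 .. 4420 + subsys_no - 1 are probed on every address in
        address_list; discovered entries are stored in self.subsystem_info_list
        as [port, nqn, ip], sorted by NQN.
        """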
814        num_nvmes = range(0, subsys_no)
815        nvme_discover_output = ""
816        for ip, subsys_idx in itertools.product(address_list, num_nvmes):
817            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_idx))
818            nvme_discover_cmd = ["sudo",
819                                 "%s" % self.nvmecli_bin,
820                                 "discover", "-t", "%s" % self.transport,
821                                 "-s", "%s" % (4420 + subsys_idx),
822                                 "-a", "%s" % ip]
823
824            try:
825                stdout = self.exec_cmd(nvme_discover_cmd)
826                if stdout:
827                    nvme_discover_output = nvme_discover_output + stdout
828            except CalledProcessError:
829                # Do nothing. In case of discovering remote subsystems of kernel target
830                # we expect "nvme discover" to fail a bunch of times because we basically
831                # scan ports.
832                pass
833
834        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
835                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
836                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
837                                nvme_discover_output)  # from nvme discovery output
838        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
839        subsystems = list(set(subsystems))
840        subsystems.sort(key=lambda x: x[1])
841        self.log_print("Found matching subsystems on target side:")
842        for s in subsystems:
843            self.log_print(s)
844        self.subsystem_info_list = subsystems
845
846    def gen_fio_filename_conf(self, *args, **kwargs):
847        # Logic implemented in SPDKInitiator and KernelInitiator classes
848        pass
849
850    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10, rate_iops=0):
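        """Render an fio job file for this initiator and return its remote path.

        The [global] section comes from the template below; the filename
        section depends on the mode (SPDK bdevs vs kernel /dev/nvme devices)
        and is produced by gen_fio_filename_conf(). The file is written to
        <spdk_dir>/nvmf_perf on the initiator.
        """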
851        fio_conf_template = """
852[global]
853ioengine={ioengine}
854{spdk_conf}
855thread=1
856group_reporting=1
857direct=1
858percentile_list=50:90:99:99.5:99.9:99.99:99.999
859
860norandommap=1
861rw={rw}
862rwmixread={rwmixread}
863bs={block_size}
864time_based=1
865ramp_time={ramp_time}
866runtime={run_time}
867rate_iops={rate_iops}
868"""
869        if "spdk" in self.mode:
870            bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
871            self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
872            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
873            spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
874        else:
875            ioengine = self.ioengine
876            spdk_conf = ""
877            out = self.exec_cmd(["sudo", "nvme", "list", "|", "grep", "-E", "'SPDK|Linux'",
878                                 "|", "awk", "'{print $1}'"])
879            subsystems = [x for x in out.split("\n") if "nvme" in x]
880
881        if self.cpus_allowed is not None:
882            self.log_print("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
883            cpus_num = 0
884            cpus = self.cpus_allowed.split(",")
885            for cpu in cpus:
886                if "-" in cpu:
887                    a, b = cpu.split("-")
888                    a = int(a)
889                    b = int(b)
890                    cpus_num += len(range(a, b)) + 1
891                else:
892                    cpus_num += 1
893            self.num_cores = cpus_num
894            threads = range(0, self.num_cores)
895        elif hasattr(self, 'num_cores'):
896            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
897            threads = range(0, int(self.num_cores))
898        else:
899            self.num_cores = len(subsystems)
900            threads = range(0, len(subsystems))
901
902        if "spdk" in self.mode:
903            filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs)
904        else:
905            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs)
906
907        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
908                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
909                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)
910
911        # TODO: hipri disabled for now, as it causes fio errors:
912        # io_u error on file /dev/nvme2n1: Operation not supported
913        # See comment in KernelInitiator class, kernel_init_connect() function
914        if hasattr(self, "ioengine") and "io_uring" in self.ioengine:
915            fio_config = fio_config + """
916fixedbufs=1
917registerfiles=1
918#hipri=1
919"""
920        if num_jobs:
921            fio_config = fio_config + "numjobs=%s \n" % num_jobs
922        if self.cpus_allowed is not None:
923            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
924            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
925        fio_config = fio_config + filename_section
926
927        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
928        if hasattr(self, "num_cores"):
929            fio_config_filename += "_%sCPU" % self.num_cores
930        fio_config_filename += ".fio"
931
932        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
933        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
934        self.log_print("Created FIO Config:")
935        self.log_print(fio_config)
936
937        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)
938
939    def set_cpu_frequency(self):
940        if self.cpu_frequency is not None:
941            try:
942                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
943                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
944                self.log_print(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
945            except Exception:
946                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
947                sys.exit()
948        else:
949            self.log_print("WARNING: you have disabled intel_pstate and are using the default CPU governor.")
950
951    def run_fio(self, fio_config_file, run_num=None):
952        job_name, _ = os.path.splitext(fio_config_file)
953        self.log_print("Starting FIO run for job: %s" % job_name)
954        self.log_print("Using FIO: %s" % self.fio_bin)
955
956        if run_num:
957            for i in range(1, run_num + 1):
958                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
959                try:
960                    output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
961                                            "--output=%s" % output_filename, "--eta=never"], True)
962                    self.log_print(output)
963                except subprocess.CalledProcessError as e:
964                    self.log_print("ERROR: Fio process failed!")
965                    self.log_print(e.stdout)
966        else:
967            output_filename = job_name + "_" + self.name + ".json"
968            output = self.exec_cmd(["sudo", self.fio_bin,
969                                    fio_config_file, "--output-format=json",
970                                    "--output=%s" % output_filename], True)
971            self.log_print(output)
972        self.log_print("FIO run finished. Results in: %s" % output_filename)
973
974    def sys_config(self):
975        self.log_print("====Kernel release:====")
976        self.log_print(self.exec_cmd(["uname", "-r"]))
977        self.log_print("====Kernel command line:====")
978        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
979        self.log_print('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
980        self.log_print("====sysctl conf:====")
981        sysctl = self.exec_cmd(["cat", "/etc/sysctl.conf"])
982        self.log_print('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
983        self.log_print("====Cpu power info:====")
984        self.log_print(self.exec_cmd(["cpupower", "frequency-info"]))
985
986
987class KernelTarget(Target):
988    def __init__(self, name, general_config, target_config):
989        super().__init__(name, general_config, target_config)
990        # Defaults
991        self.nvmet_bin = "nvmetcli"
992
993        if "nvmet_bin" in target_config:
994            self.nvmet_bin = target_config["nvmet_bin"]
995
996    def stop(self):
997        nvmet_command(self.nvmet_bin, "clear")
998
999    def kernel_tgt_gen_subsystem_conf(self, nvme_list):
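        """Write an nvmetcli JSON config ("kernel.conf") with one subsystem per device.

        Each device in nvme_list gets its own NQN, serial and listener port
        (4420 + device index), spread across target NIC IPs by spread_bdevs().
        """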
1000
1001        nvmet_cfg = {
1002            "ports": [],
1003            "hosts": [],
1004            "subsystems": [],
1005        }
1006
1007        for ip, bdev_num in self.spread_bdevs(len(nvme_list)):
1008            port = str(4420 + bdev_num)
1009            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
1010            serial = "SPDK00%s" % bdev_num
1011            bdev_name = nvme_list[bdev_num]
1012
1013            nvmet_cfg["subsystems"].append({
1014                "allowed_hosts": [],
1015                "attr": {
1016                    "allow_any_host": "1",
1017                    "serial": serial,
1018                    "version": "1.3"
1019                },
1020                "namespaces": [
1021                    {
1022                        "device": {
1023                            "path": bdev_name,
1024                            "uuid": "%s" % uuid.uuid4()
1025                        },
1026                        "enable": 1,
1027                        "nsid": port
1028                    }
1029                ],
1030                "nqn": nqn
1031            })
1032
1033            nvmet_cfg["ports"].append({
1034                "addr": {
1035                    "adrfam": "ipv4",
1036                    "traddr": ip,
1037                    "trsvcid": port,
1038                    "trtype": self.transport
1039                },
1040                "portid": bdev_num,
1041                "referrals": [],
1042                "subsystems": [nqn]
1043            })
1044
1045            self.subsystem_info_list.append([port, nqn, ip])
1046        self.subsys_no = len(self.subsystem_info_list)
1047
1048        with open("kernel.conf", "w") as fh:
1049            fh.write(json.dumps(nvmet_cfg, indent=2))
1050
1051    def tgt_start(self):
1052        self.log_print("Configuring kernel NVMeOF Target")
1053
1054        if self.null_block:
1055            self.log_print("Configuring with null block device.")
1056            nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
1057        else:
1058            self.log_print("Configuring with NVMe drives.")
1059            nvme_list = get_nvme_devices()
1060
1061        self.kernel_tgt_gen_subsystem_conf(nvme_list)
1062        self.subsys_no = len(nvme_list)
1063
1064        nvmet_command(self.nvmet_bin, "clear")
1065        nvmet_command(self.nvmet_bin, "restore kernel.conf")
1066
1067        if self.enable_adq:
1068            self.adq_configure_tc()
1069
1070        self.log_print("Done configuring kernel NVMeOF Target")
1071
1072
1073class SPDKTarget(Target):
1074    def __init__(self, name, general_config, target_config):
1075        super().__init__(name, general_config, target_config)
1076
1077        # Required fields
1078        self.core_mask = target_config["core_mask"]
1079        self.num_cores = self.get_num_cores(self.core_mask)
1080
1081        # Defaults
1082        self.dif_insert_strip = False
1083        self.null_block_dif_type = 0
1084        self.num_shared_buffers = 4096
1085        self.max_queue_depth = 128
1086        self.bpf_proc = None
1087        self.bpf_scripts = []
1088        self.enable_idxd = False
1089
1090        if "num_shared_buffers" in target_config:
1091            self.num_shared_buffers = target_config["num_shared_buffers"]
1092        if "max_queue_depth" in target_config:
1093            self.max_queue_depth = target_config["max_queue_depth"]
1094        if "null_block_dif_type" in target_config:
1095            self.null_block_dif_type = target_config["null_block_dif_type"]
1096        if "dif_insert_strip" in target_config:
1097            self.dif_insert_strip = target_config["dif_insert_strip"]
1098        if "bpf_scripts" in target_config:
1099            self.bpf_scripts = target_config["bpf_scripts"]
1100        if "idxd_settings" in target_config:
1101            self.enable_idxd = target_config["idxd_settings"]
1102
1103        self.log_print("====IDXD settings:====")
1104        self.log_print("IDXD enabled: %s" % (self.enable_idxd))
1105
1106    @staticmethod
1107    def get_num_cores(core_mask):
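        """Count CPU cores selected by an SPDK core mask.

        Accepts a hex bitmask (e.g. "0xF0" -> 4 cores) or a bracketed core
        list such as "[0,2,4-7]" -> 6 cores; ranges are inclusive.
        """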
1108        if "0x" in core_mask:
1109            return bin(int(core_mask, 16)).count("1")
1110        else:
1111            num_cores = 0
1112            core_mask = core_mask.replace("[", "")
1113            core_mask = core_mask.replace("]", "")
1114            for i in core_mask.split(","):
1115                if "-" in i:
1116                    x, y = i.split("-")
1117                    num_cores += len(range(int(x), int(y))) + 1
1118                else:
1119                    num_cores += 1
1120            return num_cores
1121
1122    def spdk_tgt_configure(self):
1123        self.log_print("Configuring SPDK NVMeOF target via RPC")
1124
1125        if self.enable_adq:
1126            self.adq_configure_tc()
1127
1128        # Create transport layer
1129        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
1130                                       num_shared_buffers=self.num_shared_buffers,
1131                                       max_queue_depth=self.max_queue_depth,
1132                                       dif_insert_or_strip=self.dif_insert_strip,
1133                                       sock_priority=self.adq_priority)
1134        self.log_print("SPDK NVMeOF transport layer:")
1135        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))
1136
1137        if self.null_block:
1138            self.spdk_tgt_add_nullblock(self.null_block)
1139            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
1140        else:
1141            self.spdk_tgt_add_nvme_conf()
1142            self.spdk_tgt_add_subsystem_conf(self.nic_ips)
1143
1144        self.log_print("Done configuring SPDK NVMeOF Target")
1145
1146    def spdk_tgt_add_nullblock(self, null_block_count):
1147        md_size = 0
1148        block_size = 4096
1149        if self.null_block_dif_type != 0:
1150            md_size = 128
1151
1152        self.log_print("Adding null block bdevices to config via RPC")
1153        for i in range(null_block_count):
1154            self.log_print("Setting bdev protection to: %s" % self.null_block_dif_type)
1155            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
1156                                      dif_type=self.null_block_dif_type, md_size=md_size)
1157        self.log_print("SPDK Bdevs configuration:")
1158        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1159
1160    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
1161        self.log_print("Adding NVMe bdevs to config via RPC")
1162
1163        bdfs = get_nvme_devices_bdf()
1164        bdfs = [b.replace(":", ".") for b in bdfs]
1165
1166        if req_num_disks:
1167            if req_num_disks > len(bdfs):
1168                self.log_print("ERROR: Requested number of disks is more than available %s" % len(bdfs))
1169                sys.exit(1)
1170            else:
1171                bdfs = bdfs[0:req_num_disks]
1172
1173        for i, bdf in enumerate(bdfs):
1174            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)
1175
1176        self.log_print("SPDK Bdevs configuration:")
1177        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))
1178
1179    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
1180        self.log_print("Adding subsystems to config")
1181        if not req_num_disks:
1182            req_num_disks = get_nvme_devices_count()
1183
1184        for ip, bdev_num in self.spread_bdevs(req_num_disks):
1185            port = str(4420 + bdev_num)
1186            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
1187            serial = "SPDK00%s" % bdev_num
1188            bdev_name = "Nvme%sn1" % bdev_num
1189
1190            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
1191                                           allow_any_host=True, max_namespaces=8)
1192            rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
1193            rpc.nvmf.nvmf_subsystem_add_listener(self.client,
1194                                                 nqn=nqn,
1195                                                 trtype=self.transport,
1196                                                 traddr=ip,
1197                                                 trsvcid=port,
1198                                                 adrfam="ipv4")
1199            self.subsystem_info_list.append([port, nqn, ip])
1200        self.subsys_no = len(self.subsystem_info_list)
1201
1202        self.log_print("SPDK NVMeOF subsystem configuration:")
1203        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))
1204
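    # Annotation: BPF tracing is delegated to scripts/bpftrace.sh, invoked as
    # "bpftrace.sh <nvmf_tgt pid> <trace script> [...]", with the output file
    # selected through the BPF_OUTFILE environment variable.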
1205    def bpf_start(self):
1206        self.log_print("Starting BPF Trace scripts: %s" % self.bpf_scripts)
1207        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
1208        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
1209        results_path = os.path.join(self.results_dir, "bpf_traces.txt")
1210
1211        with open(self.pid, "r") as fh:
1212            nvmf_pid = fh.readline().strip()
1213
1214        cmd = [bpf_script, nvmf_pid, *bpf_traces]
1215        self.log_print(cmd)
1216        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})
1217
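    # Annotation: target start-up sequence: launch nvmf_tgt with --wait-for-rpc,
    # poll (indefinitely) for /var/tmp/spdk.sock, apply socket/accel/scheduler
    # options over RPC, call framework_start_init, optionally attach bpftrace,
    # and only then push the bdev and subsystem configuration.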
1218    def tgt_start(self):
1219        if self.null_block:
1220            self.subsys_no = 1
1221        else:
1222            self.subsys_no = get_nvme_devices_count()
1223        self.log_print("Starting SPDK NVMeOF Target process")
1224        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
1225        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
1226        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")
1227
1228        with open(self.pid, "w") as fh:
1229            fh.write(str(proc.pid))
1230        self.nvmf_proc = proc
1231        self.log_print("SPDK NVMeOF Target PID=%s" % proc.pid)
1232        self.log_print("Waiting for spdk to initialize...")
1233        while True:
1234            if os.path.exists("/var/tmp/spdk.sock"):
1235                break
1236            time.sleep(1)
1237        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")
1238
1239        if self.enable_zcopy:
1240            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
1241                                           enable_zerocopy_send_server=True)
1242            self.log_print("Target socket options:")
1243            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))
1244
1245        if self.enable_adq:
1246            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
1247            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
1248                                           nvme_adminq_poll_period_us=100000, retry_count=4)
1249
1250        if self.enable_idxd:
1251            rpc.idxd.idxd_scan_accel_engine(self.client, config_kernel_mode=None)
1252            self.log_print("Target IDXD accel engine enabled")
1253
1254        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name)
1255        rpc.framework_start_init(self.client)
1256
1257        if self.bpf_scripts:
1258            self.bpf_start()
1259
1260        self.spdk_tgt_configure()
1261
1262    def stop(self):
1263        if self.bpf_proc:
1264            self.log_print("Stopping BPF Trace script")
1265            self.bpf_proc.terminate()
1266            self.bpf_proc.wait()
1267
1268        if hasattr(self, "nvmf_proc"):
1269            try:
1270                self.nvmf_proc.terminate()
1271                self.nvmf_proc.wait()
1272            except Exception as e:
1273                self.log_print(e)
1274                self.nvmf_proc.kill()
1275                self.nvmf_proc.communicate()
1276
1277
1278class KernelInitiator(Initiator):
1279    def __init__(self, name, general_config, initiator_config):
1280        super().__init__(name, general_config, initiator_config)
1281
1282        # Defaults
1283        self.extra_params = ""
1284        self.ioengine = "libaio"
1285
1286        if "extra_params" in initiator_config:
1287            self.extra_params = initiator_config["extra_params"]
1288
1289        if "kernel_engine" in initiator_config:
1290            self.ioengine = initiator_config["kernel_engine"]
1291            if "io_uring" in self.ioengine:
1292                self.extra_params = "--nr-poll-queues=8"
1293
1294    def get_connected_nvme_list(self):
1295        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
1296        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
1297                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
1298        return nvme_list
1299
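    # Annotation: for every [port, nqn, ip] entry discovered on the target, the
    # connect loop below runs roughly the following (values are hypothetical;
    # extra_params is appended only when set):
    #   nvme connect -t rdma -s 4420 -n nqn.2018-09.io.spdk:cnode0 -a 10.0.0.1 --nr-poll-queues=8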
1300    def kernel_init_connect(self):
1301        self.log_print("The connection attempts below may produce error messages; this is expected!")
1302        for subsystem in self.subsystem_info_list:
1303            self.log_print("Trying to connect %s %s %s" % tuple(subsystem))
1304            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
1305                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
1306            time.sleep(2)
1307
1308        if "io_uring" in self.ioengine:
1309            self.log_print("Applying block layer settings for io_uring.")
1310
1311            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
1312            #       apparently it's not possible for connected subsystems.
1313            #       Results in "error: Invalid argument"
1314            block_sysfs_settings = {
1315                "iostats": "0",
1316                "rq_affinity": "0",
1317                "nomerges": "2"
1318            }
1319
1320            for disk in self.get_connected_nvme_list():
1321                sysfs = os.path.join("/sys/block", disk, "queue")
1322                for k, v in block_sysfs_settings.items():
1323                    sysfs_opt_path = os.path.join(sysfs, k)
1324                    try:
1325                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
1326                    except subprocess.CalledProcessError as e:
1327                        self.log_print("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
1328                    finally:
1329                        _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)])
1330                        self.log_print("%s=%s" % (sysfs_opt_path, _))
1331
1332    def kernel_init_disconnect(self):
1333        for subsystem in self.subsystem_info_list:
1334            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
1335            time.sleep(1)
1336
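    # Annotation: gen_fio_filename_conf splits the connected /dev/nvme* devices
    # across one fio job section per thread and scales each section's queue
    # depth as round(io_depth * devices_in_section / num_jobs), floored at 1.
    # For example (illustrative numbers): 4 devices, 2 threads, io_depth=128,
    # num_jobs=2 gives two sections with two devices and iodepth=128 each.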
1337    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1):
1338        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
1339
1340        filename_section = ""
1341        nvme_per_split = int(len(nvme_list) / len(threads))
1342        remainder = len(nvme_list) % len(threads)
1343        iterator = iter(nvme_list)
1344        result = []
1345        for i in range(len(threads)):
1346            result.append([])
1347            for _ in range(nvme_per_split):
1348                result[i].append(next(iterator))
            # hand out leftover devices one per job section (matches the SPDK initiator variant)
1349            if remainder:
1350                result[i].append(next(iterator))
1351                remainder -= 1
1352        for i, r in enumerate(result):
1353            header = "[filename%s]" % i
1354            disks = "\n".join(["filename=%s" % x for x in r])
1355            job_section_qd = round((io_depth * len(r)) / num_jobs)
1356            if job_section_qd == 0:
1357                job_section_qd = 1
1358            iodepth = "iodepth=%s" % job_section_qd
1359            filename_section = "\n".join([filename_section, header, disks, iodepth])
1360
1361        return filename_section
1362
1363
1364class SPDKInitiator(Initiator):
1365    def __init__(self, name, general_config, initiator_config):
1366        super().__init__(name, general_config, initiator_config)
1367
1368        if not general_config.get("skip_spdk_install", False):
1369            self.install_spdk()
1370
1371        # Required fields
1372        self.num_cores = initiator_config["num_cores"]
1373
1374        # Optional fields
1375        self.enable_data_digest = False
1376        if "enable_data_digest" in initiator_config:
1377            self.enable_data_digest = initiator_config["enable_data_digest"]
1378
1379    def install_spdk(self):
1380        self.log_print("Using fio binary %s" % self.fio_bin)
1381        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
1382        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
1383        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)])
1384        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
1385        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])
1386
1387        self.log_print("SPDK built")
1388        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])
1389
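    # Annotation: gen_spdk_bdev_conf emits the JSON config consumed by the SPDK
    # fio plugin: one bdev_nvme_attach_controller entry per remote subsystem,
    # shaped roughly like (illustrative values):
    #   {"subsystems": [{"subsystem": "bdev", "config": [
    #       {"method": "bdev_nvme_attach_controller",
    #        "params": {"name": "Nvme0", "trtype": "rdma", "traddr": "10.0.0.1",
    #                   "trsvcid": "4420", "subnqn": "nqn.2018-09.io.spdk:cnode0",
    #                   "adrfam": "IPv4"}}]}]}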
1390    def gen_spdk_bdev_conf(self, remote_subsystem_list):
1391        bdev_cfg_section = {
1392            "subsystems": [
1393                {
1394                    "subsystem": "bdev",
1395                    "config": []
1396                }
1397            ]
1398        }
1399
1400        for i, subsys in enumerate(remote_subsystem_list):
1401            sub_port, sub_nqn, sub_addr = map(str, subsys)
1402            nvme_ctrl = {
1403                "method": "bdev_nvme_attach_controller",
1404                "params": {
1405                    "name": "Nvme{}".format(i),
1406                    "trtype": self.transport,
1407                    "traddr": sub_addr,
1408                    "trsvcid": sub_port,
1409                    "subnqn": sub_nqn,
1410                    "adrfam": "IPv4"
1411                }
1412            }
1413
1414            if self.enable_adq:
1415                nvme_ctrl["params"].update({"priority": "1"})
1416
1417            if self.enable_data_digest:
1418                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})
1419
1420            bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)
1421
1422        return json.dumps(bdev_cfg_section, indent=2)
1423
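    # Annotation: same device-spreading logic as the kernel initiator variant,
    # but the fio filenames refer to SPDK bdev names (Nvme<i>n1) from the
    # generated bdev config instead of /dev/nvme* block devices.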
1424    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1):
1425        filename_section = ""
1426        if len(threads) >= len(subsystems):
1427            threads = range(0, len(subsystems))
1428        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
1429        nvme_per_split = int(len(subsystems) / len(threads))
1430        remainder = len(subsystems) % len(threads)
1431        iterator = iter(filenames)
1432        result = []
1433        for i in range(len(threads)):
1434            result.append([])
1435            for _ in range(nvme_per_split):
1436                result[i].append(next(iterator))
1437            if remainder:
1438                result[i].append(next(iterator))
1439                remainder -= 1
1440        for i, r in enumerate(result):
1441            header = "[filename%s]" % i
1442            disks = "\n".join(["filename=%s" % x for x in r])
1443            job_section_qd = round((io_depth * len(r)) / num_jobs)
1444            if job_section_qd == 0:
1445                job_section_qd = 1
1446            iodepth = "iodepth=%s" % job_section_qd
1447            filename_section = "\n".join([filename_section, header, disks, iodepth])
1448
1449        return filename_section
1450
1451
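# Annotation: the main block below expects a config.json with a "general"
# section, a section whose name contains "target", any number of sections
# whose names contain "initiator", and a "fio" section. A minimal sketch
# (key names taken from the parsing code below, values purely illustrative):
#   {
#     "general":    {...},
#     "target":     {"mode": "spdk", ...},
#     "initiator1": {"mode": "kernel", ...},
#     "fio": {"bs": ["4k"], "qd": [128], "rw": ["randread"],
#             "rwmixread": 100, "run_time": 300, "ramp_time": 30,
#             "run_num": 3, "num_jobs": 2, "rate_iops": 0}
#   }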
1452if __name__ == "__main__":
1453    script_full_dir = os.path.dirname(os.path.realpath(__file__))
1454    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))
1455
1456    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
1457    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
1458                        help='Configuration file.')
1459    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
1460                        help='Results directory.')
1461    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
1462                        help='CSV results filename.')
1463
1464    args = parser.parse_args()
1465
1466    print("Using config file: %s" % args.config)
1467    with open(args.config, "r") as config:
1468        data = json.load(config)
1469
1470    initiators = []
1471    fio_cases = []
1472
1473    general_config = data["general"]
1474    target_config = data["target"]
1475    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]
1476
1477    for k, v in data.items():
1478        if "target" in k:
1479            v.update({"results_dir": args.results})
1480            if data[k]["mode"] == "spdk":
1481                target_obj = SPDKTarget(k, data["general"], v)
1482            elif data[k]["mode"] == "kernel":
1483                target_obj = KernelTarget(k, data["general"], v)
1484        elif "initiator" in k:
1485            if data[k]["mode"] == "spdk":
1486                init_obj = SPDKInitiator(k, data["general"], v)
1487            elif data[k]["mode"] == "kernel":
1488                init_obj = KernelInitiator(k, data["general"], v)
1489            initiators.append(init_obj)
1490        elif "fio" in k:
1491            fio_workloads = itertools.product(data[k]["bs"],
1492                                              data[k]["qd"],
1493                                              data[k]["rw"])
1494
1495            fio_run_time = data[k]["run_time"]
1496            fio_ramp_time = data[k]["ramp_time"]
1497            fio_rw_mix_read = data[k]["rwmixread"]
1498            fio_run_num = data[k].get("run_num")
1499            fio_num_jobs = data[k].get("num_jobs")
1500
1501            fio_rate_iops = 0
1502            if "rate_iops" in data[k]:
1503                fio_rate_iops = data[k]["rate_iops"]
1504        else:
1505            continue
1506
1507    try:
1508        os.mkdir(args.results)
1509    except FileExistsError:
1510        pass
1511
1512    for i in initiators:
1513        target_obj.initiator_info.append(
1514            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
1515        )
1516
1517    # TODO: This try block is definitely too large. Need to break this up into separate
1518    # logical blocks to reduce size.
1519    try:
1520        target_obj.tgt_start()
1521
1522        for i in initiators:
1523            i.discover_subsystems(i.target_nic_ips, target_obj.subsys_no)
1524            if i.enable_adq:
1525                i.adq_configure_tc()
1526
1527        # Poor man's threading
1528        # Run FIO tests
1529        for block_size, io_depth, rw in fio_workloads:
1530            threads = []
1531            configs = []
1532            for i in initiators:
1533                if i.mode == "kernel":
1534                    i.kernel_init_connect()
1535
1536                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
1537                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops)
1538                configs.append(cfg)
1539
1540            for i, cfg in zip(initiators, configs):
1541                t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
1542                threads.append(t)
1543            if target_obj.enable_sar:
1544                sar_file_prefix = "%s_%s_%s_sar" % (block_size, rw, io_depth)
1545                t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix))
1546                threads.append(t)
1547
1548            if target_obj.enable_pcm:
1549                pcm_fnames = ["%s_%s_%s_%s.csv" % (block_size, rw, io_depth, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]
1550
1551                pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm, args=(args.results, pcm_fnames[0],))
1552                pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory, args=(args.results, pcm_fnames[1],))
1553                pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power, args=(args.results, pcm_fnames[2],))
1554
1555                threads.append(pcm_cpu_t)
1556                threads.append(pcm_mem_t)
1557                threads.append(pcm_pow_t)
1558
1559            if target_obj.enable_bandwidth:
1560                bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)])
1561                bandwidth_file_name = ".".join([bandwidth_file_name, "csv"])
1562                t = threading.Thread(target=target_obj.measure_network_bandwidth, args=(args.results, bandwidth_file_name,))
1563                threads.append(t)
1564
1565            if target_obj.enable_dpdk_memory:
1566                t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results,))
1567                threads.append(t)
1568
1569            if target_obj.enable_adq:
1570                ethtool_thread = threading.Thread(target=target_obj.ethtool_after_fio_ramp, args=(fio_ramp_time,))
1571                threads.append(ethtool_thread)
1572
1573            for t in threads:
1574                t.start()
1575            for t in threads:
1576                t.join()
1577
1578            for i in initiators:
1579                if i.mode == "kernel":
1580                    i.kernel_init_disconnect()
1581                i.copy_result_files(args.results)
1582
1583        target_obj.restore_governor()
1584        target_obj.restore_tuned()
1585        target_obj.restore_services()
1586        target_obj.restore_sysctl()
1587        if target_obj.enable_adq:
1588            target_obj.reload_driver("ice")
1589        for i in initiators:
1590            i.restore_governor()
1591            i.restore_tuned()
1592            i.restore_services()
1593            i.restore_sysctl()
1594            if i.enable_adq:
1595                i.reload_driver("ice")
1596        target_obj.parse_results(args.results, args.csv_filename)
1597    finally:
1598        for i in initiators:
1599            try:
1600                i.stop()
1601            except Exception as err:
1602                i.log_print("Ignoring error while stopping initiator: %s" % err)
1603        target_obj.stop()
1604