xref: /spdk/scripts/perf/nvmf/run_nvmf.py (revision fecffda6ecf8853b82edccde429b68252f0a62c5)
1#!/usr/bin/env python3
2#  SPDX-License-Identifier: BSD-3-Clause
3#  Copyright (C) 2018 Intel Corporation
4#  All rights reserved.
5#
6
7import os
8import re
9import sys
10import argparse
11import json
12import logging
13import zipfile
14import threading
15import subprocess
16import itertools
17import configparser
18import time
19import uuid
20
21import paramiko
22import pandas as pd
23from common import *
24from subprocess import CalledProcessError
25
26sys.path.append(os.path.dirname(__file__) + '/../../../python')
27
28import spdk.rpc as rpc  # noqa
29import spdk.rpc.client as rpc_client  # noqa
30
31
class Server:
    """Base class for a host taking part in an NVMe-oF perf test.

    Holds the system-configuration logic common to target and initiator
    machines (kernel drivers, services, sysctl, tuned profile, CPU
    governor, IRQ affinity and optional ADQ) and records the original
    settings so restore_settings() can revert them once the test has
    finished.  Subclasses implement exec_cmd() to run commands either
    locally (Target) or over SSH (Initiator).
    """

    def __init__(self, name, general_config, server_config):
        self.name = name
        self.username = general_config["username"]
        self.password = general_config["password"]
        self.transport = general_config["transport"].lower()
        self.nic_ips = server_config["nic_ips"]
        self.mode = server_config["mode"]

        self.log = logging.getLogger(self.name)

        self.irq_scripts_dir = "/usr/src/local/mlnx-tools/ofed_scripts"
        if "irq_scripts_dir" in server_config and server_config["irq_scripts_dir"]:
            self.irq_scripts_dir = server_config["irq_scripts_dir"]

        self.local_nic_info = []
        self._nics_json_obj = {}
        # *_restore_* members remember pre-test state for restore_settings().
        self.svc_restore_dict = {}
        self.sysctl_restore_dict = {}
        self.tuned_restore_dict = {}
        self.governor_restore = ""
        self.tuned_profile = ""

        self.enable_adq = False
        self.adq_priority = None
        if "adq_enable" in server_config and server_config["adq_enable"]:
            self.enable_adq = server_config["adq_enable"]
            self.adq_priority = 1

        if "tuned_profile" in server_config:
            self.tuned_profile = server_config["tuned_profile"]

        # The name ends up in logger and file names, so restrict the charset.
        # This is a fatal misconfiguration, so log it as an error.
        if not re.match(r'^[A-Za-z0-9\-]+$', name):
            self.log.error("Please use a name which contains only letters, numbers or dashes")
            sys.exit(1)

    @staticmethod
    def get_uncommented_lines(lines):
        """Return only the lines which are non-empty and not "#" comments."""
        return [line for line in lines if line and not line.startswith('#')]

    def get_nic_name_by_ip(self, ip):
        """Return the name of the local interface which has "ip" assigned.

        The "ip -j address show" output is fetched once and cached.
        Returns None when no interface carries the address.
        """
        if not self._nics_json_obj:
            nics_json_obj = self.exec_cmd(["ip", "-j", "address", "show"])
            self._nics_json_obj = list(filter(lambda x: x["addr_info"], json.loads(nics_json_obj)))
        for nic in self._nics_json_obj:
            for addr in nic["addr_info"]:
                # Compare for equality - the previous substring test ("in")
                # wrongly matched e.g. "10.0.0.1" against "10.0.0.100".
                if ip == addr["local"]:
                    return nic["ifname"]

    def set_local_nic_info_helper(self):
        # Overridden by subclasses; expected to return lshw-style JSON.
        pass

    def set_local_nic_info(self, pci_info):
        """Extract "network" class entries from lshw-style JSON into local_nic_info."""
        def extract_network_elements(json_obj):
            # Recursively walk lists and dicts, descending into "children".
            nic_list = []
            if isinstance(json_obj, list):
                for x in json_obj:
                    nic_list.extend(extract_network_elements(x))
            elif isinstance(json_obj, dict):
                if "children" in json_obj:
                    nic_list.extend(extract_network_elements(json_obj["children"]))
                if "class" in json_obj.keys() and "network" in json_obj["class"]:
                    nic_list.append(json_obj)
            return nic_list

        self.local_nic_info = extract_network_elements(pci_info)

    def get_nic_numa_node(self, nic_name):
        """Return the NUMA node number the given NIC device is attached to."""
        return int(self.exec_cmd(["cat", "/sys/class/net/%s/device/numa_node" % nic_name]))

    def get_numa_cpu_map(self):
        """Return a {numa_node: [cpu, ...]} map built from lscpu JSON output."""
        numa_cpu_json_obj = json.loads(self.exec_cmd(["lscpu", "-b", "-e=NODE,CPU", "-J"]))
        numa_cpu_json_map = {}

        for cpu in numa_cpu_json_obj["cpus"]:
            cpu_num = int(cpu["cpu"])
            numa_node = int(cpu["node"])
            numa_cpu_json_map.setdefault(numa_node, [])
            numa_cpu_json_map[numa_node].append(cpu_num)

        return numa_cpu_json_map

    # pylint: disable=R0201
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        # Overridden by subclasses; the base class executes nothing.
        return ""

    def configure_system(self):
        """Run all common host tuning steps in order."""
        self.load_drivers()
        self.configure_services()
        self.configure_sysctl()
        self.configure_tuned()
        self.configure_cpu_governor()
        self.configure_irq_affinity()

    def load_drivers(self):
        """Load host and target kernel modules for the configured transport."""
        self.log.info("Loading drivers")
        self.exec_cmd(["sudo", "modprobe", "-a",
                       "nvme-%s" % self.transport,
                       "nvmet-%s" % self.transport])

    def configure_adq(self):
        """Prepare the system for ADQ testing (modules + NIC settings)."""
        self.adq_load_modules()
        self.adq_configure_nic()

    def adq_load_modules(self):
        """Load the traffic-control modules ADQ depends on; log failures."""
        self.log.info("Modprobing ADQ-related Linux modules...")
        adq_module_deps = ["sch_mqprio", "act_mirred", "cls_flower"]
        for module in adq_module_deps:
            try:
                self.exec_cmd(["sudo", "modprobe", module])
                self.log.info("%s loaded!" % module)
            except CalledProcessError as e:
                self.log.error("ERROR: failed to load module %s" % module)
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_tc(self):
        """Create ADQ traffic classes and flower filters on each test NIC."""
        self.log.info("Configuring ADQ Traffic classes and filters...")

        num_queues_tc0 = 2  # 2 is minimum number of queues for TC0
        num_queues_tc1 = self.num_cores
        # Filter on destination port on the target side, source port on initiators.
        port_param = "dst_port" if isinstance(self, Target) else "src_port"
        port = "4420"
        xps_script_path = os.path.join(self.spdk_dir, "scripts", "perf", "nvmf", "set_xps_rxqs")

        for nic_ip in self.nic_ips:
            nic_name = self.get_nic_name_by_ip(nic_ip)
            tc_qdisc_map_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name,
                                "root", "mqprio", "num_tc", "2", "map", "0", "1",
                                "queues", "%s@0" % num_queues_tc0,
                                "%s@%s" % (num_queues_tc1, num_queues_tc0),
                                "hw", "1", "mode", "channel"]
            self.log.info(" ".join(tc_qdisc_map_cmd))
            self.exec_cmd(tc_qdisc_map_cmd)

            # Give the driver time to apply the qdisc before adding filters.
            time.sleep(5)
            tc_qdisc_ingress_cmd = ["sudo", "tc", "qdisc", "add", "dev", nic_name, "ingress"]
            self.log.info(" ".join(tc_qdisc_ingress_cmd))
            self.exec_cmd(tc_qdisc_ingress_cmd)

            tc_filter_cmd = ["sudo", "tc", "filter", "add", "dev", nic_name,
                             "protocol", "ip", "ingress", "prio", "1", "flower",
                             "dst_ip", "%s/32" % nic_ip, "ip_proto", "tcp", port_param, port,
                             "skip_sw", "hw_tc", "1"]
            self.log.info(" ".join(tc_filter_cmd))
            self.exec_cmd(tc_filter_cmd)

            # show tc configuration
            self.log.info("Show tc configuration for %s NIC..." % nic_name)
            tc_disk_out = self.exec_cmd(["sudo", "tc", "qdisc", "show", "dev", nic_name])
            tc_filter_out = self.exec_cmd(["sudo", "tc", "filter", "show", "dev", nic_name, "ingress"])
            self.log.info("%s" % tc_disk_out)
            self.log.info("%s" % tc_filter_out)

            # Ethtool coalesce settings must be applied after configuring traffic classes
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-rx", "off", "rx-usecs", "0"])
            self.exec_cmd(["sudo", "ethtool", "--coalesce", nic_name, "adaptive-tx", "off", "tx-usecs", "500"])

            self.log.info("Running set_xps_rxqs script for %s NIC..." % nic_name)
            xps_cmd = ["sudo", xps_script_path, nic_name]
            self.log.info(xps_cmd)
            self.exec_cmd(xps_cmd)

    def reload_driver(self, driver):
        """Remove and re-insert a kernel module; log failures instead of raising."""
        try:
            self.exec_cmd(["sudo", "rmmod", driver])
            self.exec_cmd(["sudo", "modprobe", driver])
        except CalledProcessError as e:
            self.log.error("ERROR: failed to reload %s module!" % driver)
            self.log.error("%s resulted in error: %s" % (e.cmd, e.output))

    def adq_configure_nic(self):
        """Apply the ethtool settings required for ADQ on every test NIC."""
        self.log.info("Configuring NIC port settings for ADQ testing...")

        # Reload the driver first, to make sure any previous settings are re-set.
        self.reload_driver("ice")

        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            try:
                self.exec_cmd(["sudo", "ethtool", "-K", nic,
                               "hw-tc-offload", "on"])  # Enable hardware TC offload
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-inline-flow-director", "on"])  # Enable Intel Flow Director
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic, "fw-lldp-agent", "off"])  # Disable LLDP
                # As temporary workaround for ADQ, channel packet inspection optimization is turned on during connection establishment.
                # Then turned off before fio ramp_up expires in ethtool_after_fio_ramp().
                self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                               "channel-pkt-inspect-optimize", "on"])
            except CalledProcessError as e:
                self.log.error("ERROR: failed to configure NIC port using ethtool!")
                self.log.error("%s resulted in error: %s" % (e.cmd, e.output))
                self.log.info("Please update your NIC driver and firmware versions and try again.")
            self.log.info(self.exec_cmd(["sudo", "ethtool", "-k", nic]))
            self.log.info(self.exec_cmd(["sudo", "ethtool", "--show-priv-flags", nic]))

    def configure_services(self):
        """Stop services which would interfere with the test; remember their state."""
        self.log.info("Configuring active services...")
        svc_config = configparser.ConfigParser(strict=False)

        # Below list is valid only for RHEL / Fedora systems and might not
        # contain valid names for other distributions.
        svc_target_state = {
            "firewalld": "inactive",
            "irqbalance": "inactive",
            "lldpad.service": "inactive",
            "lldpad.socket": "inactive"
        }

        for service in svc_target_state:
            # systemctl "show" output is INI-like; parse it under a [service] section.
            out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
            out = "\n".join(["[%s]" % service, out])
            svc_config.read_string(out)

            if "LoadError" in svc_config[service] and "not found" in svc_config[service]["LoadError"]:
                continue

            service_state = svc_config[service]["ActiveState"]
            self.log.info("Current state of %s service is %s" % (service, service_state))
            self.svc_restore_dict.update({service: service_state})
            if service_state != "inactive":
                self.log.info("Disabling %s. It will be restored after the test has finished." % service)
                self.exec_cmd(["sudo", "systemctl", "stop", service])

    def configure_sysctl(self):
        """Apply networking/VM sysctl tuning; remember the old values for restore."""
        self.log.info("Tuning sysctl settings...")

        sysctl_opts = {
            "net.core.busy_poll": 0,
            "net.core.busy_read": 0,
            "net.core.somaxconn": 4096,
            "net.core.netdev_max_backlog": 8192,
            "net.ipv4.tcp_max_syn_backlog": 16384,
            "net.core.rmem_max": 268435456,
            "net.core.wmem_max": 268435456,
            "net.ipv4.tcp_mem": "268435456 268435456 268435456",
            "net.ipv4.tcp_rmem": "8192 1048576 33554432",
            "net.ipv4.tcp_wmem": "8192 1048576 33554432",
            "net.ipv4.route.flush": 1,
            "vm.overcommit_memory": 1,
        }

        if self.enable_adq:
            # adq_set_busy_read() is provided elsewhere in this module/subclass.
            sysctl_opts.update(self.adq_set_busy_read(1))

        for opt, value in sysctl_opts.items():
            self.sysctl_restore_dict.update({opt: self.exec_cmd(["sysctl", "-n", opt]).strip()})
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def configure_tuned(self):
        """Activate the configured tuned-adm profile; remember the previous one."""
        if not self.tuned_profile:
            self.log.warning("WARNING: Tuned profile not set in configuration file. Skipping configuration.")
            return

        self.log.info("Configuring tuned-adm profile to %s." % self.tuned_profile)
        service = "tuned"
        tuned_config = configparser.ConfigParser(strict=False)

        out = self.exec_cmd(["sudo", "systemctl", "show", "--no-page", service])
        out = "\n".join(["[%s]" % service, out])
        tuned_config.read_string(out)
        tuned_state = tuned_config[service]["ActiveState"]
        self.svc_restore_dict.update({service: tuned_state})

        if tuned_state != "inactive":
            profile = self.exec_cmd(["cat", "/etc/tuned/active_profile"]).strip()
            profile_mode = self.exec_cmd(["cat", "/etc/tuned/profile_mode"]).strip()

            self.tuned_restore_dict = {
                "profile": profile,
                "mode": profile_mode
            }

        self.exec_cmd(["sudo", "systemctl", "start", service])
        self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_profile])
        self.log.info("Tuned profile set to %s." % self.exec_cmd(["cat", "/etc/tuned/active_profile"]))

    def configure_cpu_governor(self):
        """Switch the CPU frequency governor to "performance"; remember the old one."""
        self.log.info("Setting CPU governor to performance...")

        # This assumes that there is the same CPU scaling governor on each CPU
        self.governor_restore = self.exec_cmd(["cat", "/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor"]).strip()
        self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "performance"])

    def configure_irq_affinity(self):
        """Run the vendor set_irq_affinity.sh script for every test NIC."""
        self.log.info("Setting NIC irq affinity for NICs...")

        irq_script_path = os.path.join(self.irq_scripts_dir, "set_irq_affinity.sh")
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            irq_cmd = ["sudo", irq_script_path, nic]
            self.log.info(irq_cmd)
            self.exec_cmd(irq_cmd, change_dir=self.irq_scripts_dir)

    def restore_services(self):
        """Return all services touched by configure_services() to their prior state."""
        self.log.info("Restoring services...")
        for service, state in self.svc_restore_dict.items():
            cmd = "stop" if state == "inactive" else "start"
            self.exec_cmd(["sudo", "systemctl", cmd, service])

    def restore_sysctl(self):
        """Write back the sysctl values saved in configure_sysctl()."""
        self.log.info("Restoring sysctl settings...")
        for opt, value in self.sysctl_restore_dict.items():
            self.log.info(self.exec_cmd(["sudo", "sysctl", "-w", "%s=%s" % (opt, value)]).strip())

    def restore_tuned(self):
        """Re-activate the tuned profile that was active before the test."""
        self.log.info("Restoring tuned-adm settings...")

        if not self.tuned_restore_dict:
            return

        if self.tuned_restore_dict["mode"] == "auto":
            self.exec_cmd(["sudo", "tuned-adm", "auto_profile"])
            self.log.info("Reverted tuned-adm to auto_profile.")
        else:
            self.exec_cmd(["sudo", "tuned-adm", "profile", self.tuned_restore_dict["profile"]])
            self.log.info("Reverted tuned-adm to %s profile." % self.tuned_restore_dict["profile"])

    def restore_governor(self):
        """Re-apply the CPU governor that was active before the test."""
        self.log.info("Restoring CPU governor setting...")
        if self.governor_restore:
            self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", self.governor_restore])
            self.log.info("Reverted CPU governor to %s." % self.governor_restore)

    def restore_settings(self):
        """Undo every system change made by configure_system()/configure_adq()."""
        self.restore_governor()
        self.restore_tuned()
        self.restore_services()
        self.restore_sysctl()
        if self.enable_adq:
            self.reload_driver("ice")
363
364
class Target(Server):
    """The NVMe-oF target host.

    Runs commands locally, prepares the SPDK source archive for the
    initiators, spreads bdevs across initiators/NICs and provides the
    measurement helpers (SAR, PCM, power, network bandwidth, DPDK
    memory) executed while the fio workload is running.
    """

    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Defaults
        self.enable_zcopy = False
        self.scheduler_name = "static"
        self.null_block = 0
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))
        self.subsystem_info_list = []
        self.initiator_info = []
        self.nvme_allowlist = []
        self.nvme_blocklist = []

        # Target-side measurement options
        self.enable_pm = True
        self.enable_sar = True
        self.enable_pcm = True
        self.enable_bw = True
        self.enable_dpdk_memory = True

        if "null_block_devices" in target_config:
            self.null_block = target_config["null_block_devices"]
        if "scheduler_settings" in target_config:
            self.scheduler_name = target_config["scheduler_settings"]
        if "zcopy_settings" in target_config:
            self.enable_zcopy = target_config["zcopy_settings"]
        if "results_dir" in target_config:
            self.results_dir = target_config["results_dir"]
        if "blocklist" in target_config:
            self.nvme_blocklist = target_config["blocklist"]
        if "allowlist" in target_config:
            self.nvme_allowlist = target_config["allowlist"]
            # Blocklist takes precedence, remove common elements from allowlist
            self.nvme_allowlist = list(set(self.nvme_allowlist) - set(self.nvme_blocklist))
        if "enable_pm" in target_config:
            self.enable_pm = target_config["enable_pm"]
        if "enable_sar" in target_config:
            self.enable_sar = target_config["enable_sar"]
        if "enable_pcm" in target_config:
            self.enable_pcm = target_config["enable_pcm"]
        if "enable_bandwidth" in target_config:
            self.enable_bw = target_config["enable_bandwidth"]
        if "enable_dpdk_memory" in target_config:
            self.enable_dpdk_memory = target_config["enable_dpdk_memory"]

        self.log.info("Items now on allowlist: %s" % self.nvme_allowlist)
        self.log.info("Items now on blocklist: %s" % self.nvme_blocklist)

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))
        self.set_local_nic_info(self.set_local_nic_info_helper())

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.zip_spdk_sources(self.spdk_dir, "/tmp/spdk.zip")

        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()

    def set_local_nic_info_helper(self):
        """Return local hardware info (lshw JSON) used for NIC discovery."""
        return json.loads(self.exec_cmd(["lshw", "-json"]))

    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Run a command locally and return its decoded stdout.

        :param cmd: command as a list of arguments
        :param stderr_redirect: if True, merge stderr into stdout
        :param change_dir: optional directory to chdir into for the call
        """
        stderr_opt = None
        if stderr_redirect:
            stderr_opt = subprocess.STDOUT
        if change_dir:
            old_cwd = os.getcwd()
            os.chdir(change_dir)
            self.log.info("Changing directory to %s" % change_dir)

        out = check_output(cmd, stderr=stderr_opt).decode(encoding="utf-8")

        if change_dir:
            os.chdir(old_cwd)
            self.log.info("Changing directory to %s" % old_cwd)
        return out

    def zip_spdk_sources(self, spdk_dir, dest_file):
        """Zip the whole SPDK source tree into dest_file for the initiators."""
        self.log.info("Zipping SPDK source directory")
        # Use a context manager so the archive is closed even on error.
        with zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED) as fh:
            for root, _directories, files in os.walk(spdk_dir, followlinks=True):
                for fname in files:
                    fh.write(os.path.relpath(os.path.join(root, fname)))
        self.log.info("Done zipping")

    @staticmethod
    def _chunks(input_list, chunks_no):
        """Yield chunks_no contiguous, near-equal-sized slices of input_list."""
        div, rem = divmod(len(input_list), chunks_no)
        for i in range(chunks_no):
            si = (div + 1) * (i if i < rem else rem) + div * (0 if i < rem else i - rem)
            yield input_list[si:si + (div + 1 if i < rem else div)]

    def spread_bdevs(self, req_disks):
        # Spread available block devices indexes:
        # - evenly across available initiator systems
        # - evenly across available NIC interfaces for
        #   each initiator
        # Not NUMA aware.
        ip_bdev_map = []
        initiator_chunks = self._chunks(range(0, req_disks), len(self.initiator_info))

        for i, (init, init_chunk) in enumerate(zip(self.initiator_info, initiator_chunks)):
            self.initiator_info[i]["bdev_range"] = init_chunk
            init_chunks_list = list(self._chunks(init_chunk, len(init["target_nic_ips"])))
            for ip, nic_chunk in zip(self.initiator_info[i]["target_nic_ips"], init_chunks_list):
                for c in nic_chunk:
                    ip_bdev_map.append((ip, c))
        return ip_bdev_map

    def measure_sar(self, results_dir, sar_file_prefix, ramp_time, run_time):
        """Collect per-CPU utilization with sar and write raw + summary files."""
        cpu_number = os.cpu_count()
        sar_idle_sum = 0
        sar_output_file = os.path.join(results_dir, sar_file_prefix + ".txt")
        sar_cpu_util_file = os.path.join(results_dir, ".".join([sar_file_prefix + "cpu_util", "txt"]))

        self.log.info("Waiting %d seconds for ramp-up to finish before measuring SAR stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("Starting SAR measurements")

        out = self.exec_cmd(["sar", "-P", "ALL", "%s" % 1, "%s" % run_time])
        # sar_output_file already contains results_dir; the previous code
        # joined results_dir onto it a second time, which produced a wrong
        # path whenever results_dir was relative.
        with open(sar_output_file, "w") as fh:
            for line in out.split("\n"):
                if "Average" in line:
                    if "CPU" in line:
                        self.log.info("Summary CPU utilization from SAR:")
                        self.log.info(line)
                    elif "all" in line:
                        self.log.info(line)
                    else:
                        # Column 8 of a per-CPU "Average:" row is %idle.
                        sar_idle_sum += float(line.split()[7])
            fh.write(out)
        sar_cpu_usage = cpu_number * 100 - sar_idle_sum

        with open(sar_cpu_util_file, "w") as f:
            f.write("%0.2f" % sar_cpu_usage)

    def measure_power(self, results_dir, prefix, script_full_dir, ramp_time, run_time):
        """Collect BMC power measurements for run_time seconds after ramp-up."""
        time.sleep(ramp_time)
        self.log.info("Starting power measurements")
        self.exec_cmd(["%s/../pm/collect-bmc-pm" % script_full_dir,
                      "-d", "%s" % results_dir, "-l", "-p", "%s" % prefix,
                       "-x", "-c", "%s" % run_time, "-t", "%s" % 1, "-r"])

    def ethtool_after_fio_ramp(self, fio_ramp_time):
        """Disable channel-pkt-inspect-optimize on test NICs mid-ramp (ADQ workaround)."""
        time.sleep(fio_ramp_time//2)
        nic_names = [self.get_nic_name_by_ip(n) for n in self.nic_ips]
        for nic in nic_names:
            self.log.info(nic)
            self.exec_cmd(["sudo", "ethtool", "--set-priv-flags", nic,
                           "channel-pkt-inspect-optimize", "off"])  # Disable channel packet inspection optimization

    def measure_pcm_memory(self, results_dir, pcm_file_name, ramp_time, run_time):
        """Run pcm-memory in the background for run_time seconds after ramp-up."""
        time.sleep(ramp_time)
        cmd = ["pcm-memory", "1", "-csv=%s/%s" % (results_dir, pcm_file_name)]
        pcm_memory = subprocess.Popen(cmd)
        time.sleep(run_time)
        pcm_memory.terminate()

    def measure_pcm(self, results_dir, pcm_file_name, ramp_time, run_time):
        """Run pcm after ramp-up and extract the socket/UPI columns into a CSV."""
        time.sleep(ramp_time)
        cmd = ["pcm", "1", "-i=%s" % run_time,
               "-csv=%s/%s" % (results_dir, pcm_file_name)]
        subprocess.run(cmd)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_pcm_power(self, results_dir, pcm_power_file_name, ramp_time, run_time):
        """Run pcm-power after ramp-up and save its output to a file."""
        time.sleep(ramp_time)
        out = self.exec_cmd(["pcm-power", "1", "-i=%s" % run_time])
        with open(os.path.join(results_dir, pcm_power_file_name), "w") as fh:
            fh.write(out)
        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
        #       Improve this so that additional file containing measurements average is generated too.

    def measure_network_bandwidth(self, results_dir, bandwidth_file_name, ramp_time, run_time):
        """Sample network bandwidth with bwm-ng for run_time seconds after ramp-up."""
        self.log.info("Waiting %d seconds for ramp-up to finish before measuring bandwidth stats" % ramp_time)
        time.sleep(ramp_time)
        self.log.info("INFO: starting network bandwidth measure")
        self.exec_cmd(["bwm-ng", "-o", "csv", "-F", "%s/%s" % (results_dir, bandwidth_file_name),
                       "-a", "1", "-t", "1000", "-c", "%s" % run_time])
        # TODO: Above command results in a .csv file containing measurements for all gathered samples.
        #       Improve this so that additional file containing measurements average is generated too.
        # TODO: Monitor only these interfaces which are currently used to run the workload.

    def measure_dpdk_memory(self, results_dir, dump_file_name, ramp_time):
        """Dump DPDK memory stats via SPDK RPC and move the dump into results_dir."""
        self.log.info("INFO: waiting to generate DPDK memory usage")
        time.sleep(ramp_time)
        self.log.info("INFO: generating DPDK memory usage")
        # self.client is the SPDK RPC client set up elsewhere in this module.
        tmp_dump_file = rpc.env_dpdk.env_dpdk_get_mem_stats(self.client)["filename"]
        os.rename(tmp_dump_file, "%s/%s" % (results_dir, dump_file_name))

    def sys_config(self):
        """Log a summary of the target host configuration for the report."""
        self.log.info("====Kernel release:====")
        self.log.info(os.uname().release)
        self.log.info("====Kernel command line:====")
        with open('/proc/cmdline') as f:
            cmdline = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(cmdline)))
        self.log.info("====sysctl conf:====")
        with open('/etc/sysctl.conf') as f:
            sysctl = f.readlines()
            self.log.info('\n'.join(self.get_uncommented_lines(sysctl)))
        self.log.info("====Cpu power info:====")
        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))
        self.log.info("====zcopy settings:====")
        self.log.info("zcopy enabled: %s" % (self.enable_zcopy))
        self.log.info("====Scheduler settings:====")
        self.log.info("SPDK scheduler: %s" % (self.scheduler_name))
580
581
582class Initiator(Server):
    def __init__(self, name, general_config, initiator_config):
        """Set up the initiator host.

        Opens an SSH connection, recreates the remote working directory,
        optionally ships the SPDK sources over, then applies the common
        system configuration inherited from Server.
        """
        super().__init__(name, general_config, initiator_config)

        # Required fields
        self.ip = initiator_config["ip"]
        self.target_nic_ips = initiator_config["target_nic_ips"]

        # Defaults
        self.cpus_allowed = None
        self.cpus_allowed_policy = "shared"
        self.spdk_dir = "/tmp/spdk"
        self.fio_bin = "/usr/src/fio/fio"
        self.nvmecli_bin = "nvme"
        self.cpu_frequency = None
        self.subsystem_info_list = []

        if "spdk_dir" in initiator_config:
            self.spdk_dir = initiator_config["spdk_dir"]
        if "fio_bin" in initiator_config:
            self.fio_bin = initiator_config["fio_bin"]
        if "nvmecli_bin" in initiator_config:
            self.nvmecli_bin = initiator_config["nvmecli_bin"]
        if "cpus_allowed" in initiator_config:
            self.cpus_allowed = initiator_config["cpus_allowed"]
        if "cpus_allowed_policy" in initiator_config:
            self.cpus_allowed_policy = initiator_config["cpus_allowed_policy"]
        if "cpu_frequency" in initiator_config:
            self.cpu_frequency = initiator_config["cpu_frequency"]
        # The environment variable overrides the config file value.
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')

        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        # Start from a clean remote workspace.
        self.exec_cmd(["sudo", "rm", "-rf", "%s/nvmf_perf" % self.spdk_dir])
        self.exec_cmd(["mkdir", "-p", "%s" % self.spdk_dir])
        self._nics_json_obj = json.loads(self.exec_cmd(["ip", "-j", "address", "show"]))

        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
            self.copy_spdk("/tmp/spdk.zip")
        self.set_local_nic_info(self.set_local_nic_info_helper())
        self.set_cpu_frequency()
        self.configure_system()
        if self.enable_adq:
            self.configure_adq()
        self.sys_config()
629
630    def set_local_nic_info_helper(self):
631        return json.loads(self.exec_cmd(["lshw", "-json"]))
632
    def stop(self):
        """Restore this initiator's system settings and close the SSH session."""
        self.restore_settings()
        self.ssh_connection.close()
636
    def exec_cmd(self, cmd, stderr_redirect=False, change_dir=None):
        """Run a command on the initiator over SSH and return its stdout.

        :param cmd: command as a list of arguments
        :param stderr_redirect: if True, merge stderr into stdout (via PTY)
        :param change_dir: optional remote directory to cd into first
        :raises CalledProcessError: if the remote command exits non-zero
        """
        if change_dir:
            cmd = ["cd", change_dir, ";", *cmd]

        # In case one of the command elements contains whitespace and is not
        # already quoted (e.g. when calling sysctl), quote it again to prevent
        # expansion when sending to remote system.
        for i, c in enumerate(cmd):
            if (" " in c or "\t" in c) and not (c.startswith("'") and c.endswith("'")):
                cmd[i] = '"%s"' % c
        cmd = " ".join(cmd)

        # Requesting a PTY makes the remote side merge stderr into stdout.
        _, stdout, _ = self.ssh_connection.exec_command(cmd, get_pty=stderr_redirect)
        out = stdout.read().decode(encoding="utf-8")

        # Check the return code
        rc = stdout.channel.recv_exit_status()
        if rc:
            raise CalledProcessError(int(rc), cmd, out)

        return out
659
660    def put_file(self, local, remote_dest):
661        ftp = self.ssh_connection.open_sftp()
662        ftp.put(local, remote_dest)
663        ftp.close()
664
665    def get_file(self, remote, local_dest):
666        ftp = self.ssh_connection.open_sftp()
667        ftp.get(remote, local_dest)
668        ftp.close()
669
    def copy_spdk(self, local_spdk_zip):
        """Ship the zipped SPDK sources to this initiator and unpack them into spdk_dir."""
        self.log.info("Copying SPDK sources to initiator %s" % self.name)
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log.info("Copied sources zip from target")
        self.exec_cmd(["unzip", "-qo", "/tmp/spdk_drop.zip", "-d", self.spdk_dir])
        self.log.info("Sources unpacked")
676
677    def copy_result_files(self, dest_dir):
678        self.log.info("Copying results")
679
680        if not os.path.exists(dest_dir):
681            os.mkdir(dest_dir)
682
683        # Get list of result files from initiator and copy them back to target
684        file_list = self.exec_cmd(["ls", "%s/nvmf_perf" % self.spdk_dir]).strip().split("\n")
685
686        for file in file_list:
687            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
688                          os.path.join(dest_dir, file))
689        self.log.info("Done copying results")
690
691    def match_subsystems(self, target_subsytems):
692        subsystems = [subsystem for subsystem in target_subsytems if subsystem[2] in self.target_nic_ips]
693        subsystems.sort(key=lambda x: x[1])
694        self.log.info("Found matching subsystems on target side:")
695        for s in subsystems:
696            self.log.info(s)
697        self.subsystem_info_list = subsystems
698
    def gen_fio_filename_conf(self, *args, **kwargs):
        # Intentionally a no-op placeholder: the real logic is implemented in
        # the SPDKInitiator and KernelInitiator subclasses.
        pass
702
703    def get_route_nic_numa(self, remote_nvme_ip):
704        local_nvme_nic = json.loads(self.exec_cmd(["ip", "-j", "route", "get", remote_nvme_ip]))
705        local_nvme_nic = local_nvme_nic[0]["dev"]
706        return self.get_nic_numa_node(local_nvme_nic)
707
708    @staticmethod
709    def gen_fio_offset_section(offset_inc, num_jobs):
710        offset_inc = 100 // num_jobs if offset_inc == 0 else offset_inc
711        return "\n".join(["size=%s%%" % offset_inc,
712                          "offset=0%",
713                          "offset_increment=%s%%" % offset_inc])
714
715    def gen_fio_numa_section(self, fio_filenames_list):
716        numa_stats = {}
717        for nvme in fio_filenames_list:
718            nvme_numa = self.get_nvme_subsystem_numa(os.path.basename(nvme))
719            numa_stats[nvme_numa] = numa_stats.setdefault(nvme_numa, 0) + 1
720
721        # Use the most common NUMA node for this chunk to allocate memory and CPUs
722        section_local_numa = sorted(numa_stats.items(), key=lambda item: item[1], reverse=True)[0][0]
723
724        return "\n".join(["numa_cpu_nodes=%s" % section_local_numa,
725                          "numa_mem_policy=prefer:%s" % section_local_numa])
726
727    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no,
728                       num_jobs=None, ramp_time=0, run_time=10, rate_iops=0,
729                       offset=False, offset_inc=0):
730        fio_conf_template = """
731[global]
732ioengine={ioengine}
733{spdk_conf}
734thread=1
735group_reporting=1
736direct=1
737percentile_list=50:90:99:99.5:99.9:99.99:99.999
738
739norandommap=1
740rw={rw}
741rwmixread={rwmixread}
742bs={block_size}
743time_based=1
744ramp_time={ramp_time}
745runtime={run_time}
746rate_iops={rate_iops}
747"""
748
749        if self.cpus_allowed is not None:
750            self.log.info("Limiting FIO workload execution on specific cores %s" % self.cpus_allowed)
751            cpus_num = 0
752            cpus = self.cpus_allowed.split(",")
753            for cpu in cpus:
754                if "-" in cpu:
755                    a, b = cpu.split("-")
756                    a = int(a)
757                    b = int(b)
758                    cpus_num += len(range(a, b))
759                else:
760                    cpus_num += 1
761            self.num_cores = cpus_num
762            threads = range(0, self.num_cores)
763        elif hasattr(self, 'num_cores'):
764            self.log.info("Limiting FIO workload execution to %s cores" % self.num_cores)
765            threads = range(0, int(self.num_cores))
766        else:
767            self.num_cores = len(self.subsystem_info_list)
768            threads = range(0, len(self.subsystem_info_list))
769
770        filename_section = self.gen_fio_filename_conf(self.subsystem_info_list, threads, io_depth, num_jobs,
771                                                      offset, offset_inc)
772
773        fio_config = fio_conf_template.format(ioengine=self.ioengine, spdk_conf=self.spdk_conf,
774                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
775                                              ramp_time=ramp_time, run_time=run_time, rate_iops=rate_iops)
776
777        # TODO: hipri disabled for now, as it causes fio errors:
778        # io_u error on file /dev/nvme2n1: Operation not supported
779        # See comment in KernelInitiator class, init_connect() function
780        if "io_uring" in self.ioengine:
781            fio_config = fio_config + """
782fixedbufs=1
783registerfiles=1
784#hipri=1
785"""
786        if num_jobs:
787            fio_config = fio_config + "numjobs=%s \n" % num_jobs
788        if self.cpus_allowed is not None:
789            fio_config = fio_config + "cpus_allowed=%s \n" % self.cpus_allowed
790            fio_config = fio_config + "cpus_allowed_policy=%s \n" % self.cpus_allowed_policy
791        fio_config = fio_config + filename_section
792
793        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
794        if hasattr(self, "num_cores"):
795            fio_config_filename += "_%sCPU" % self.num_cores
796        fio_config_filename += ".fio"
797
798        self.exec_cmd(["mkdir", "-p", "%s/nvmf_perf" % self.spdk_dir])
799        self.exec_cmd(["echo", "'%s'" % fio_config, ">", "%s/nvmf_perf/%s" % (self.spdk_dir, fio_config_filename)])
800        self.log.info("Created FIO Config:")
801        self.log.info(fio_config)
802
803        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)
804
805    def set_cpu_frequency(self):
806        if self.cpu_frequency is not None:
807            try:
808                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-g", "userspace"], True)
809                self.exec_cmd(["sudo", "cpupower", "frequency-set", "-f", "%s" % self.cpu_frequency], True)
810                self.log.info(self.exec_cmd(["sudo", "cpupower", "frequency-info"]))
811            except Exception:
812                self.log.error("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
813                sys.exit()
814        else:
815            self.log.warning("WARNING: you have disabled intel_pstate and using default cpu governance.")
816
817    def run_fio(self, fio_config_file, run_num=1):
818        job_name, _ = os.path.splitext(fio_config_file)
819        self.log.info("Starting FIO run for job: %s" % job_name)
820        self.log.info("Using FIO: %s" % self.fio_bin)
821
822        output_filename = job_name + "_run_" + str(run_num) + "_" + self.name + ".json"
823        try:
824            output = self.exec_cmd(["sudo", self.fio_bin, fio_config_file, "--output-format=json",
825                                    "--output=%s" % output_filename, "--eta=never"], True)
826            self.log.info(output)
827            self.log.info("FIO run finished. Results in: %s" % output_filename)
828        except subprocess.CalledProcessError as e:
829            self.log.error("ERROR: Fio process failed!")
830            self.log.error(e.stdout)
831
832    def sys_config(self):
833        self.log.info("====Kernel release:====")
834        self.log.info(self.exec_cmd(["uname", "-r"]))
835        self.log.info("====Kernel command line:====")
836        cmdline = self.exec_cmd(["cat", "/proc/cmdline"])
837        self.log.info('\n'.join(self.get_uncommented_lines(cmdline.splitlines())))
838        self.log.info("====sysctl conf:====")
839        sysctl = self.exec_cmd(["sudo", "cat", "/etc/sysctl.conf"])
840        self.log.info('\n'.join(self.get_uncommented_lines(sysctl.splitlines())))
841        self.log.info("====Cpu power info:====")
842        self.log.info(self.exec_cmd(["cpupower", "frequency-info"]))
843
844
class KernelTarget(Target):
    """NVMe-oF target implemented with the Linux kernel nvmet subsystem,
    configured through nvmetcli."""

    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)
        # Defaults
        self.nvmet_bin = "nvmetcli"

        if "nvmet_bin" in target_config:
            self.nvmet_bin = target_config["nvmet_bin"]

    def load_drivers(self):
        """Load common drivers plus null_blk when null block devices are used."""
        self.log.info("Loading drivers")
        super().load_drivers()
        if self.null_block:
            self.exec_cmd(["sudo", "modprobe", "null_blk", "nr_devices=%s" % self.null_block])

    def configure_adq(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_configure_tc(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_set_busy_read(self, busy_read_val):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
        return {"net.core.busy_read": 0}

    def stop(self):
        """Clear the nvmet configuration and restore system settings."""
        self.nvmet_command(self.nvmet_bin, "clear")
        self.restore_settings()

    def get_nvme_device_bdf(self, nvme_dev_path):
        """Return the PCI address (BDF) of the given /dev/nvmeX block device."""
        nvme_name = os.path.basename(nvme_dev_path)
        return self.exec_cmd(["cat", "/sys/block/%s/device/address" % nvme_name]).strip()

    def get_nvme_devices(self):
        """List NVMe block device paths, honoring block/allow lists."""
        dev_list = self.exec_cmd(["lsblk", "-o", "NAME", "-nlpd"]).split("\n")
        nvme_list = []
        for dev in dev_list:
            if "nvme" not in dev:
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                nvme_list.append(dev)
                continue
            if self.get_nvme_device_bdf(dev) in self.nvme_allowlist:
                nvme_list.append(dev)
        # Fix: return the filtered list. Returning dev_list here handed back
        # every block device, silently bypassing the block/allow lists above.
        return nvme_list

    def nvmet_command(self, nvmet_bin, command):
        """Run an nvmetcli sub-command (e.g. "clear", "restore file")."""
        return self.exec_cmd([nvmet_bin, *(command.split(" "))])

    def kernel_tgt_gen_subsystem_conf(self, nvme_list):
        """Build an nvmetcli JSON config ("kernel.conf") exposing one
        subsystem + port per device in nvme_list, spread across the NICs."""
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        for ip, bdev_num in self.spread_bdevs(len(nvme_list)):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = nvme_list[bdev_num]

            nvmet_cfg["subsystems"].append({
                "allowed_hosts": [],
                "attr": {
                    "allow_any_host": "1",
                    "serial": serial,
                    "version": "1.3"
                },
                "namespaces": [
                    {
                        "device": {
                            "path": bdev_name,
                            "uuid": "%s" % uuid.uuid4()
                        },
                        "enable": 1,
                        # NOTE(review): nsid is set to the port number string;
                        # looks odd but matches the existing configs — confirm
                        # before changing.
                        "nsid": port
                    }
                ],
                "nqn": nqn
            })

            nvmet_cfg["ports"].append({
                "addr": {
                    "adrfam": "ipv4",
                    "traddr": ip,
                    "trsvcid": port,
                    "trtype": self.transport
                },
                "portid": bdev_num,
                "referrals": [],
                "subsystems": [nqn]
            })

            self.subsystem_info_list.append((port, nqn, ip))
        self.subsys_no = len(self.subsystem_info_list)

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        """Generate kernel.conf and load it into the kernel nvmet subsystem."""
        self.log.info("Configuring kernel NVMeOF Target")

        if self.null_block:
            self.log.info("Configuring with null block device.")
            nvme_list = ["/dev/nullb{}".format(x) for x in range(self.null_block)]
        else:
            self.log.info("Configuring with NVMe drives.")
            nvme_list = self.get_nvme_devices()

        self.kernel_tgt_gen_subsystem_conf(nvme_list)
        self.subsys_no = len(nvme_list)

        self.nvmet_command(self.nvmet_bin, "clear")
        self.nvmet_command(self.nvmet_bin, "restore kernel.conf")

        if self.enable_adq:
            self.adq_configure_tc()

        self.log.info("Done configuring kernel NVMeOF Target")
968
969
class SPDKTarget(Target):
    """NVMe-oF target running SPDK's nvmf_tgt application, configured over
    JSON-RPC on /var/tmp/spdk.sock."""

    def __init__(self, name, general_config, target_config):
        super().__init__(name, general_config, target_config)

        # Required fields
        self.core_mask = target_config["core_mask"]
        self.num_cores = self.get_num_cores(self.core_mask)

        # Defaults
        self.dif_insert_strip = False
        self.null_block_dif_type = 0
        self.num_shared_buffers = 4096
        self.max_queue_depth = 128
        self.bpf_proc = None
        self.bpf_scripts = []
        self.enable_dsa = False
        self.scheduler_core_limit = None

        if "num_shared_buffers" in target_config:
            self.num_shared_buffers = target_config["num_shared_buffers"]
        if "max_queue_depth" in target_config:
            self.max_queue_depth = target_config["max_queue_depth"]
        if "null_block_dif_type" in target_config:
            self.null_block_dif_type = target_config["null_block_dif_type"]
        if "dif_insert_strip" in target_config:
            self.dif_insert_strip = target_config["dif_insert_strip"]
        if "bpf_scripts" in target_config:
            self.bpf_scripts = target_config["bpf_scripts"]
        if "dsa_settings" in target_config:
            self.enable_dsa = target_config["dsa_settings"]
        if "scheduler_core_limit" in target_config:
            self.scheduler_core_limit = target_config["scheduler_core_limit"]

        self.log.info("====DSA settings:====")
        self.log.info("DSA enabled: %s" % (self.enable_dsa))

    def adq_set_busy_read(self, busy_read_val):
        """Return the sysctl mapping applying the requested busy_read value."""
        return {"net.core.busy_read": busy_read_val}

    def get_nvme_devices_count(self):
        """Number of NVMe devices usable by the target after list filtering."""
        return len(self.get_nvme_devices())

    def get_nvme_devices(self):
        """Return NVMe PCI addresses reported by gen_nvme.sh, honoring the
        configured block/allow lists."""
        bdev_subsys_json_obj = json.loads(self.exec_cmd([os.path.join(self.spdk_dir, "scripts/gen_nvme.sh")]))
        bdev_bdfs = []
        for bdev in bdev_subsys_json_obj["config"]:
            bdev_traddr = bdev["params"]["traddr"]
            if bdev_traddr in self.nvme_blocklist:
                continue
            if len(self.nvme_allowlist) == 0:
                bdev_bdfs.append(bdev_traddr)
            if bdev_traddr in self.nvme_allowlist:
                bdev_bdfs.append(bdev_traddr)
        return bdev_bdfs

    @staticmethod
    def get_num_cores(core_mask):
        """Count CPU cores in either a hex mask ("0xF0") or a bracketed CPU
        list ("[0-3,7]"); list ranges are inclusive on both ends."""
        if "0x" in core_mask:
            return bin(int(core_mask, 16)).count("1")
        else:
            num_cores = 0
            core_mask = core_mask.replace("[", "")
            core_mask = core_mask.replace("]", "")
            for i in core_mask.split(","):
                if "-" in i:
                    x, y = i.split("-")
                    num_cores += len(range(int(x), int(y))) + 1
                else:
                    num_cores += 1
            return num_cores

    def spdk_tgt_configure(self):
        """Configure the running nvmf_tgt: transport, bdevs and subsystems."""
        self.log.info("Configuring SPDK NVMeOF target via RPC")

        if self.enable_adq:
            self.adq_configure_tc()

        # Create transport layer
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport,
                                       num_shared_buffers=self.num_shared_buffers,
                                       max_queue_depth=self.max_queue_depth,
                                       dif_insert_or_strip=self.dif_insert_strip,
                                       sock_priority=self.adq_priority)
        self.log.info("SPDK NVMeOF transport layer:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock(self.null_block)
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, self.null_block)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)

        self.log.info("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self, null_block_count):
        """Create null_block_count null bdevs, optionally with DIF metadata."""
        md_size = 0
        block_size = 4096
        if self.null_block_dif_type != 0:
            md_size = 128

        self.log.info("Adding null block bdevices to config via RPC")
        for i in range(null_block_count):
            self.log.info("Setting bdev protection to :%s" % self.null_block_dif_type)
            rpc.bdev.bdev_null_create(self.client, 102400, block_size + md_size, "Nvme{}n1".format(i),
                                      dif_type=self.null_block_dif_type, md_size=md_size)
        self.log.info("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        """Attach local NVMe drives as bdevs; optionally limit their number."""
        self.log.info("Adding NVMe bdevs to config via RPC")

        bdfs = self.get_nvme_devices()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log.error("ERROR: Requested number of disks is more than available %s" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log.info("SPDK Bdevs configuration:")
        rpc_client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        """Create one subsystem + listener per bdev, spread across ips."""
        self.log.info("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = self.get_nvme_devices_count()

        for ip, bdev_num in self.spread_bdevs(req_num_disks):
            port = str(4420 + bdev_num)
            nqn = "nqn.2018-09.io.spdk:cnode%s" % bdev_num
            serial = "SPDK00%s" % bdev_num
            bdev_name = "Nvme%sn1" % bdev_num

            rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                           allow_any_host=True, max_namespaces=8)
            rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)
            for nqn_name in [nqn, "discovery"]:
                rpc.nvmf.nvmf_subsystem_add_listener(self.client,
                                                     nqn=nqn_name,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid=port,
                                                     adrfam="ipv4")
            self.subsystem_info_list.append((port, nqn, ip))
        self.subsys_no = len(self.subsystem_info_list)

        self.log.info("SPDK NVMeOF subsystem configuration:")
        rpc_client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def bpf_start(self):
        """Start the bpftrace helper script against the running target PID."""
        self.log.info("Starting BPF Trace scripts: %s" % self.bpf_scripts)
        bpf_script = os.path.join(self.spdk_dir, "scripts/bpftrace.sh")
        bpf_traces = [os.path.join(self.spdk_dir, "scripts/bpf", trace) for trace in self.bpf_scripts]
        results_path = os.path.join(self.results_dir, "bpf_traces.txt")

        with open(self.pid, "r") as fh:
            nvmf_pid = str(fh.readline())

        cmd = [bpf_script, nvmf_pid, *bpf_traces]
        self.log.info(cmd)
        self.bpf_proc = subprocess.Popen(cmd, env={"BPF_OUTFILE": results_path})

    def tgt_start(self):
        """Launch nvmf_tgt, wait for its RPC socket and configure it."""
        if self.null_block:
            self.subsys_no = 1
        else:
            self.subsys_no = self.get_nvme_devices_count()
        self.log.info("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        proc = subprocess.Popen([nvmf_app_path, "--wait-for-rpc", "-m", self.core_mask])
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        # Fix: log the actual process ID; self.pid holds the pid *file path*,
        # which previously made this message read "PID=/…/nvmf.pid".
        self.log.info("SPDK NVMeOF Target PID=%s" % proc.pid)
        self.log.info("Waiting for spdk to initialize...")
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc_client.JSONRPCClient("/var/tmp/spdk.sock")

        rpc.sock.sock_set_default_impl(self.client, impl_name="posix")

        if self.enable_zcopy:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix",
                                           enable_zerocopy_send_server=True)
            self.log.info("Target socket options:")
            rpc_client.print_dict(rpc.sock.sock_impl_get_options(self.client, impl_name="posix"))

        if self.enable_adq:
            rpc.sock.sock_impl_set_options(self.client, impl_name="posix", enable_placement_id=1)
            rpc.bdev.bdev_nvme_set_options(self.client, timeout_us=0, action_on_timeout=None,
                                           nvme_adminq_poll_period_us=100000, retry_count=4)

        if self.enable_dsa:
            rpc.dsa.dsa_scan_accel_module(self.client, config_kernel_mode=None)
            self.log.info("Target DSA accel module enabled")

        rpc.app.framework_set_scheduler(self.client, name=self.scheduler_name, core_limit=self.scheduler_core_limit)
        rpc.framework_start_init(self.client)

        if self.bpf_scripts:
            self.bpf_start()

        self.spdk_tgt_configure()

    def stop(self):
        """Stop bpftrace (if running) and terminate the nvmf_tgt process,
        escalating to SIGKILL and cleaning up sockets if needed."""
        if self.bpf_proc:
            self.log.info("Stopping BPF Trace script")
            self.bpf_proc.terminate()
            self.bpf_proc.wait()

        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait(timeout=30)
            except Exception as e:
                self.log.info("Failed to terminate SPDK Target process. Sending SIGKILL.")
                self.log.info(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()
                # Try to clean up RPC socket files if they were not removed
                # because of using 'kill'
                try:
                    os.remove("/var/tmp/spdk.sock")
                    os.remove("/var/tmp/spdk.sock.lock")
                except FileNotFoundError:
                    pass
        self.restore_settings()
1207
1208
class KernelInitiator(Initiator):
    """Initiator that attaches target subsystems with the kernel NVMe driver
    (nvme-cli) and runs fio against the resulting /dev/nvmeXnY devices."""

    def __init__(self, name, general_config, initiator_config):
        super().__init__(name, general_config, initiator_config)

        # Defaults
        self.extra_params = ""      # extra args appended to "nvme connect"
        self.ioengine = "libaio"    # fio ioengine used on this initiator
        self.spdk_conf = ""         # unused in kernel mode; keeps fio template happy

        if "num_cores" in initiator_config:
            self.num_cores = initiator_config["num_cores"]

        if "extra_params" in initiator_config:
            self.extra_params = initiator_config["extra_params"]

        if "kernel_engine" in initiator_config:
            self.ioengine = initiator_config["kernel_engine"]
            if "io_uring" in self.ioengine:
                # io_uring relies on NVMe poll queues being available
                self.extra_params = "--nr-poll-queues=8"

    def configure_adq(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_configure_tc(self):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. Skipping configuration.")

    def adq_set_busy_read(self, busy_read_val):
        self.log.warning("WARNING: ADQ setup not yet supported for Kernel mode. busy_read set to 0")
        return {"net.core.busy_read": 0}

    def get_connected_nvme_list(self):
        """Return names (e.g. "nvme0n1") of connected NVMe-oF block devices.

        Only devices whose model number identifies an SPDK or Linux kernel
        target are included, so local drives are filtered out.
        """
        json_obj = json.loads(self.exec_cmd(["sudo", "nvme", "list", "-o", "json"]))
        nvme_list = [os.path.basename(x["DevicePath"]) for x in json_obj["Devices"]
                     if "SPDK" in x["ModelNumber"] or "Linux" in x["ModelNumber"]]
        return nvme_list

    def init_connect(self):
        """Connect to every matched subsystem via nvme-cli and, when using
        io_uring, tune the block-layer sysfs knobs of the new devices."""
        self.log.info("Below connection attempts may result in error messages, this is expected!")
        for subsystem in self.subsystem_info_list:
            # subsystem is a (port, nqn, ip) tuple
            self.log.info("Trying to connect %s %s %s" % subsystem)
            self.exec_cmd(["sudo", self.nvmecli_bin, "connect", "-t", self.transport,
                           "-s", subsystem[0], "-n", subsystem[1], "-a", subsystem[2], self.extra_params])
            # Give the kernel a moment to create the block device
            time.sleep(2)

        if "io_uring" in self.ioengine:
            self.log.info("Setting block layer settings for io_uring.")

            # TODO: io_poll=1 and io_poll_delay=-1 params not set here, because
            #       apparently it's not possible for connected subsystems.
            #       Results in "error: Invalid argument"
            block_sysfs_settings = {
                "iostats": "0",
                "rq_affinity": "0",
                "nomerges": "2"
            }

            for disk in self.get_connected_nvme_list():
                sysfs = os.path.join("/sys/block", disk, "queue")
                for k, v in block_sysfs_settings.items():
                    sysfs_opt_path = os.path.join(sysfs, k)
                    try:
                        self.exec_cmd(["sudo", "bash", "-c", "echo %s > %s" % (v, sysfs_opt_path)], stderr_redirect=True)
                    except CalledProcessError as e:
                        # Best effort: log and continue with remaining settings
                        self.log.warning("Warning: command %s failed due to error %s. %s was not set!" % (e.cmd, e.output, v))
                    finally:
                        # Always log the effective value, set or not
                        _ = self.exec_cmd(["sudo", "cat", "%s" % (sysfs_opt_path)])
                        self.log.info("%s=%s" % (sysfs_opt_path, _))

    def init_disconnect(self):
        """Disconnect all previously connected subsystems via nvme-cli."""
        for subsystem in self.subsystem_info_list:
            self.exec_cmd(["sudo", self.nvmecli_bin, "disconnect", "-n", subsystem[1]])
            time.sleep(1)

    def get_nvme_subsystem_numa(self, dev_name):
        """Return the NUMA node of the local NIC routing to the target side
        of the given device name (e.g. "nvme0n1")."""
        # Remove two last characters to get controller name instead of subsystem name
        nvme_ctrl = os.path.basename(dev_name)[:-2]
        # The controller's sysfs "address" attribute contains the target's
        # traddr; extract the IPv4 address from it.
        remote_nvme_ip = re.search(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',
                                   self.exec_cmd(["cat", "/sys/class/nvme/%s/address" % nvme_ctrl]))
        return self.get_route_nic_numa(remote_nvme_ip.group(0))

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
        """Split connected NVMe devices into per-thread fio [filename] sections
        (grouped by NUMA node) and return them as one config string."""
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))

        # Generate connected nvme devices names and sort them by used NIC numa node
        # to allow better grouping when splitting into fio sections.
        nvme_list = [os.path.join("/dev", nvme) for nvme in self.get_connected_nvme_list()]
        nvme_numas = [self.get_nvme_subsystem_numa(x) for x in nvme_list]
        nvme_list = [x for _, x in sorted(zip(nvme_numas, nvme_list))]

        filename_section = ""
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for _ in range(nvme_per_split):
                result[i].append(next(iterator))
                # Hand leftover devices to the earliest chunks.
                # NOTE(review): if nvme_per_split == 0 (fewer connected devices
                # than threads) the leftovers are never consumed — confirm
                # callers guarantee len(nvme_list) >= len(threads).
                if remainder:
                    result[i].append(next(iterator))
                    remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            # Per-section queue depth scales with the number of disks in the
            # section and is split between the numjobs clones.
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd

            offset_section = ""
            if offset:
                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)

            numa_opts = self.gen_fio_numa_section(r)

            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])

        return filename_section
1328
1329
1330class SPDKInitiator(Initiator):
1331    def __init__(self, name, general_config, initiator_config):
1332        super().__init__(name, general_config, initiator_config)
1333
1334        if "skip_spdk_install" not in general_config or general_config["skip_spdk_install"] is False:
1335            self.install_spdk()
1336
1337        # Optional fields
1338        self.enable_data_digest = False
1339        if "enable_data_digest" in initiator_config:
1340            self.enable_data_digest = initiator_config["enable_data_digest"]
1341        if "num_cores" in initiator_config:
1342            self.num_cores = initiator_config["num_cores"]
1343
1344        self.ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
1345        self.spdk_conf = "spdk_json_conf=%s/bdev.conf" % self.spdk_dir
1346
1347    def adq_set_busy_read(self, busy_read_val):
1348        return {"net.core.busy_read": busy_read_val}
1349
1350    def install_spdk(self):
1351        self.log.info("Using fio binary %s" % self.fio_bin)
1352        self.exec_cmd(["git", "-C", self.spdk_dir, "submodule", "update", "--init"])
1353        self.exec_cmd(["git", "-C", self.spdk_dir, "clean", "-ffdx"])
1354        self.exec_cmd(["cd", self.spdk_dir, "&&", "./configure", "--with-rdma", "--with-fio=%s" % os.path.dirname(self.fio_bin)])
1355        self.exec_cmd(["make", "-C", self.spdk_dir, "clean"])
1356        self.exec_cmd(["make", "-C", self.spdk_dir, "-j$(($(nproc)*2))"])
1357
1358        self.log.info("SPDK built")
1359        self.exec_cmd(["sudo", "%s/scripts/setup.sh" % self.spdk_dir])
1360
1361    def init_connect(self):
1362        # Not a real "connect" like when doing "nvme connect" because SPDK's fio
1363        # bdev plugin initiates connection just before starting IO traffic.
1364        # This is just to have a "init_connect" equivalent of the same function
1365        # from KernelInitiator class.
1366        # Just prepare bdev.conf JSON file for later use and consider it
1367        # "making a connection".
1368        bdev_conf = self.gen_spdk_bdev_conf(self.subsystem_info_list)
1369        self.exec_cmd(["echo", "'%s'" % bdev_conf, ">", "%s/bdev.conf" % self.spdk_dir])
1370
    def init_disconnect(self):
        # No-op: the SPDK fio bdev plugin tears down its own connections once
        # I/O finishes, so there is nothing to disconnect explicitly here.
        pass
1375
1376    def gen_spdk_bdev_conf(self, remote_subsystem_list):
1377        bdev_cfg_section = {
1378            "subsystems": [
1379                {
1380                    "subsystem": "bdev",
1381                    "config": []
1382                }
1383            ]
1384        }
1385
1386        for i, subsys in enumerate(remote_subsystem_list):
1387            sub_port, sub_nqn, sub_addr = map(lambda x: str(x), subsys)
1388            nvme_ctrl = {
1389                "method": "bdev_nvme_attach_controller",
1390                "params": {
1391                    "name": "Nvme{}".format(i),
1392                    "trtype": self.transport,
1393                    "traddr": sub_addr,
1394                    "trsvcid": sub_port,
1395                    "subnqn": sub_nqn,
1396                    "adrfam": "IPv4"
1397                }
1398            }
1399
1400            if self.enable_adq:
1401                nvme_ctrl["params"].update({"priority": "1"})
1402
1403            if self.enable_data_digest:
1404                nvme_ctrl["params"].update({"ddgst": self.enable_data_digest})
1405
1406            bdev_cfg_section["subsystems"][0]["config"].append(nvme_ctrl)
1407
1408        return json.dumps(bdev_cfg_section, indent=2)
1409
1410    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1, offset=False, offset_inc=0):
1411        filename_section = ""
1412        if len(threads) >= len(subsystems):
1413            threads = range(0, len(subsystems))
1414
1415        # Generate expected NVMe Bdev names and sort them by used NIC numa node
1416        # to allow better grouping when splitting into fio sections.
1417        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
1418        filename_numas = [self.get_nvme_subsystem_numa(x) for x in filenames]
1419        filenames = [x for _, x in sorted(zip(filename_numas, filenames))]
1420
1421        nvme_per_split = int(len(subsystems) / len(threads))
1422        remainder = len(subsystems) % len(threads)
1423        iterator = iter(filenames)
1424        result = []
1425        for i in range(len(threads)):
1426            result.append([])
1427            for _ in range(nvme_per_split):
1428                result[i].append(next(iterator))
1429            if remainder:
1430                result[i].append(next(iterator))
1431                remainder -= 1
1432        for i, r in enumerate(result):
1433            header = "[filename%s]" % i
1434            disks = "\n".join(["filename=%s" % x for x in r])
1435            job_section_qd = round((io_depth * len(r)) / num_jobs)
1436            if job_section_qd == 0:
1437                job_section_qd = 1
1438            iodepth = "iodepth=%s" % job_section_qd
1439
1440            offset_section = ""
1441            if offset:
1442                offset_section = self.gen_fio_offset_section(offset_inc, num_jobs)
1443
1444            numa_opts = self.gen_fio_numa_section(r)
1445
1446            filename_section = "\n".join([filename_section, header, disks, iodepth, numa_opts, offset_section, ""])
1447
1448        return filename_section
1449
1450    def get_nvme_subsystem_numa(self, bdev_name):
1451        bdev_conf_json_obj = json.loads(self.exec_cmd(["cat", "%s/bdev.conf" % self.spdk_dir]))
1452        bdev_conf_json_obj = bdev_conf_json_obj["subsystems"][0]["config"]
1453
1454        # Remove two last characters to get controller name instead of subsystem name
1455        nvme_ctrl = bdev_name[:-2]
1456        remote_nvme_ip = list(filter(lambda x: x["params"]["name"] == "%s" % nvme_ctrl, bdev_conf_json_obj))[0]["params"]["traddr"]
1457        return self.get_route_nic_numa(remote_nvme_ip)
1458
1459
if __name__ == "__main__":
    # Resolve the default config.json path relative to this script's location.
    script_full_dir = os.path.dirname(os.path.realpath(__file__))
    default_config_file_path = os.path.relpath(os.path.join(script_full_dir, "config.json"))

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', '--config', type=str, default=default_config_file_path,
                        help='Configuration file.')
    parser.add_argument('-r', '--results', type=str, default='/tmp/results',
                        help='Results directory.')
    parser.add_argument('-s', '--csv-filename', type=str, default='nvmf_results.csv',
                        help='CSV results filename.')
    parser.add_argument('-f', '--force', default=False, action='store_true',
                        dest='force', help="""Force script to continue and try to use all
                        available NVMe devices during test.
                        WARNING: Might result in data loss on used NVMe drives""")

    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO,
                        format='[%(name)s:%(funcName)s:%(lineno)d] %(message)s')

    logging.info("Using config file: %s" % args.config)
    with open(args.config, "r") as config:
        data = json.load(config)

    initiators = []
    fio_cases = []

    general_config = data["general"]
    target_config = data["target"]
    # Any top-level section whose key contains "initiator" is an initiator.
    initiator_configs = [data[x] for x in data.keys() if "initiator" in x]

    # Safety gate: without null block devices, an allowlist/blocklist, or an
    # explicit -f, refuse to run rather than risk wiping all attached NVMes.
    if "null_block_devices" not in data["target"] and \
        (args.force is False and
            "allowlist" not in data["target"] and
            "blocklist" not in data["target"]):
        # TODO: Also check if allowlist or blocklist are not empty.
        logging.warning("""WARNING: This script requires allowlist and blocklist to be defined.
        You can choose to use all available NVMe drives on your system, which may potentially
        lead to data loss. If you wish to proceed with all attached NVMes, use "-f" option.""")
        exit(1)

    # Instantiate target/initiator objects and pull fio workload parameters
    # out of their respective config sections.
    # NOTE(review): target_obj (and init_obj) remain unbound if a section's
    # "mode" is neither "spdk" nor "kernel" — presumably configs are
    # validated elsewhere; confirm.
    for k, v in data.items():
        if "target" in k:
            v.update({"results_dir": args.results})
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(k, data["general"], v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(k, data["general"], v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(k, data["general"], v)
            initiators.append(init_obj)
        elif "fio" in k:
            # Cartesian product of block sizes, queue depths and rw modes:
            # every combination becomes one fio workload below.
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None
            fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None

            fio_rate_iops = 0
            if "rate_iops" in data[k]:
                fio_rate_iops = data[k]["rate_iops"]

            fio_offset = False
            if "offset" in data[k]:
                fio_offset = data[k]["offset"]
            fio_offset_inc = 0
            if "offset_inc" in data[k]:
                fio_offset_inc = data[k]["offset_inc"]
        else:
            continue

    try:
        os.mkdir(args.results)
    except FileExistsError:
        pass

    # Tell the target which initiator NICs will be connecting to it.
    for i in initiators:
        target_obj.initiator_info.append(
            {"name": i.name, "target_nic_ips": i.target_nic_ips, "initiator_nic_ips": i.nic_ips}
        )

    # TODO: This try block is definitely too large. Need to break this up into separate
    # logical blocks to reduce size.
    try:
        target_obj.tgt_start()

        for i in initiators:
            i.match_subsystems(target_obj.subsystem_info_list)
            if i.enable_adq:
                i.adq_configure_tc()

        # Poor mans threading
        # Run FIO tests
        for block_size, io_depth, rw in fio_workloads:
            # Generate every initiator's fio job file up front so all
            # initiators can start the workload together below.
            configs = []
            for i in initiators:
                i.init_connect()
                cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                       fio_num_jobs, fio_ramp_time, fio_run_time, fio_rate_iops,
                                       fio_offset, fio_offset_inc)
                configs.append(cfg)

            for run_no in range(1, fio_run_num+1):
                threads = []
                power_daemon = None
                # Common prefix for all measurement output files of this run.
                measurements_prefix = "%s_%s_%s_m_%s_run_%s" % (block_size, io_depth, rw, fio_rw_mix_read, run_no)

                # One thread per initiator running fio...
                for i, cfg in zip(initiators, configs):
                    t = threading.Thread(target=i.run_fio, args=(cfg, run_no))
                    threads.append(t)
                # ...plus optional target-side measurement threads.
                if target_obj.enable_sar:
                    sar_file_prefix = measurements_prefix + "_sar"
                    t = threading.Thread(target=target_obj.measure_sar, args=(args.results, sar_file_prefix, fio_ramp_time, fio_run_time))
                    threads.append(t)

                if target_obj.enable_pcm:
                    pcm_fnames = ["%s_%s.csv" % (measurements_prefix, x) for x in ["pcm_cpu", "pcm_memory", "pcm_power"]]

                    pcm_cpu_t = threading.Thread(target=target_obj.measure_pcm,
                                                 args=(args.results, pcm_fnames[0], fio_ramp_time, fio_run_time))
                    pcm_mem_t = threading.Thread(target=target_obj.measure_pcm_memory,
                                                 args=(args.results, pcm_fnames[1], fio_ramp_time, fio_run_time))
                    pcm_pow_t = threading.Thread(target=target_obj.measure_pcm_power,
                                                 args=(args.results, pcm_fnames[2], fio_ramp_time, fio_run_time))

                    threads.append(pcm_cpu_t)
                    threads.append(pcm_mem_t)
                    threads.append(pcm_pow_t)

                if target_obj.enable_bw:
                    bandwidth_file_name = measurements_prefix + "_bandwidth.csv"
                    t = threading.Thread(target=target_obj.measure_network_bandwidth,
                                         args=(args.results, bandwidth_file_name, fio_ramp_time, fio_run_time))
                    threads.append(t)

                if target_obj.enable_dpdk_memory:
                    dpdk_mem_file_name = measurements_prefix + "_dpdk_mem.txt"
                    t = threading.Thread(target=target_obj.measure_dpdk_memory, args=(args.results, dpdk_mem_file_name, fio_ramp_time))
                    threads.append(t)

                if target_obj.enable_pm:
                    power_daemon = threading.Thread(target=target_obj.measure_power,
                                                    args=(args.results, measurements_prefix, script_full_dir,
                                                          fio_ramp_time, fio_run_time))
                    threads.append(power_daemon)

                if target_obj.enable_adq:
                    ethtool_thread = threading.Thread(target=target_obj.ethtool_after_fio_ramp, args=(fio_ramp_time,))
                    threads.append(ethtool_thread)

                # Start fio and all measurements together, then wait for every
                # thread to finish before starting the next run.
                for t in threads:
                    t.start()
                for t in threads:
                    t.join()

            for i in initiators:
                i.init_disconnect()
                i.copy_result_files(args.results)
        try:
            parse_results(args.results, args.csv_filename)
        except Exception:
            logging.error("There was an error with parsing the results")
    finally:
        # Best-effort teardown: always stop initiators and the target, even
        # when the test loop above failed.
        for i in initiators:
            try:
                i.stop()
            except Exception as err:
                # NOTE(review): exception silently swallowed (err unused) so
                # one failing initiator does not block stopping the rest or
                # the target — consider logging it.
                pass
        target_obj.stop()
1637